Source code for libqtile.widget.statusnotifier

# Copyright (c) 2021 elParaguayo
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import os
from functools import partial

# dbus_next is incompatible with deferred type evaluation
from typing import Callable, List, Optional

import cairocffi
from dbus_next import InterfaceNotFoundError, InvalidBusNameError, InvalidObjectPathError
from dbus_next.aio import MessageBus
from dbus_next.constants import PropertyAccess
from dbus_next.errors import DBusError
from dbus_next.service import ServiceInterface, dbus_property, method, signal

try:
    from xdg.IconTheme import getIconPath

    has_xdg = True
except ImportError:
    has_xdg = False

from libqtile import bar
from libqtile.images import Img
from libqtile.log_utils import logger
from libqtile.resources.status_notifier.statusnotifieritem import STATUS_NOTIFIER_ITEM_SPEC
from libqtile.utils import add_signal_receiver, create_task
from libqtile.widget import base

# StatusNotifier seems to have two potential interface names.
# While KDE appears to be the default, we should also listen
# for items on freedesktop.
BUS_NAMES = ["org.kde.StatusNotifierWatcher", "org.freedesktop.StatusNotifierWatcher"]

ITEM_INTERFACES = ["org.kde.StatusNotifierItem", "org.freedesktop.StatusNotifierItem"]

STATUSNOTIFIER_PATH = "/StatusNotifierItem"
PROTOCOL_VERSION = 0


class StatusNotifierItem:  # noqa: E303
    """
    Class object which represents an StatusNotiferItem object.

    The item is responsible for interacting with the
    application.
    """

    icon_map = {
        "Icon": ("_icon", "get_icon_pixmap"),
        "Attention": ("_attention_icon", "get_attention_icon_pixmap"),
        "Overlay": ("_overlay_icon", "get_overlay_icon_pixmap"),
    }

    def __init__(self, bus, service, path=None, icon_theme=None):
        self.bus = bus
        self.service = service
        self.surfaces = {}
        self._pixmaps = {}
        self._icon = None
        self._overlay_icon = None
        self._attention_icon = None
        self.on_icon_changed = None
        self.icon_theme = icon_theme
        self.icon = None
        self.path = path if path else STATUSNOTIFIER_PATH

    def __eq__(self, other):
        # Convenience method to find Item in list by service path
        if isinstance(other, StatusNotifierItem):
            return other.service == self.service
        elif isinstance(other, str):
            return other == self.service
        else:
            return False

    async def start(self):
        # Create a proxy object connecting for the item.
        # Some apps provide the incorrect path to the StatusNotifier object
        # We can try falling back to the default if that fails.
        # See: https://github.com/qtile/qtile/issues/3418
        # Note: this loop will run a maximum of two times and returns False
        # if the no object is available.
        found_path = False

        while not found_path:
            try:
                introspection = await self.bus.introspect(self.service, self.path)
                found_path = True
            except InvalidBusNameError:
                # This is probably an Ayatana indicator which doesn't provide the service name.
                # We'll pick it up via the message handler so we can ignore this.
                return False
            except InvalidObjectPathError:
                logger.info("Cannot find %s path on %s.", self.path, self.service)
                if self.path == STATUSNOTIFIER_PATH:
                    return False

                # Try the default ('/StatusNotifierItem')
                self.path = STATUSNOTIFIER_PATH

        try:
            obj = self.bus.get_proxy_object(self.service, self.path, introspection)
        except InvalidBusNameError:
            return False

        # Try to connect to the bus object and verify there's a valid
        # interface available
        # TODO: This may not ever fail given we've specified the underying
        # schema so dbus-next has not attempted any introspection.
        interface_found = False
        for interface in ITEM_INTERFACES:
            try:
                self.item = obj.get_interface(interface)
                interface_found = True
                break
            except InterfaceNotFoundError:
                continue

        if not interface_found:
            logger.info(
                "Unable to find StatusNotifierItem interface on %s. Falling back to default spec.",
                self.service,
            )
            try:
                obj = self.bus.get_proxy_object(
                    self.service, STATUSNOTIFIER_PATH, STATUS_NOTIFIER_ITEM_SPEC
                )
                self.item = obj.get_interface("org.kde.StatusNotifierItem")
            except InterfaceNotFoundError:
                logger.warning(
                    "Failed to find StatusNotifierItem interface on %s and fallback to default spec also failed.",
                    self.service,
                )
                return False

        # Trying to get the local icon (first without fallback because there might be application-provided icons)
        await self._get_local_icon(fallback=False)

        # If there's no XDG icon, try to use icon provided by application
        if self.icon:
            self.item.on_new_icon(self._update_local_icon)
        else:
            # Get initial application icons:
            for icon in ["Icon", "Attention", "Overlay"]:
                await self._get_icon(icon)

            if self.has_icons:
                # Attach listeners for when the icon is updated
                self.item.on_new_icon(self._new_icon)
                self.item.on_new_attention_icon(self._new_attention_icon)
                self.item.on_new_overlay_icon(self._new_overlay_icon)

        if not self.has_icons:
            logger.warning(
                "Cannot find icon in current theme and no icon provided by StatusNotifierItem."
            )
            # No "local" icon and no application-provided icons are available.
            # The "local" icon may be updated at a later time, so "_update_local_icon"
            # gets registered for "on_new_icon" with the option to fall back to
            # a default icon.
            self.item.on_new_icon(self._update_local_icon)
            await self._get_local_icon()

        return True

    async def _get_local_icon(self, fallback=True):
        # Default to XDG icon
        # Some implementations don't provide an IconName property so we
        # need to catch an error if we can't read it.
        # We can't use hasattr to check this as the method will be created
        # where we've used the default XML spec to provide the object introspection
        try:
            icon_name = await self.item.get_icon_name()
        except DBusError:
            return

        try:
            icon_path = await self.item.get_icon_theme_path()
            self.icon = self._get_custom_icon(icon_name, icon_path)
        except (AttributeError, DBusError):
            pass

        if not self.icon:
            self.icon = self._get_xdg_icon(icon_name)

        if not self.icon and fallback:
            # Use fallback icon libqtile/resources/status_notifier/fallback_icon.png
            logger.warning("Could not find icon for '%s'. Using fallback icon.", icon_name)
            root = os.sep.join(os.path.abspath(__file__).split(os.sep)[:-2])
            path = os.path.join(root, "resources", "status_notifier", "fallback_icon.png")
            self.icon = Img.from_path(path)

    def _create_task_and_draw(self, coro):
        task = create_task(coro)
        task.add_done_callback(self._redraw)

    def _update_local_icon(self):
        self.icon = None
        self._create_task_and_draw(self._get_local_icon())

    def _new_icon(self):
        self._create_task_and_draw(self._get_icon("Icon"))

    def _new_attention_icon(self):
        self._create_task_and_draw(self._get_icon("Attention"))

    def _new_overlay_icon(self):
        self._create_task_and_draw(self._get_icon("Overlay"))

    def _get_custom_icon(self, icon_name, icon_path):
        for ext in [".png", ".svg"]:
            path = os.path.join(icon_path, icon_name + ext)
            if os.path.isfile(path):
                return Img.from_path(path)

        return None

    def _get_xdg_icon(self, icon_name):
        if not has_xdg:
            return

        path = getIconPath(icon_name, theme=self.icon_theme, extensions=["png", "svg"])

        if not path:
            return None

        return Img.from_path(path)

    async def _get_icon(self, icon_name):
        """
        Requests the pixmap for the given `icon_name` and
        adds to an internal dictionary for later retrieval.
        """
        attr, method = self.icon_map[icon_name]
        pixmap = getattr(self.item, method, None)
        if pixmap is None:
            return
        icon_pixmap = await pixmap()

        # Items can present multiple pixmaps for different
        # size of icons. We want to keep these so we can pick
        # the best size when redering the icon later.
        # Also, the bytes sent for the pixmap are big-endian
        # but Cairo expects little-endian so we need to
        # reorder them.
        self._pixmaps[icon_name] = {
            size: self._reorder_bytes(icon_bytes) for size, _, icon_bytes in icon_pixmap
        }

    def _reorder_bytes(self, icon_bytes):
        """
        Method loops over the array and reverses every
        4 bytes (representing one RGBA pixel).
        """
        arr = bytearray(icon_bytes)
        for i in range(0, len(arr), 4):
            arr[i : i + 4] = arr[i : i + 4][::-1]

        return arr

    def _redraw(self, result):
        """Method to invalidate icon cache and redraw icons."""
        self._invalidate_icons()
        if self.on_icon_changed is not None:
            self.on_icon_changed(self)

    def _invalidate_icons(self):
        self.surfaces = {}

    def _get_sizes(self):
        """Returns list of available icon sizes."""
        if not self._pixmaps.get("Icon", False):
            return []

        return sorted([size for size in self._pixmaps["Icon"]])

    def _get_surfaces(self, size):
        """
        Creates a Cairo ImageSurface for each available icon
        for the given size.
        """
        raw_surfaces = {}
        for icon in self._pixmaps:
            if size in self._pixmaps[icon]:
                srf = cairocffi.ImageSurface.create_for_data(
                    self._pixmaps[icon][size], cairocffi.FORMAT_ARGB32, size, size
                )
                raw_surfaces[icon] = srf
        return raw_surfaces

    def get_icon(self, size):
        """
        Returns a cairo ImageSurface for the selected `size`.

        Will pick the appropriate icon and add any overlay as required.
        """
        # Use existing icon if generated previously
        if size in self.surfaces:
            return self.surfaces[size]

        # Create a blank ImageSurface to hold the icon
        icon = cairocffi.ImageSurface(cairocffi.FORMAT_ARGB32, size, size)

        if self.icon:
            base_icon = self.icon.surface
            icon_size = base_icon.get_width()
            overlay = None

        else:
            # Find best matching icon size:
            # We get all available sizes and filter this list so it only shows
            # the icon sizes bigger than the requested size (we prefer to
            # shrink icons rather than scale them up)
            all_sizes = self._get_sizes()
            sizes = [s for s in all_sizes if s >= size]

            # TODO: This is messy. Shouldn't return blank icon
            # If there are no sizes at all (i.e. no icon) then we return empty
            # icon
            if not all_sizes:
                return icon

            # Choose the first available size. If there are none (i.e. we
            # request icon size bigger than the largest provided by the app),
            # we just take the largest icon
            icon_size = sizes[0] if sizes else all_sizes[-1]

            srfs = self._get_surfaces(icon_size)

            # TODO: This shouldn't happen...
            if not srfs:
                return icon

            # TODO: Check spec for when to use "attention"
            base_icon = srfs.get("Attention", srfs["Icon"])
            overlay = srfs.get("Overlay", None)

        with cairocffi.Context(icon) as ctx:
            scale = size / icon_size
            ctx.scale(scale, scale)
            ctx.set_source_surface(base_icon)
            ctx.paint()
            if overlay:
                ctx.set_source_surface(overlay)
                ctx.paint()

        # Store the surface for next time
        self.surfaces[size] = icon

        return icon

    def activate(self):
        if hasattr(self.item, "call_activate"):
            create_task(self._activate())

    async def _activate(self):
        # Call Activate method and pass window position hints
        await self.item.call_activate(0, 0)

    @property
    def has_icons(self):
        return any(bool(icon) for icon in self._pixmaps.values()) or self.icon is not None


class StatusNotifierWatcher(ServiceInterface):  # noqa: E303
    """
    DBus service that creates a StatusNotifierWatcher interface
    on the bus and listens for applications wanting to register
    items.
    """

    def __init__(self, service: str):
        super().__init__(service)
        self._items: List[str] = []
        self._hosts: List[str] = []
        self.service = service
        self.on_item_added: Optional[Callable] = None
        self.on_host_added: Optional[Callable] = None
        self.on_item_removed: Optional[Callable] = None
        self.on_host_removed: Optional[Callable] = None

    async def start(self):
        # Set up and register the service on ths bus
        self.bus = await MessageBus().connect()
        self.bus.add_message_handler(self._message_handler)
        self.bus.export("/StatusNotifierWatcher", self)
        await self.bus.request_name(self.service)

        # We need to listen for interfaces being removed from
        # the bus so we can remove icons when the application
        # is closed.
        await self._setup_listeners()

    def _message_handler(self, message):
        """
        Low level method to check incoming messages.

        Ayatana indicators seem to register themselves by passing their object
        path rather than the service providing that object. We therefore need
        to identify the sender of the message in order to register the service.

        Returning False so senders receieve a reply (returning True prevents
        reply being sent)
        """
        if message.member != "RegisterStatusNotifierItem":
            return False

        # If the argument is not an object path (starting with "/") then we assume
        # it is the bus name and we don't need to do anything else.
        if not message.body[0].startswith("/"):
            return False

        if message.sender not in self._items:
            self._items.append(message.sender)
            if self.on_item_added is not None:
                self.on_item_added(message.sender, message.body[0])
            self.StatusNotifierItemRegistered(message.sender)
        return False

    async def _setup_listeners(self):
        """
        Register a MatchRule to receive signals when interfaces are added
        and removed from the bus.
        """
        await add_signal_receiver(
            self._name_owner_changed,
            session_bus=True,
            signal_name="NameOwnerChanged",
            dbus_interface="org.freedesktop.DBus",
        )

    def _name_owner_changed(self, message):
        # We need to track when an interface has been removed from the bus
        # We use the NameOwnerChanged signal and check if the new owner is
        # empty.
        name, _, new_owner = message.body

        # Check if one of our registered items or hosts has been removed.
        # If so, remove from our list and emit relevant signal
        if new_owner == "" and name in self._items:
            self._items.remove(name)
            self.StatusNotifierItemUnregistered(name)

        if new_owner == "" and name in self._hosts:
            self._hosts.remove(name)
            self.StatusNotifierHostUnregistered(name)

    @method()
    def RegisterStatusNotifierItem(self, service: "s"):  # type: ignore  # noqa: F821, N802
        if service not in self._items:
            self._items.append(service)
            if self.on_item_added is not None:
                self.on_item_added(service)
            self.StatusNotifierItemRegistered(service)

    @method()
    def RegisterStatusNotifierHost(self, service: "s"):  # type: ignore  # noqa: F821, N802
        if service not in self._hosts:
            self._hosts.append(service)
            self.StatusNotifierHostRegistered(service)

    @dbus_property(access=PropertyAccess.READ)
    def RegisteredStatusNotifierItems(self) -> "as":  # type: ignore  # noqa: F722, F821, N802
        return self._items

    @dbus_property(access=PropertyAccess.READ)
    def IsStatusNotifierHostRegistered(self) -> "b":  # type: ignore  # noqa: F821, N802
        # Note: applications may not register items unless this
        # returns True
        return len(self._hosts) > 0

    @dbus_property(access=PropertyAccess.READ)
    def ProtocolVersion(self) -> "i":  # type: ignore  # noqa: F821, N802
        return PROTOCOL_VERSION

    @signal()
    def StatusNotifierItemRegistered(self, service) -> "s":  # type: ignore  # noqa: F821, N802
        return service

    @signal()
    def StatusNotifierItemUnregistered(self, service) -> "s":  # type: ignore  # noqa: F821, N802
        if self.on_item_removed is not None:
            self.on_item_removed(service)
        return service

    @signal()
    def StatusNotifierHostRegistered(self, service) -> "s":  # type: ignore  # noqa: F821, N802
        if self.on_host_added is not None:
            self.on_host_added(service)
        return service

    @signal()
    def StatusNotifierHostUnregistered(self, service) -> "s":  # type: ignore  # noqa: F821, N802
        if self.on_host_removed is not None:
            self.on_host_removed(service)
        return service


class StatusNotifierHost:  # noqa: E303
    """
    Host object to act as a bridge between the widget and the DBus objects.

    The Host collates items returned from multiple watcher interfaces and
    collates them into a single list for the widget to access.
    """

    def __init__(self):
        self.watchers: List[StatusNotifierWatcher] = []
        self.items: List[StatusNotifierItem] = []
        self.name = "qtile"
        self.icon_theme: str = None
        self.started = False
        self._on_item_added: List[Callable] = []
        self._on_item_removed: List[Callable] = []
        self._on_icon_changed: List[Callable] = []

    async def start(
        self,
        on_item_added: Optional[Callable] = None,
        on_item_removed: Optional[Callable] = None,
        on_icon_changed: Optional[Callable] = None,
    ):
        """
        Starts the host if not already started.

        Widgets should register their callbacks via this method.
        """
        if on_item_added:
            self._on_item_added.append(on_item_added)

        if on_item_removed:
            self._on_item_removed.append(on_item_removed)

        if on_icon_changed:
            self._on_icon_changed.append(on_icon_changed)

        if self.started:
            if on_item_added:
                for item in self.items:
                    on_item_added(item)
            return

        self.bus = await MessageBus().connect()
        await self.bus.request_name("org.freedesktop.StatusNotifierHost-qtile")
        for iface in BUS_NAMES:
            w = StatusNotifierWatcher(iface)
            w.on_item_added = self.add_item
            w.on_item_removed = self.remove_item
            await w.start()

            # Not quite following spec here as we're not registering
            # the host on the bus.
            w.RegisterStatusNotifierHost(self.name)
            self.watchers.append(w)
            self.started = True

    def item_added(self, item, service, future):
        success = future.result()
        # If StatusNotifierItem object was created successfully then we
        # add to our list and redraw the bar
        if success:
            self.items.append(item)
            for callback in self._on_item_added:
                callback(item)

        # It's an invalid item so let's remove it from the watchers
        else:
            for w in self.watchers:
                try:
                    w._items.remove(service)
                except ValueError:
                    pass

    def add_item(self, service, path=None):
        """
        Creates a StatusNotifierItem for the given service and tries to
        start it.
        """
        item = StatusNotifierItem(self.bus, service, path=path, icon_theme=self.icon_theme)
        item.on_icon_changed = self.item_icon_changed
        if item not in self.items:
            task = create_task(item.start())
            task.add_done_callback(partial(self.item_added, item, service))

    def remove_item(self, interface):
        # Check if the interface is in out list of items and, if so,
        # remove it and redraw the bar
        if interface in self.items:
            self.items.remove(interface)
            for callback in self._on_item_removed:
                callback(interface)

    def item_icon_changed(self, item):
        for callback in self._on_icon_changed:
            callback(item)


host = StatusNotifierHost()  # noqa: E303


[docs]class StatusNotifier(base._Widget): """ A 'system tray' widget using the freedesktop StatusNotifierItem specification. As per the specification, app icons are first retrieved from the user's current theme. If this is not available then the app may provide its own icon. In order to use this functionality, users are recommended to install the `pyxdg <https://pypi.org/project/pyxdg/>`__ module to support retrieving icons from the selected theme. If the icon specified by StatusNotifierItem can not be found in the user's current theme and no other icons are provided by the app, a fallback icon is used. Left-clicking an icon will trigger an activate event. .. note:: Context menus are not currently supported by the official widget. However, a modded version of the widget which provides basic menu support is available from elParaguayo's `qtile-extras <https://github.com/elParaguayo/qtile-extras>`_ repo. """ orientations = base.ORIENTATION_BOTH defaults = [ ("icon_size", 16, "Icon width"), ("icon_theme", None, "Name of theme to use for app icons"), ("padding", 3, "Padding between icons"), ] def __init__(self, **config): base._Widget.__init__(self, bar.CALCULATED, **config) self.add_defaults(StatusNotifier.defaults) self.add_callbacks( { "Button1": self.activate, } ) self.selected_item: Optional[StatusNotifierItem] = None @property def available_icons(self): return [item for item in host.items if item.has_icons] def calculate_length(self): if not host.items: return 0 return len(self.available_icons) * (self.icon_size + self.padding) + self.padding def _configure(self, qtile, bar): if has_xdg and self.icon_theme: host.icon_theme = self.icon_theme # This is called last as it starts timers including _config_async. base._Widget._configure(self, qtile, bar) async def _config_async(self): def draw(x=None): self.bar.draw() await host.start(on_item_added=draw, on_item_removed=draw, on_icon_changed=draw) def find_icon_at_pos(self, x, y): """returns StatusNotifierItem object for icon in given position""" offset = self.padding val = x if self.bar.horizontal else y if val < offset: return None for icon in self.available_icons: offset += self.icon_size if val < offset: return icon offset += self.padding return None def button_press(self, x, y, button): icon = self.find_icon_at_pos(x, y) self.selected_item = icon if icon else None name = "Button{0}".format(button) if name in self.mouse_callbacks: self.mouse_callbacks[name]() def draw(self): self.drawer.clear(self.background or self.bar.background) if self.bar.horizontal: xoffset = self.padding yoffset = (self.bar.height - self.icon_size) // 2 for item in self.available_icons: icon = item.get_icon(self.icon_size) self.drawer.ctx.set_source_surface(icon, xoffset, yoffset) self.drawer.ctx.paint() xoffset += self.icon_size + self.padding self.drawer.draw(offsetx=self.offset, offsety=self.offsety, width=self.length) else: xoffset = (self.bar.width - self.icon_size) // 2 yoffset = self.padding for item in self.available_icons: icon = item.get_icon(self.icon_size) self.drawer.ctx.set_source_surface(icon, xoffset, yoffset) self.drawer.ctx.paint() yoffset += self.icon_size + self.padding self.drawer.draw(offsety=self.offset, offsetx=self.offsetx, height=self.length) def activate(self): """Primary action when clicking on an icon""" if not self.selected_item: return self.selected_item.activate()