Source code for libqtile.widget.pulse_volume

# -*- coding: utf-8 -*-
# Copyright (c) 2023 elParaguayo
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import asyncio

import pulsectl_asyncio
from pulsectl import PulseError

from libqtile import qtile
from libqtile.command.base import expose_command
from libqtile.log_utils import logger
from libqtile.utils import create_task
from libqtile.widget.volume import Volume

# Shared lock held by PulseConnection._configure while it sets up the
# connection, so that overlapping configure calls do not connect twice.
lock = asyncio.Lock()


class PulseConnection:
    """
    Class object to manage the connection to the pulse server and send
    volume/mute status to subscribed clients.

    Clients register a callback via ``subscribe``; the callback is invoked
    with ``(volume, mute)`` whenever the default sink information is refreshed.
    """

    def __init__(self):
        # True while an event listener task has been started for the
        # current connection.
        self._subscribed = False
        # Task running ``_event_listener`` (or None when not listening).
        self._event_handler = None
        # Sink object matching ``default_sink_name`` (or None if not found).
        self.default_sink = None
        self.default_sink_name = None
        # PulseAsync instance; created lazily in ``_configure``.
        self.pulse = None
        self.configured = False
        # Callables invoked with (volume, mute) by ``update_clients``.
        self.callbacks = []
        self.qtile = qtile
        # Handle returned by ``qtile.call_later`` for the next connection check.
        self.timer = None

    async def _configure(self):
        """Creates the pulse object and starts the connection checks."""
        # Use a lock to prevent multiple connection attempts
        async with lock:
            # Nothing to do if we're already set up, or if every client
            # unsubscribed before this task got a chance to run.
            if self.configured or not self.callbacks:
                return

            # Create pulse async object but don't connect
            self.pulse = pulsectl_asyncio.PulseAsync("qtile-pulse")

            # Try to connect
            await self._check_pulse_connection()

            self.configured = True

    def _stop_event_listener(self):
        """Cancels the event listener task, if any, and clears the subscribed flag."""
        if self._event_handler is not None:
            self._event_handler.cancel()
            self._event_handler = None
        self._subscribed = False

    def _schedule_connection_check(self):
        """Timer callback: starts a new connection check as a task."""
        create_task(self._check_pulse_connection())

    async def _check_pulse_connection(self):
        """
        The PulseAsync object subscribes to connection state events so we
        need to check periodically whether the connection has been lost.

        If the connection is down, any previous event listener is stopped and
        a reconnection is attempted. The check re-arms itself every 10 seconds.
        """
        if not self.pulse.connected:
            # Check if we were previously connected to the server and,
            # if so, stop the event handler
            if self._subscribed:
                self._stop_event_listener()
            try:
                await self.pulse.connect()
                logger.debug("Connection to pulseaudio ready")
            except PulseError:
                logger.warning("Failed to connect to pulseaudio, retrying in 10s")
            else:
                # We're connected so get details of the default sink
                await self.get_server_info()

                # Start event listeners for sink and server events
                self._event_handler = create_task(self._event_listener())
                self._subscribed = True

        # Set a timer to check status in 10 seconds time. The coroutine is
        # only created when the timer fires: creating it here would leave a
        # never-awaited coroutine behind whenever the timer is cancelled.
        self.timer = self.qtile.call_later(10, self._schedule_connection_check)

    async def _event_listener(self):
        """Listens for sink and server events from the server."""
        async for event in self.pulse.subscribe_events("sink", "server"):
            # Sink events will signify volume changes
            if event.facility == "sink":
                await self.get_sink_info()
            # Server events include when the default sink changes
            elif event.facility == "server":
                await self.get_server_info()

    async def get_server_info(self):
        """Updates the default sink name and then refreshes the sink info."""
        info = await self.pulse.server_info()
        self.default_sink_name = info.default_sink_name
        await self.get_sink_info()

    async def get_sink_info(self):
        """Gets a reference to the default sink and triggers update for subscribed clients."""
        sinks = [
            sink for sink in await self.pulse.sink_list() if sink.name == self.default_sink_name
        ]
        if not sinks:
            logger.warning("Could not get info for default sink")
            self.default_sink = None
            return

        self.default_sink = sinks[0]
        self.update_clients()

    def get_volume(self):
        """
        Gets volume and mute status for default sink.

        Returns a ``(volume, mute)`` tuple:

        * ``(None, None)`` when there is no connection to the server,
        * ``(-1, mute)`` when the sink is muted or reports no base volume,
        * ``(-1, 0)`` when there is no default sink,
        * otherwise the volume as a rounded percentage of the sink's base
          volume, plus the mute flag.
        """
        # ``self.pulse`` is only created once ``_configure`` has run.
        if self.pulse is None or not self.pulse.connected:
            return None, None

        if self.default_sink:
            mute = self.default_sink.mute
            if mute:
                return -1, mute
            base = self.default_sink.base_volume
            if not base:
                return -1, mute
            current = self.default_sink.volume.value_flat
            return round(current * 100 / base), mute
        return -1, 0

    def update_clients(self):
        """Sends volume and mute status to subscribed clients."""
        for callback in self.callbacks:
            callback(*self.get_volume())

    def subscribe(self, callback):
        """
        Subscribes a client for callback events.

        The first subscription will trigger the connection to the
        pulse server.
        """
        need_configure = not bool(self.callbacks)
        self.callbacks.append(callback)

        if need_configure:
            create_task(self._configure())

    def unsubscribe(self, callback):
        """
        Unsubscribes a client from callback events.

        Removing the last client stops the event listener, closes the
        connection with the pulse server and cancels future calls to connect.
        """
        try:
            self.callbacks.remove(callback)
        except ValueError:
            # Callback was never subscribed (or already removed)
            pass

        if not self.callbacks:
            # Stop listening for events on the connection we're about to close
            self._stop_event_listener()

            # The pulse object doesn't exist yet if the configure task
            # scheduled by ``subscribe`` has not run.
            if self.pulse is not None:
                self.pulse.close()

            # Prevent future calls to connect to the server
            if self.timer:
                self.timer.cancel()
                self.timer = None

            self.configured = False


# Module-level connection object: PulseVolume widgets subscribe to it for
# volume/mute updates and send their volume and mute requests through it.
pulse = PulseConnection()


class PulseVolume(Volume):
    """
    Volume widget for systems using PulseAudio.

    The widget connects to the PulseAudio server by using the libpulse library
    and so should be updated virtually instantly rather than needing to poll the
    volume status regularly (NB this means that the ``update_interval`` parameter
    serves no purpose for this widget).

    The widget relies on the `pulsectl_asyncio <https://pypi.org/project/pulsectl-asyncio/>`__
    library to access the libpulse bindings. If you are using python 3.11 you must use
    ``pulsectl_asyncio >= 1.0.0``.
    """

    defaults = [
        ("limit_max_volume", False, "Limit maximum volume to 100%"),
    ]

    def __init__(self, **config):
        Volume.__init__(self, **config)
        self.add_defaults(PulseVolume.defaults)
        self.volume = 0
        # NOTE(review): this instance attribute shadows the exposed ``mute``
        # command method when accessed on the instance - confirm intended.
        self.mute = 0
        # Last (volume, mute) pair received; used to skip redundant redraws.
        self._previous_state = (-1.0, -1)

    def _configure(self, qtile, bar):
        Volume._configure(self, qtile, bar)
        if self.theme_path:
            self.setup_images()
        # Register for volume/mute updates from the shared connection
        pulse.subscribe(self.get_vals)

    async def _change_volume(self, volume):
        """Sets volume on default sink (no-op when there is no default sink)."""
        sink = pulse.default_sink
        if sink is None:
            return
        await pulse.pulse.volume_set_all_chans(sink, volume)

    async def _mute(self):
        """Toggles mute status of default sink (no-op when there is no default sink)."""
        sink = pulse.default_sink
        if sink is None:
            return
        await pulse.pulse.sink_mute(sink.index, not sink.mute)

    @expose_command()
    def mute(self):
        """Mute the sound device."""
        create_task(self._mute())

    @expose_command()
    def increase_vol(self, value=None):
        """Increase volume."""
        sink = pulse.default_sink
        # The default sink is unknown until the server connection is up (and
        # is cleared when it cannot be found), so there's nothing to adjust.
        if sink is None:
            return

        if not value:
            value = sink.volume.value_flat + (self.step / 100.0)
        base = sink.base_volume
        if self.limit_max_volume and value > base:
            value = base

        create_task(self._change_volume(value))

    @expose_command()
    def decrease_vol(self, value=None):
        """Decrease volume."""
        sink = pulse.default_sink
        # See increase_vol: without a default sink there's nothing to adjust.
        if sink is None:
            return

        if not value:
            value = sink.volume.value_flat - (self.step / 100.0)
        # Never request a negative volume
        value = max(value, 0)

        create_task(self._change_volume(value))

    def get_vals(self, vol, mute):
        """Callback for the pulse connection: stores new values and redraws on change."""
        if (vol, mute) != self._previous_state:
            self.volume = vol
            self.mute = mute
            self._previous_state = (vol, mute)
            self.update()

    def update(self):
        """
        same method as in Volume widgets except that here we don't need to
        manually re-schedule update
        """
        if pulse.pulse is None or not pulse.pulse.connected:
            return

        # Update the underlying canvas size before actually attempting
        # to figure out how big it is and draw it.
        length = self.length
        self._update_drawer()

        # Only the widget needs redrawing if its length is unchanged;
        # otherwise the whole bar is redrawn.
        if self.length == length:
            self.draw()
        else:
            self.bar.draw()

    def finalize(self):
        # Close the connection to the server
        pulse.unsubscribe(self.get_vals)
        Volume.finalize(self)