Source code for libqtile.widget.pulse_volume

# -*- coding: utf-8 -*-
import logging

from libqtile.widget._pulse_audio import ffi, lib
from libqtile.widget.volume import Volume

log = logging.getLogger(__name__)


@ffi.def_extern()
def qtile_pa_context_changed(context, userdata):
    """Relay a context state notification to the widget behind ``userdata``.

    ``userdata`` is a cffi handle; it is resolved back to the Python object
    with ``ffi.from_handle`` and that object's ``on_connection_change`` is
    called with the context.
    """
    ffi.from_handle(userdata).on_connection_change(context)


@ffi.def_extern()
def qtile_on_sink_info(context, info, eol, userdata):
    """Relay one sink-info callback to the widget behind ``userdata``.

    The widget receives the sink record and the end-of-list flag; the
    ``context`` argument is not forwarded.
    """
    target = ffi.from_handle(userdata)
    target.on_sink_info(info, eol)


@ffi.def_extern()
def qtile_on_server_info(context, info, userdata):
    """Relay the server-info callback to the widget behind ``userdata``.

    Only ``info`` is passed on; the ``context`` argument is not forwarded.
    """
    ffi.from_handle(userdata).on_server_info(info)


@ffi.def_extern()
def qtile_on_sink_update(context, event_type, sink_index, userdata):
    """Relay a sink change event to the widget behind ``userdata``.

    The event type and the sink index are handed to the widget's
    ``on_sink_update``; the ``context`` argument is not forwarded.
    """
    target = ffi.from_handle(userdata)
    target.on_sink_update(event_type, sink_index)


class PulseVolume(Volume):
    """Volume widget driven by a pulseaudio connection made through cffi.

    The widget owns a pulseaudio mainloop and context. The mainloop is only
    iterated from ``poll`` and ``wait_for_operation``; results arrive through
    the module-level ``qtile_*`` extern callbacks, which dispatch back to the
    ``on_*`` methods of this class.

    ``default_sink`` is ``None`` until the server has reported the sink whose
    name matches the server's default sink name; after that it is a dict with
    the keys ``name``, ``description``, ``index``, ``base_volume``, ``muted``,
    ``channels`` and ``values``.
    """

    defaults = [
        ('limit_max_volume', False, 'Limit maximum volume to 100%'),
    ]

    def __init__(self, **config):
        Volume.__init__(self, **config)
        self.add_defaults(PulseVolume.defaults)

        self.connected = None
        self._subscribed = False
        self.default_sink_name = None
        self.default_sink = None
        # nothing to release yet; connect() creates both of these
        self.loop = None
        self.context = None
        # both cdata objects are stored on the instance because they are
        # handed to pulseaudio as userdata / client name
        self.handle = ffi.new_handle(self)
        self.client_name = ffi.new('char[]', b'Qtile-pulse')
        self.connect()

    def _release_connection(self):
        """Disconnect and free the current context and mainloop, if any.

        Safe to call when nothing has been created yet and safe to call
        twice: the attributes are reset to ``None`` once released.
        """
        context, loop = self.context, self.loop
        if context:
            lib.pa_context_disconnect(context)
        if loop:
            lib.pa_mainloop_quit(loop, 1)
        if context:
            lib.pa_context_unref(context)
        if loop:
            lib.pa_mainloop_free(loop)
        self.context = None
        self.loop = None

    def finalize(self):
        """Tear down the pulseaudio connection owned by this widget."""
        self._release_connection()

    def connect(self):
        """
        issue a connection to pulse audio server. result of a connection
        would be passed to `on_connection_change` method
        """
        # connect() is re-run after a failed attempt; drop the previous
        # context and mainloop instead of leaking them on every retry
        self._release_connection()

        # create a loop and api entry point
        self.loop = lib.pa_mainloop_new()
        self.api = lib.pa_mainloop_get_api(self.loop)
        # create context (e.g. connection)
        self.context = lib.pa_context_new(self.api, self.client_name)
        # register the state callback before connecting so that a failure
        # reported directly from pa_context_connect is not missed
        # NOTE(review): the first connect() runs from __init__; confirm that
        # timeout_add is usable that early, otherwise a failure reported
        # during construction cannot schedule its retry
        lib.pa_context_set_state_callback(
            self.context,
            lib.qtile_pa_context_changed,
            self.handle
        )
        lib.pa_context_connect(self.context, ffi.NULL, 0, ffi.NULL)

    def on_connection_change(self, context):
        """a callback from pulse lib indicating connection status"""
        state = lib.pa_context_get_state(context)
        if state == lib.PA_CONTEXT_READY:
            # ready
            self.connected = True
            # once connection is established we need to get server information
            self.timeout_add(0.1, self.get_server_info)
            log.debug('Connection to pulseaudio ready')
        elif state == lib.PA_CONTEXT_FAILED:
            # failed to connect
            self.connected = False
            self._subscribed = False
            log.warning('Failed to connect to pulseaudio, retrying in 10s')
            self.timeout_add(10, self.connect)
        elif state == lib.PA_CONTEXT_TERMINATED:
            # done
            self.connected = False
            self._subscribed = False
            log.debug('Connection to pulseaudio terminated cleanly')
        elif state == lib.PA_CONTEXT_UNCONNECTED:
            self.connected = False
            self._subscribed = False
            log.warning("Disconnected from pulseaudio")

    def get_server_info(self):
        """Request server info; the answer arrives in `on_server_info`."""
        lib.pa_context_get_server_info(
            self.context,
            lib.qtile_on_server_info,
            self.handle
        )

    def on_server_info(self, info):
        """Remember the default sink name and schedule a sink listing."""
        self.default_sink_name = ffi.string(info.default_sink_name) \
            .decode('utf-8')
        self.timeout_add(0.1, self.get_sinks)

    def get_sinks(self):
        """Request the sink list; each entry arrives in `on_sink_info`."""
        lib.pa_context_get_sink_info_list(
            self.context,
            lib.qtile_on_sink_info,
            self.handle
        )

    def on_sink_info(self, sink, eol):
        """Store the default sink's state when it shows up in the listing.

        Called once per sink and once more with ``eol`` set; the ``eol``
        call is used to start the event subscription if it is not active.
        """
        if eol:  # dont operate on sink in case its an eol callback
            if not self._subscribed:
                self.timeout_add(0.1, self.subscribe_to_sink_events)
            return
        name = ffi.string(sink.name).decode('utf-8')
        if name == self.default_sink_name:
            self.default_sink = {
                'name': name,
                'description': ffi.string(sink.description).decode('utf-8'),
                'index': int(sink.index),
                'base_volume': sink.base_volume,
                'muted': bool(sink.mute),
                'channels': sink.volume.channels,
                'values': list(sink.volume.values),
            }
            self.update()

    def subscribe_to_sink_events(self):
        """Ask pulseaudio to report sink events to `on_sink_update`."""
        lib.pa_context_set_subscribe_callback(
            self.context,
            lib.qtile_on_sink_update,
            self.handle
        )
        op = lib.pa_context_subscribe(
            self.context,
            lib.PA_SUBSCRIPTION_MASK_SINK,
            ffi.NULL,
            ffi.NULL,
        )
        # same NULL check as the other operations; leaving _subscribed
        # unset lets the next sink listing try again
        if op:
            self.wait_for_operation(op)
            self._subscribed = True

    def on_sink_update(self, event_type, sink_index):
        """Refresh the sink list after any sink event."""
        self.timeout_add(0.1, self.get_sinks)

    def wait_for_operation(self, op):
        """Iterate the mainloop until ``op`` is no longer running."""
        state = lib.pa_operation_get_state(op)
        while state == lib.PA_OPERATION_RUNNING:
            lib.pa_mainloop_iterate(self.loop, 0, ffi.NULL)
            state = lib.pa_operation_get_state(op)

    def change_volume(self, volume):
        """
        order pulseaudio to apply new volume
        """
        if not self.default_sink:
            # no sink known yet, nothing to apply the volume to
            return
        # store new volume to "speed up" widget update so that we don't have
        # to wait a callback from pulseaudio
        self.default_sink['values'] = list(volume.values)
        op = lib.pa_context_set_sink_volume_by_index(
            self.context,
            self.default_sink['index'],
            volume,
            ffi.NULL,
            ffi.NULL
        )
        if op:
            self.wait_for_operation(op)

    def cmd_mute(self):
        """Toggle the mute state of the default sink."""
        if not self.default_sink:
            # no sink known yet, nothing to mute
            return
        op = lib.pa_context_set_sink_mute_by_index(
            self.context,
            self.default_sink['index'],
            not self.default_sink['muted'],
            ffi.NULL,
            ffi.NULL
        )
        if op:
            self.wait_for_operation(op)

    def cmd_increase_vol(self, value=None):
        """Raise the volume by ``value`` percent of the sink's base volume.

        ``value`` defaults to ``self.step``. With ``limit_max_volume`` set,
        every channel is capped at the base volume.
        """
        if not self.default_sink:
            # no sink known yet, nothing to change
            return
        if value is None:
            value = self.step
        base = self.default_sink['base_volume']
        volume = ffi.new('pa_cvolume *', {
            'channels': self.default_sink['channels'],
            'values': self.default_sink['values'],
        })
        lib.pa_cvolume_inc(
            volume,
            int(value * base / 100),
        )
        # check that we dont go over 100% in case its set in config
        if self.limit_max_volume:
            volume.values = [min(i, base) for i in volume.values]
        self.change_volume(volume)

    def cmd_decrease_vol(self, value=None):
        """Lower the volume by ``value`` percent of the sink's base volume.

        ``value`` defaults to ``self.step``.
        """
        if not self.default_sink:
            # no sink known yet, nothing to change
            return
        if value is None:
            value = self.step
        volume_level = int(value * self.default_sink['base_volume'] / 100)
        if not volume_level and max(self.default_sink['values']) == 0:
            # can't be lower than zero
            return
        volume = ffi.new('pa_cvolume *', {
            'channels': self.default_sink['channels'],
            'values': self.default_sink['values'],
        })
        lib.pa_cvolume_dec(volume, volume_level)
        self.change_volume(volume)

    def button_press(self, x, y, button):
        """Handle the click like `Volume` does, then poll pulseaudio."""
        Volume.button_press(self, x, y, button)
        self.poll()

    def poll(self):
        """Run one non-blocking mainloop iteration and refresh the widget."""
        lib.pa_mainloop_iterate(self.loop, 0, ffi.NULL)
        self.update()

    def update(self):
        """
        same method as in Volume widgets except that here we don't need to
        manually re-schedule update
        """
        vol = self.get_volume()
        if vol != self.volume:
            self.volume = vol
            # Update the underlying canvas size before actually attempting
            # to figure out how big it is and draw it.
            self._update_drawer()
            self.bar.draw()

    def get_volume(self):
        """Return the volume in percent of base volume, or -1.

        -1 is returned when no default sink is known, when the sink is
        muted, or when its base volume is zero. The loudest channel decides
        the reported value.
        """
        if self.default_sink:
            if self.default_sink['muted']:
                return -1
            base = self.default_sink['base_volume']
            if not base:
                return -1
            current = max(self.default_sink['values'])
            return int(current * 100 / base)
        return -1

    def timer_setup(self):
        """Poll now and re-schedule itself every ``update_interval``."""
        self.poll()
        if self.update_interval is not None:
            self.timeout_add(self.update_interval, self.timer_setup)
        if self.theme_path:
            self.setup_images()