# Source code for libqtile.command

# Copyright (c) 2008, Aldo Cortesi. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import abc
import inspect
import traceback
import os
import six
import sys

from . import ipc
from .utils import get_cache_dir
from .log_utils import logger


class CommandError(Exception):
    """Error reported by a command.

    The server turns this exception into an ERROR reply, and the client-side
    call() implementations raise it again when they receive an ERROR status.
    """


class CommandException(Exception):
    """Raised by the client-side call() implementations when a reply carries
    a status other than SUCCESS or ERROR.
    """


class _SelectError(Exception):
    """Signals that one (name, selector) step of a path could not be resolved.

    The failing item class and selector are stored on the instance as
    ``name`` and ``sel``; no message is passed to the base exception.
    """
    def __init__(self, name, sel):
        super(_SelectError, self).__init__()
        self.sel = sel
        self.name = name


# Status codes: the first element of the (status, value) tuple produced by
# the command server and interpreted by the client-side call() methods.
SUCCESS, ERROR, EXCEPTION = 0, 1, 2

# Template for the socket file name; "%s" is filled with the display name.
SOCKBASE = "qtilesocket.%s"


def formatSelector(lst):
    """
        Render a list of (name, sel) tuples as a dotted selector
        expression, e.g. ``group['a'].window[3]``.

        An item whose ``sel`` is None contributes only its name; any other
        selector is appended as ``[repr(sel)]``.
    """
    pieces = []
    for name, sel in lst:
        suffix = "" if sel is None else "[%s]" % repr(sel)
        pieces.append(name + suffix)
    return ".".join(pieces)


class _Server(ipc.Server):
    """IPC server that resolves incoming command requests against qtile.

    Each request is handled by `call`, which is registered with the base
    `ipc.Server` as the handler.
    """
    def __init__(self, fname, qtile, conf, eventloop):
        """
            :fname Path of the socket file; an existing file there is removed
            :qtile Object whose ``select()`` resolves selector paths
            :conf Configuration whose ``screens`` are scanned for named widgets
            :eventloop Passed through to ``ipc.Server``
        """
        if os.path.exists(fname):
            os.unlink(fname)
        ipc.Server.__init__(self, fname, self.call, eventloop)
        self.qtile = qtile
        # Index every named widget found in the gaps of the configured screens.
        self.widgets = {}
        for screen in conf.screens:
            for gap in screen.gaps:
                if not hasattr(gap, "widgets"):
                    continue
                for widget in gap.widgets:
                    if widget.name:
                        self.widgets[widget.name] = widget

    def call(self, data):
        """Execute one command request.

        `data` is a ``(selectors, name, args, kwargs)`` tuple. Returns a
        ``(status, value)`` tuple where status is SUCCESS, ERROR or EXCEPTION.
        """
        selectors, name, args, kwargs = data
        try:
            obj = self.qtile.select(selectors)
        except _SelectError as err:
            missing = formatSelector([(err.name, err.sel)])
            path = formatSelector(selectors)
            return (ERROR, "No object %s in path '%s'" % (missing, path))
        cmd = obj.command(name)
        if not cmd:
            return (ERROR, "No such command.")
        logger.info("Command: %s(%s, %s)", name, args, kwargs)
        try:
            result = cmd(*args, **kwargs)
        except CommandError as err:
            # A CommandError raised without arguments has empty ``args``;
            # indexing it here would raise IndexError out of this handler
            # instead of producing an ERROR reply.
            return (ERROR, err.args[0] if err.args else "")
        except Exception:
            return (EXCEPTION, traceback.format_exc())
        return (SUCCESS, result)


class _Command(object):
    """Callable handle for one named command at a fixed selector path."""
    def __init__(self, call, selectors, name):
        """
            :call Callable invoked as ``call(selectors, name, *args, **kwargs)``
            :selectors The selector path handed to ``call`` on every invocation
            :name A string command name
        """
        self.call = call
        self.name = name
        self.selectors = selectors

    def __call__(self, *args, **kwargs):
        """Forward the arguments to ``call``, prefixed by the path and name."""
        dispatch = self.call
        return dispatch(self.selectors, self.name, *args, **kwargs)


class _CommandTree(six.with_metaclass(abc.ABCMeta)):
    """A node in the hierarchy of objects that expose commands

    Nodes nest: attribute access with a name listed in `_contains` yields a
    child node, any other attribute yields a callable command, and indexing
    yields the same kind of node bound to a selector.
    """
    def __init__(self, selectors, myselector, parent):
        self.parent = parent
        self.myselector = myselector
        self.selectors = selectors

    @property
    def path(self):
        """Formatted selector expression for this node."""
        own = [(self.name, self.myselector)] if self.name else []
        return formatSelector(self.selectors + own)

    @property
    @abc.abstractmethod
    def name(self):
        """Item class name of this node (falsy for a node with no name)."""

    @property
    @abc.abstractmethod
    def _contains(self):
        """Item class names that are reachable as child nodes."""

    def call(self, selectors, name, *args, **kwargs):
        """Delegate the call to the parent node; fail if there is none."""
        if not self.parent:
            raise NotImplementedError()
        return self.parent.call(selectors, name, *args, **kwargs)

    def __getitem__(self, select):
        # A node that already carries a (truthy) selector cannot take another.
        if self.myselector:
            raise KeyError("No such key: %s" % select)
        node_class = self.__class__
        return node_class(self.selectors, select, self)

    def __getattr__(self, name):
        # Only reached for attributes not found normally: resolve to either
        # a child node or a command bound to this node's full path.
        own = [(self.name, self.myselector)] if self.name else []
        nextSelector = self.selectors + own
        if name not in self._contains:
            return _Command(self.call, nextSelector, name)
        return _TreeMap[name](nextSelector, None, self)


class _TLayout(_CommandTree):
    """Command tree node for the "layout" item class."""
    name = "layout"
    _contains = ["group", "window", "screen"]


class _TWidget(_CommandTree):
    """Command tree node for the "widget" item class."""
    name = "widget"
    _contains = ["bar", "screen", "group"]


class _TBar(_CommandTree):
    """Command tree node for the "bar" item class."""
    name = "bar"
    _contains = ["screen"]


class _TWindow(_CommandTree):
    """Command tree node for the "window" item class."""
    name = "window"
    _contains = ["group", "screen", "layout"]


class _TScreen(_CommandTree):
    """Command tree node for the "screen" item class."""
    name = "screen"
    _contains = ["layout", "window", "bar"]


class _TGroup(_CommandTree):
    """Command tree node for the "group" item class."""
    name = "group"
    _contains = ["layout", "window", "screen"]


# Maps an item class name to the node class instantiated when that name is
# accessed as an attribute of a tree node (see _CommandTree.__getattr__).
_TreeMap = {
    "layout": _TLayout,
    "widget": _TWidget,
    "bar": _TBar,
    "window": _TWindow,
    "screen": _TScreen,
    "group": _TGroup,
}


class _CommandRoot(six.with_metaclass(abc.ABCMeta, _CommandTree)):
    """Root node of the command tree

    The root has no name, no selector and no parent; concrete subclasses
    decide what happens to an issued command by implementing `call`.
    """
    _contains = ["layout", "widget", "screen", "bar", "window", "group"]
    name = None

    def __init__(self):
        super(_CommandRoot, self).__init__([], None, None)

    def __getitem__(self, select):
        # The root cannot be indexed with a selector.
        raise KeyError("No such key: %s" % select)

    @abc.abstractmethod
    def call(self, selectors, name, *args, **kwargs):
        """Handle an issued command.

        Parameters
        ==========
        selectors :
            A list of (name, selector) tuples.
        name :
            Command name.
        """


def find_sockfile(display=None):
    """
        Return the path of the socket file for a display.

        The display falls back to the DISPLAY environment variable and then
        to ':0.0'; '.0' is appended when the name contains no '.'. The file
        lives in the directory returned by get_cache_dir().
    """
    if not display:
        display = os.environ.get('DISPLAY') or ':0.0'
    if '.' not in display:
        display = display + '.0'
    return os.path.join(get_cache_dir(), SOCKBASE % display)


class Client(_CommandRoot):
    """Command tree root that talks to a running instance of Qtile over IPC"""
    def __init__(self, fname=None, is_json=False):
        """
            :fname Socket file path; located with find_sockfile() when falsy
            :is_json Passed through to ``ipc.Client``
        """
        socket_path = fname if fname else find_sockfile()
        self.client = ipc.Client(socket_path, is_json)
        _CommandRoot.__init__(self)

    def call(self, selectors, name, *args, **kwargs):
        """Send the command through the IPC client and unwrap the reply.

        Returns the reply value on SUCCESS; raises CommandError on ERROR and
        CommandException for any other status.
        """
        state, val = self.client.call((selectors, name, args, kwargs))
        if state == SUCCESS:
            return val
        if state == ERROR:
            raise CommandError(val)
        raise CommandException(val)
class CommandRoot(_CommandRoot):
    """Command tree root that dispatches through a qtile object's server"""
    def __init__(self, qtile):
        self.qtile = qtile
        super(CommandRoot, self).__init__()

    def call(self, selectors, name, *args, **kwargs):
        """Run the command via ``self.qtile.server`` and unwrap the reply.

        Returns the reply value on SUCCESS; raises CommandError on ERROR and
        CommandException for any other status.
        """
        state, val = self.qtile.server.call((selectors, name, args, kwargs))
        if state == SUCCESS:
            return val
        elif state == ERROR:
            raise CommandError(val)
        else:
            raise CommandException(val)


class _Call(object):
    """A recorded command invocation with optional run conditions

    Parameters
    ==========
    selectors :
        A list of (name, selector) tuples.
    name :
        Command name.
    args :
        Arguments to be passed to the specified command
    kwargs :
        Arguments to be passed to the specified command
    """
    def __init__(self, selectors, name, *args, **kwargs):
        self.selectors = selectors
        self.name = name
        self.args = args
        self.kwargs = kwargs
        # Conditionals
        self.layout = None
        # check() reads this whenever the current window is floating, so it
        # must exist even when when() is never called. True matches the
        # default of when().
        self.when_floating = True

    def when(self, layout=None, when_floating=True):
        """Set the conditions tested by check() and return self for chaining."""
        self.layout = layout
        self.when_floating = when_floating
        return self

    def check(self, q):
        """Return True if the recorded conditions hold for `q`.

        With ``layout == 'floating'`` the result is whether the current
        window is floating. Otherwise the current layout name must match
        `layout` (when set), and a floating current window is rejected
        when `when_floating` is false.
        """
        if self.layout:
            if self.layout == 'floating':
                # No current window means nothing is floating.
                return bool(q.currentWindow and q.currentWindow.floating)
            if q.currentLayout.name != self.layout:
                return False
        if q.currentWindow and q.currentWindow.floating \
                and not self.when_floating:
            return False
        return True


class _LazyTree(_CommandRoot):
    """Command tree root whose commands are recorded instead of executed"""
    def call(self, selectors, name, *args, **kwargs):
        return _Call(selectors, name, *args, **kwargs)


lazy = _LazyTree()


class CommandObject(six.with_metaclass(abc.ABCMeta)):
    """Base class for objects that expose commands

    Each command should be a method named `cmd_X`, where X is the command
    name. A CommandObject should also implement `._items()` and `._select()`
    methods (c.f. docstring for `.items()` and `.select()`).
    """

    def select(self, selectors):
        """Return a selected object

        Recursively finds an object specified by a list of `(name, selector)`
        items.

        Raises _SelectError if the object does not exist.
        """
        if not selectors:
            return self
        name, selector = selectors[0]
        next_selector = selectors[1:]

        root, items = self.items(name)
        # if non-root object and no selector given
        # if no items in container, but selector is given
        # if selector is not in the list of contained items
        if (root is False and selector is None) or \
                (items is None and selector is not None) or \
                (items is not None and selector and selector not in items):
            raise _SelectError(name, selector)

        obj = self._select(name, selector)
        if obj is None:
            raise _SelectError(name, selector)
        return obj.select(next_selector)

    def items(self, name):
        """Build a list of contained items for the given item class

        Returns a tuple `(root, items)` for the specified item class, where:

            root: True if this class accepts a "naked" specification without
            an item selection (e.g. "layout" defaults to current layout), and
            False if it does not (e.g. no default "widget").

            items: a list of contained items
        """
        ret = self._items(name)
        if ret is None:
            # Not finding information for a particular item class is OK here;
            # we don't expect layouts to have a window, etc.
            return False, []
        return ret

    @abc.abstractmethod
    def _items(self, name):
        """Generate the items for a given item class

        Same return as `.items()`. Return `None` if name is not a valid item
        class.
        """
        pass

    @abc.abstractmethod
    def _select(self, name, sel):
        """Select the given item of the given item class

        This method is called with the following guarantees:
            - `name` is a valid selector class for this item
            - `sel` is a valid selector for this item
            - the `(name, sel)` tuple is not an "impossible" combination (e.g. a
              selector is specified when `name` is not a containment object).

        Return None if no such object exists
        """
        pass

    def command(self, name):
        """Return the `cmd_<name>` attribute of this object, or None."""
        return getattr(self, "cmd_" + name, None)

    @property
    def commands(self):
        """Names of all commands, i.e. `cmd_*` attributes minus the prefix."""
        cmds = [i[4:] for i in dir(self) if i.startswith("cmd_")]
        return cmds

    def cmd_commands(self):
        """Returns a list of possible commands for this object

        Used by __qsh__ for command completion and online help
        """
        return self.commands

    def cmd_items(self, name):
        """Returns a list of contained items for the specified name

        Used by __qsh__ to allow navigation of the object graph.
        """
        return self.items(name)

    def docSig(self, name):
        """Return the command name followed by its call signature."""
        # inspect.signature introduced in Python 3.3
        if sys.version_info < (3, 3):
            args, varargs, varkw, defaults = inspect.getargspec(self.command(name))
            if args and args[0] == "self":
                args = args[1:]
            return name + inspect.formatargspec(args, varargs, varkw, defaults)

        sig = inspect.signature(self.command(name))
        # Signature.replace() needs Parameter objects, not parameter names,
        # so work on the values of the mapping rather than its keys.
        params = list(sig.parameters.values())
        if params and params[0].name == "self":
            sig = sig.replace(parameters=params[1:])
        return name + str(sig)

    def docText(self, name):
        """Return the docstring of the named command, or an empty string."""
        return inspect.getdoc(self.command(name)) or ""

    def doc(self, name):
        """Return the signature and docstring of the named command."""
        spec = self.docSig(name)
        htext = self.docText(name)
        return spec + '\n' + htext

    def cmd_doc(self, name):
        """Returns the documentation for a specified command name

        Used by __qsh__ to provide online help.
        """
        if name in self.commands:
            return self.doc(name)
        else:
            raise CommandError("No such command: %s" % name)

    def cmd_eval(self, code):
        """Evaluates code in the same context as this function

        Return value is tuple `(success, result)`, success being a boolean and
        result being a string representing the return value of eval, or None
        if exec was used instead.
        """
        # SECURITY: this runs arbitrary code supplied by the caller through
        # eval/exec. NOTE(review): confirm that everything able to reach this
        # command is trusted.
        try:
            try:
                return (True, str(eval(code)))
            except SyntaxError:
                # Not an expression; run it as statements instead.
                exec(code)
                return (True, None)
        except:  # noqa: E722
            # Report only the last line of the traceback to the caller.
            error = traceback.format_exc().strip().split("\n")[-1]
            return (False, error)

    def cmd_function(self, function, *args, **kwargs):
        """Call a function with current object as argument"""
        try:
            function(self, *args, **kwargs)
        except Exception:
            # Best effort: the failure is logged, not propagated.
            error = traceback.format_exc()
            logger.error('Exception calling "%s":\n%s', function, error)