# Copyright (c) 2010-2011 Aldo Cortesi
# Copyright (c) 2010 Philip Kranz
# Copyright (c) 2011 Mounier Florian
# Copyright (c) 2011 Paul Colomiets
# Copyright (c) 2011-2012 roger
# Copyright (c) 2011-2012, 2014 Tycho Andersen
# Copyright (c) 2012 Dustin Lacewell
# Copyright (c) 2012 Laurie Clark-Michalek
# Copyright (c) 2012-2014 Craig Barnes
# Copyright (c) 2013 Tao Sauvage
# Copyright (c) 2014 ramnes
# Copyright (c) 2014 Sean Vig
# Copyright (C) 2015, Juan Riquelme González
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import annotations
import abc
import glob
import os
import pickle
import string
from collections import deque
from libqtile import hook, pangocffi, utils
from libqtile.command.base import CommandObject, SelectError, expose_command
from libqtile.command.client import InteractiveCommandClient
from libqtile.command.interface import CommandError, QtileCommandInterface
from libqtile.log_utils import logger
from libqtile.widget import base
class AbstractCompleter(metaclass=abc.ABCMeta):
@abc.abstractmethod
def __init__(self, qtile: CommandObject) -> None:
pass
@abc.abstractmethod
def actual(self) -> str | None:
pass
@abc.abstractmethod
def reset(self) -> None:
pass
@abc.abstractmethod
def complete(self, txt: str, aliases: dict[str, str] | None = None) -> str:
"""
Perform the requested completion on the given text.
The completer can optionally support aliases, which map strings to commands. The
completer should include the aliases in the completion results.
"""
# pragma: no cover
class NullCompleter(AbstractCompleter):
def __init__(self, qtile) -> None:
self.qtile = qtile
def actual(self) -> str:
return ""
def reset(self) -> None:
pass
def complete(self, txt: str, _aliases: dict[str, str] | None = None) -> str:
return txt
class FileCompleter(AbstractCompleter):
def __init__(self, qtile, _testing=False) -> None:
self._testing = _testing
self.qtile = qtile
self.thisfinal = None # type: str | None
self.lookup = None # type: list[tuple[str, str]] | None
self.reset()
def actual(self) -> str | None:
return self.thisfinal
def reset(self) -> None:
self.lookup = None
def complete(self, txt: str, _aliases: dict[str, str] | None = None) -> str:
"""Returns the next completion for txt, or None if there is no completion"""
if self.lookup is None:
self.lookup = []
if txt == "" or txt[0] not in "~/":
txt = "~/" + txt
path = os.path.expanduser(txt)
if os.path.isdir(path):
files = glob.glob(os.path.join(path, "*"))
prefix = txt
else:
files = glob.glob(path + "*")
prefix = os.path.dirname(txt)
prefix = prefix.rstrip("/") or "/"
for f in files:
display = os.path.join(prefix, os.path.basename(f))
if os.path.isdir(f):
display += "/"
self.lookup.append((display, f))
self.lookup.sort()
self.offset = -1
self.lookup.append((txt, txt))
self.offset += 1
if self.offset >= len(self.lookup):
self.offset = 0
ret = self.lookup[self.offset]
self.thisfinal = ret[1]
return ret[0]
class QshCompleter(AbstractCompleter):
def __init__(self, qtile: CommandObject) -> None:
q = QtileCommandInterface(qtile)
self.client = InteractiveCommandClient(q)
self.thisfinal = None # type: str | None
self.reset()
def actual(self) -> str | None:
return self.thisfinal
def reset(self) -> None:
self.lookup = None # type: list[tuple[str, str]] | None
self.path = ""
self.offset = -1
def complete(self, txt: str, _aliases: dict[str, str] | None = None) -> str:
txt = txt.lower()
if self.lookup is None:
self.lookup = []
path = txt.split(".")[:-1]
self.path = ".".join(path)
term = txt.split(".")[-1]
if len(self.path) > 0:
self.path += "."
contains_cmd = f"self.client.{self.path}_contains"
try:
contains = eval(contains_cmd)
except AttributeError:
contains = []
for obj in contains:
if obj.lower().startswith(term):
self.lookup.append((obj, obj))
commands_cmd = f"self.client.{self.path}commands()"
try:
commands = eval(commands_cmd)
except (CommandError, AttributeError):
commands = []
for cmd in commands:
if cmd.lower().startswith(term):
self.lookup.append((cmd + "()", cmd + "()"))
self.offset = -1
self.lookup.append((term, term))
self.offset += 1
if self.offset >= len(self.lookup):
self.offset = 0
ret = self.lookup[self.offset]
self.thisfinal = self.path + ret[0]
return self.path + ret[0]
class GroupCompleter(AbstractCompleter):
def __init__(self, qtile: CommandObject) -> None:
self.qtile = qtile
self.thisfinal = None # type: str | None
self.lookup = None # type: list[tuple[str, str]] | None
self.offset = -1
def actual(self) -> str | None:
"""Returns the current actual value"""
return self.thisfinal
def reset(self) -> None:
self.lookup = None
self.offset = -1
def complete(self, txt: str, _aliases: dict[str, str] | None = None) -> str:
"""Returns the next completion for txt, or None if there is no completion"""
txt = txt.lower()
if not self.lookup:
self.lookup = []
for group in self.qtile.groups_map.keys():
if group.lower().startswith(txt):
self.lookup.append((group, group))
self.lookup.sort()
self.offset = -1
self.lookup.append((txt, txt))
self.offset += 1
if self.offset >= len(self.lookup):
self.offset = 0
ret = self.lookup[self.offset]
self.thisfinal = ret[1]
return ret[0]
class WindowCompleter(AbstractCompleter):
def __init__(self, qtile: CommandObject) -> None:
self.qtile = qtile
self.thisfinal = None # type: str | None
self.lookup = None # type: list[tuple[str, str]] | None
self.offset = -1
def actual(self) -> str | None:
"""Returns the current actual value"""
return self.thisfinal
def reset(self) -> None:
self.lookup = None
self.offset = -1
def complete(self, txt: str, _aliases: dict[str, str] | None = None) -> str:
"""Returns the next completion for txt, or None if there is no completion"""
if self.lookup is None:
self.lookup = []
for wid, window in self.qtile.windows_map.items():
if window.group and window.name.lower().startswith(txt):
self.lookup.append((window.name, wid))
self.lookup.sort()
self.offset = -1
self.lookup.append((txt, txt))
self.offset += 1
if self.offset >= len(self.lookup):
self.offset = 0
ret = self.lookup[self.offset]
self.thisfinal = ret[1]
return ret[0]
class CommandCompleter:
"""
Parameters
==========
_testing :
disables reloading of the lookup table to make testing possible.
"""
DEFAULTPATH = "/bin:/usr/bin:/usr/local/bin"
def __init__(self, qtile, _testing=False):
self.lookup = None # type: list[tuple[str, str]] | None
self.offset = -1
self.thisfinal = None # type: str | None
self._testing = _testing
def actual(self) -> str | None:
"""Returns the current actual value"""
return self.thisfinal
def executable(self, fpath: str):
return os.access(fpath, os.X_OK)
def reset(self) -> None:
self.lookup = None
self.offset = -1
def complete(self, txt: str, aliases: dict[str, str] | None = None) -> str:
"""Returns the next completion for txt, or None if there is no completion"""
if self.lookup is None:
# Lookup is a set of (display value, actual value) tuples.
self.lookup = []
if txt and txt[0] in "~/":
path = os.path.expanduser(txt)
if os.path.isdir(path):
files = glob.glob(os.path.join(path, "*"))
prefix = txt
else:
files = glob.glob(path + "*")
prefix = os.path.dirname(txt)
prefix = prefix.rstrip("/") or "/"
for f in files:
if self.executable(f):
display = os.path.join(prefix, os.path.basename(f))
if os.path.isdir(f):
display += "/"
self.lookup.append((display, f))
else:
dirs = os.environ.get("PATH", self.DEFAULTPATH).split(":")
for d in dirs:
try:
d = os.path.expanduser(d)
for cmd in glob.iglob(os.path.join(d, f"{txt}*")):
if self.executable(cmd):
self.lookup.append(
(os.path.basename(cmd), cmd),
)
except OSError:
pass
if aliases:
for alias in aliases:
if alias.startswith(txt):
self.lookup.append((alias, aliases[alias]))
self.lookup.sort()
self.offset = -1
self.lookup.append((txt, txt))
self.offset += 1
if self.offset >= len(self.lookup):
self.offset = 0
ret = self.lookup[self.offset]
self.thisfinal = ret[1]
return ret[0]
[docs]class Prompt(base._TextBox):
"""A widget that prompts for user input
Input should be started using the ``.start_input()`` method on this class.
"""
completers = {
"file": FileCompleter,
"qshell": QshCompleter,
"cmd": CommandCompleter,
"group": GroupCompleter,
"window": WindowCompleter,
None: NullCompleter,
}
defaults = [
("cursor", True, "Show a cursor"),
("cursorblink", 0.5, "Cursor blink rate. 0 to disable."),
("cursor_color", "bef098", "Color for the cursor and text over it."),
("prompt", "{prompt}: ", "Text displayed at the prompt"),
("record_history", True, "Keep a record of executed commands"),
("max_history", 100, "Commands to keep in history. 0 for no limit."),
("ignore_dups_history", False, "Don't store duplicates in history"),
(
"bell_style",
"audible",
"Alert at the begin/end of the command history. "
+ "Possible values: 'audible' (X11 only), 'visual' and None.",
),
("visual_bell_color", "ff0000", "Color for the visual bell (changes prompt background)."),
("visual_bell_time", 0.2, "Visual bell duration (in seconds)."),
]
def __init__(self, **config) -> None:
base._TextBox.__init__(self, "", **config)
self.add_defaults(Prompt.defaults)
self.active = False
self.completer = None # type: AbstractCompleter | None
self.aliases: dict[str, str] | None = None
# If history record is on, get saved history or create history record
if self.record_history:
self.history_path = os.path.join(utils.get_cache_dir(), "prompt_history")
if os.path.exists(self.history_path):
with open(self.history_path, "rb") as f:
try:
self.history = pickle.load(f)
if self.ignore_dups_history:
self._dedup_history()
except: # noqa: E722
# unfortunately, pickle doesn't wrap its errors, so we
# can't detect what's a pickle error and what's not.
logger.exception("failed to load prompt history")
self.history = {
x: deque(maxlen=self.max_history) for x in self.completers
}
# self.history of size does not match.
if len(self.history) != len(self.completers):
self.history = {
x: deque(maxlen=self.max_history) for x in self.completers
}
if self.max_history != self.history[list(self.history)[0]].maxlen:
self.history = {
x: deque(self.history[x], self.max_history) for x in self.completers
}
else:
self.history = {x: deque(maxlen=self.max_history) for x in self.completers}
def _configure(self, qtile, bar) -> None:
self.markup = True
base._TextBox._configure(self, qtile, bar)
def f(win):
if self.active and not win == self.bar.window:
self._unfocus()
hook.subscribe.client_focus(f)
# Define key handlers (action to do when a specific key is hit)
keyhandlers = {
"Tab": self._trigger_complete,
"BackSpace": self._delete_char(),
"Delete": self._delete_char(False),
"KP_Delete": self._delete_char(False),
"Escape": self._unfocus,
"Return": self._send_cmd,
"KP_Enter": self._send_cmd,
"Up": self._get_prev_cmd,
"KP_Up": self._get_prev_cmd,
"Down": self._get_next_cmd,
"KP_Down": self._get_next_cmd,
"Left": self._move_cursor(),
"KP_Left": self._move_cursor(),
"Right": self._move_cursor("right"),
"KP_Right": self._move_cursor("right"),
}
self.keyhandlers = {qtile.core.keysym_from_name(k): v for k, v in keyhandlers.items()}
printables = {x: self._write_char for x in range(127) if chr(x) in string.printable}
self.keyhandlers.update(printables)
self.tab = qtile.core.keysym_from_name("Tab")
self.bell_style: str
if self.bell_style == "audible" and qtile.core.name != "x11":
self.bell_style = "visual"
logger.warning("Prompt widget only supports audible bell under X11")
if self.bell_style == "visual":
self.original_background = self.background
def start_input(
self,
prompt,
callback,
complete=None,
strict_completer=False,
allow_empty_input=False,
aliases: dict[str, str] | None = None,
) -> None:
"""Run the prompt
Displays a prompt and starts to take one line of keyboard input from
the user. When done, calls the callback with the input string as
argument. If history record is enabled, also allows to browse between
previous commands with ↑ and ↓, and execute them (untouched or
modified). When history is exhausted, fires an alert. It tries to
mimic, in some way, the shell behavior.
Parameters
==========
complete :
Tab-completion. Can be None, "cmd", "file", "group", "qshell" or
"window".
prompt :
text displayed at the prompt, e.g. "spawn: "
callback :
function to call with returned value.
complete :
completer to use.
strict_completer :
When True the return value wil be the exact completer result where
available.
allow_empty_input :
When True, an empty value will still call the callback function
aliases :
Dictionary mapping aliases to commands. If the entered command is a key in
this dict, the command it maps to will be executed instead.
"""
if self.cursor and self.cursorblink and not self.active:
self.timeout_add(self.cursorblink, self._blink)
self.display = self.prompt.format(prompt=prompt)
self.display = pangocffi.markup_escape_text(self.display)
self.active = True
self.user_input = ""
self.archived_input = ""
self.show_cursor = self.cursor
self.cursor_position = 0
self.callback = callback
self.aliases = aliases
self.completer = self.completers[complete](self.qtile)
self.strict_completer = strict_completer
self.allow_empty_input = allow_empty_input
self._update()
self.bar.widget_grab_keyboard(self)
if self.record_history:
self.completer_history = self.history[complete]
self.position = len(self.completer_history)
def calculate_length(self) -> int:
if self.text:
width = min(self.layout.width, self.bar.width) + self.actual_padding * 2
return width
else:
return 0
def _blink(self) -> None:
self.show_cursor = not self.show_cursor
self._update()
if self.active:
self.timeout_add(self.cursorblink, self._blink)
def _highlight_text(self, text) -> str:
color = utils.hex(self.cursor_color)
text = f'<span foreground="{color}">{text}</span>'
if self.show_cursor:
text = f"<u>{text}</u>"
return text
def _update(self) -> None:
if self.active:
self.text = self.archived_input or self.user_input
cursor = pangocffi.markup_escape_text(" ")
if self.cursor_position < len(self.text):
txt1 = self.text[: self.cursor_position]
txt2 = self.text[self.cursor_position]
txt3 = self.text[self.cursor_position + 1 :]
for text in (txt1, txt2, txt3):
text = pangocffi.markup_escape_text(text)
txt2 = self._highlight_text(txt2)
self.text = f"{txt1}{txt2}{txt3}{cursor}"
else:
self.text = pangocffi.markup_escape_text(self.text)
self.text += self._highlight_text(cursor)
self.text = self.display + self.text
else:
self.text = ""
self.bar.draw()
def _trigger_complete(self) -> None:
# Trigger the auto completion in user input
assert self.completer is not None
self.user_input = self.completer.complete(self.user_input, self.aliases)
self.cursor_position = len(self.user_input)
def _history_to_input(self) -> None:
# Move actual command (when exploring history) to user input and update
# history position (right after the end)
if self.archived_input:
self.user_input = self.archived_input
self.archived_input = ""
self.position = len(self.completer_history)
def _insert_before_cursor(self, charcode) -> None:
# Insert a character (given their charcode) in input, before the cursor
txt1 = self.user_input[: self.cursor_position]
txt2 = self.user_input[self.cursor_position :]
self.user_input = txt1 + chr(charcode) + txt2
self.cursor_position += 1
def _delete_char(self, backspace=True):
# Return a function that deletes character from the input text.
# If backspace is True, function will emulate backspace, else Delete.
def f():
self._history_to_input()
step = -1 if backspace else 0
if not backspace and self.cursor_position == len(self.user_input):
self._alert()
elif len(self.user_input) > 0 and self.cursor_position + step > -1:
txt1 = self.user_input[: self.cursor_position + step]
txt2 = self.user_input[self.cursor_position + step + 1 :]
self.user_input = txt1 + txt2
if step:
self.cursor_position += step
else:
self._alert()
return f
def _write_char(self):
# Add pressed (legal) char key to user input.
# No LookupString in XCB... oh, the shame! Unicode users beware!
self._history_to_input()
self._insert_before_cursor(self.key)
def _unfocus(self):
# Remove focus from the widget
self.active = False
self._update()
self.bar.widget_ungrab_keyboard()
def _send_cmd(self):
# Send the prompted text for execution
self._unfocus()
if self.strict_completer:
self.user_input = self.actual_value or self.user_input
del self.actual_value
self._history_to_input()
if self.user_input or self.allow_empty_input:
# If history record is activated, also save command in history
if self.record_history:
# ensure no dups in history
if self.ignore_dups_history and (self.user_input in self.completer_history):
self.completer_history.remove(self.user_input)
self.position -= 1
self.completer_history.append(self.user_input)
if self.position < self.max_history:
self.position += 1
os.makedirs(os.path.dirname(self.history_path), exist_ok=True)
with open(self.history_path, mode="wb") as f:
pickle.dump(self.history, f, protocol=2)
self.callback(self.user_input)
def _alert(self):
# Fire an alert (audible or visual), if bell style is not None.
if self.bell_style == "audible":
self.qtile.core.conn.conn.core.Bell(0)
elif self.bell_style == "visual":
self.background = self.visual_bell_color
self.timeout_add(self.visual_bell_time, self._stop_visual_alert)
def _stop_visual_alert(self):
self.background = self.original_background
self._update()
def _get_prev_cmd(self):
# Get the previous command in history.
# If there isn't more previous commands, ring system bell
if self.record_history:
if not self.position:
self._alert()
else:
self.position -= 1
self.archived_input = self.completer_history[self.position]
self.cursor_position = len(self.archived_input)
def _get_next_cmd(self):
# Get the next command in history.
# If the last command was already reached, ring system bell.
if self.record_history:
if self.position == len(self.completer_history):
self._alert()
elif self.position < len(self.completer_history):
self.position += 1
if self.position == len(self.completer_history):
self.archived_input = ""
else:
self.archived_input = self.completer_history[self.position]
self.cursor_position = len(self.archived_input)
def _cursor_to_left(self):
# Move cursor to left, if possible
if self.cursor_position:
self.cursor_position -= 1
else:
self._alert()
def _cursor_to_right(self):
# move cursor to right, if possible
command = self.archived_input or self.user_input
if self.cursor_position < len(command):
self.cursor_position += 1
else:
self._alert()
def _move_cursor(self, direction="left"):
# Move the cursor to left or right, according to direction
if direction == "left":
return self._cursor_to_left
elif direction == "right":
return self._cursor_to_right
def _get_keyhandler(self, k):
# Return the action (a function) to do according the pressed key (k).
self.key = k
if k in self.keyhandlers:
if k != self.tab:
self.actual_value = self.completer.actual()
self.completer.reset()
return self.keyhandlers[k]
def process_key_press(self, keysym: int):
"""Key press handler for the minibuffer.
Currently only supports ASCII characters.
"""
handle_key = self._get_keyhandler(keysym)
if handle_key:
handle_key()
del self.key
self._update()
[docs] @expose_command()
def fake_keypress(self, key: str) -> None:
self.process_key_press(self.qtile.core.keysym_from_name(key))
[docs] @expose_command()
def info(self):
"""Returns a dictionary of info for this object"""
return dict(
name=self.name,
width=self.width,
text=self.text,
active=self.active,
)
[docs] @expose_command()
def exec_general(self, prompt, object_name, cmd_name, selector=None, completer=None):
"""
Execute a cmd of any object. For example layout, group, window, widget
, etc with a string that is obtained from start_input.
Parameters
==========
prompt :
Text displayed at the prompt.
object_name :
Name of a object in Qtile. This string has to be 'layout', 'widget',
'bar', 'window' or 'screen'.
cmd_name :
Execution command of selected object using object_name and selector.
selector :
This value select a specific object within a object list that is
obtained by object_name.
If this value is None, current object is selected. e.g. current layout,
current window and current screen.
completer:
Completer to use.
config example:
Key([alt, 'shift'], 'a',
lazy.widget['prompt'].exec_general(
'section(add)',
'layout',
'add_section'))
"""
try:
obj = self.qtile.select([(object_name, selector)])
except SelectError:
logger.warning("cannot select a object")
return
cmd = obj.command(cmd_name)
if not cmd:
logger.warning("command not found")
return
def f(args):
if args:
cmd(args)
self.start_input(prompt, f, completer)
def _dedup_history(self):
"""Filter the history deque, clearing all duplicate values."""
self.history = {x: self._dedup_deque(self.history[x]) for x in self.completers}
def _dedup_deque(self, dq):
return deque(_LastUpdatedOrdereddict.fromkeys(dq))
class _LastUpdatedOrdereddict(dict):
"""Store items in the order the keys were last added."""
def __setitem__(self, key, value):
if key in self:
del self[key]
super().__setitem__(key, value)