Completed
Push — master ( 858ae1...f1c0c2 )
by Stephan
39s
created

KeyBinder.handle_keypress()   A

Complexity

Conditions 2

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
dl 0
loc 17
rs 9.4285
c 1
b 0
f 0
1
"""Xlib-based global hotkey-binding code"""
2
3
__author__ = "Stephan Sokolow (deitarion/SSokolow)"
4
__license__ = "GNU GPL 2.0 or later"
5
6
import logging
7
8
import gobject, gtk
9
from Xlib import X
10
from Xlib.display import Display
11
from Xlib.error import BadAccess, DisplayConnectionError
12
from Xlib.protocol.event import KeyPress as XKeyPress
13
14
from .util import powerset, XInitError
15
16
# Allow MyPy to work without depending on the `typing` package
17
# (And silence complaints from only using the imported types in comments)
18
try:
19
    # pylint: disable=unused-import
20
    from typing import (Any, Callable, Dict, Iterable, Iterator, List,  # NOQA
21
                        Optional, Sequence, Sized, Tuple)
22
23
    from Xlib.error import XError          # NOQA
24
    from .commands import CommandRegistry  # NOQA
25
    from .wm import WindowManager          # NOQA
26
    from .util import CommandCB            # NOQA
27
except:  # pylint: disable=bare-except
28
    pass
29
30
class KeyBinder(object):
31
    """A convenience class for wrapping C{XGrabKey}."""
32
33
    #: @todo: Figure out how to set the modifier mask in X11 and use
34
    #:        C{gtk.accelerator_get_default_mod_mask()} to feed said code.
35
    ignored_modifiers = ['Mod2Mask', 'LockMask']
36
37
    #: Used to pass state from L{cb_xerror}
38
    keybind_failed = False
39
40
    def __init__(self, xdisplay=None):  # type: (Optional[Display]) -> None
41
        """Connect to X11 and the Glib event loop.
42
43
        @param xdisplay: A C{python-xlib} display handle.
44
        @type xdisplay: C{Xlib.display.Display}
45
        """
46
        try:
47
            self.xdisp = xdisplay or Display()
48
        except (UnicodeDecodeError, DisplayConnectionError), err:
49
            raise XInitError("python-xlib failed with %s when asked to open"
50
                             " a connection to the X server. Cannot bind keys."
51
                             "\n\tIt's unclear why this happens, but it is"
52
                             " usually fixed by deleting your ~/.Xauthority"
53
                             " file and rebooting."
54
                             % err.__class__.__name__)
55
56
        self.xroot = self.xdisp.screen().root
57
        self._keys = {}  # type: Dict[Tuple[int, int], Callable]
58
59
        # Resolve these at runtime to avoid NameErrors
60
        self._ignored_modifiers = [getattr(X, name) for name in
61
                                   self.ignored_modifiers]  # type: List[int]
62
63
        # We want to receive KeyPress events
64
        self.xroot.change_attributes(event_mask=X.KeyPressMask)
65
66
        # Set up a handler to catch XGrabKey() failures
67
        self.xdisp.set_error_handler(self.cb_xerror)
68
69
        # Merge python-xlib into the Glib event loop
70
        # Source: http://www.pygtk.org/pygtk2tutorial/sec-MonitoringIO.html
71
        gobject.io_add_watch(self.xroot.display,
72
                             gobject.IO_IN, self.cb_xevent)
73
74
    def bind(self, accel, callback):  # type: (str, Callable[[], None]) -> bool
75
        """Bind a global key combination to a callback.
76
77
        @param accel: An accelerator as either a string to be parsed by
78
            C{gtk.accelerator_parse()} or a tuple as returned by it.)
79
        @param callback: The function to call when the key is pressed.
80
81
        @type accel: C{str} or C{(int, gtk.gdk.ModifierType)} or C{(int, int)}
82
        @type callback: C{function}
83
84
        @returns: A boolean indicating whether the provided keybinding was
85
            parsed successfully. (But not whether it was registered
86
            successfully due to the asynchronous nature of the C{XGrabKey}
87
            request.)
88
        @rtype: C{bool}
89
        """
90
        keycode, modmask = self.parse_accel(accel)
91
        if keycode is None or modmask is None:
92
            return False
93
94
        # Ignore modifiers like Mod2 (NumLock) and Lock (CapsLock)
95
        self._keys[(keycode, 0)] = callback  # Null modifiers seem to be a risk
96
        for mmask in self._vary_modmask(modmask, self._ignored_modifiers):
97
            self._keys[(keycode, mmask)] = callback
98
            self.xroot.grab_key(keycode, mmask,
99
                                1, X.GrabModeAsync, X.GrabModeAsync)
100
101
        # If we don't do this, then nothing works.
102
        # I assume it flushes the XGrabKey calls to the server.
103
        self.xdisp.sync()
104
105
        # React to any cb_xerror that might have resulted from xdisp.sync()
106
        if self.keybind_failed:
107
            self.keybind_failed = False
108
            logging.warning("Failed to bind key. It may already be in use: %s",
109
                            accel)
110
            return False
111
112
        return True
113
114
    def cb_xerror(self, err, _):  # type: (XError, Any) -> None
115
        """Used to identify when attempts to bind keys fail.
116
        @note: If you can make python-xlib's C{CatchError} actually work or if
117
               you can retrieve more information to show, feel free.
118
        """
119
        if isinstance(err, BadAccess):
120
            self.keybind_failed = True
121
        else:
122
            self.xdisp.display.default_error_handler(err)
123
124
    def cb_xevent(self, src, cond, handle=None):  # pylint: disable=W0613
125
        # type: (Any, Any, Optional[Display]) -> bool
126
        """Callback to dispatch X events to more specific handlers.
127
128
        @rtype: C{True}
129
130
        @todo: Make sure uncaught exceptions are prevented from making
131
            quicktile unresponsive in the general case.
132
        """
133
        handle = handle or self.xroot.display
134
135
        for _ in range(0, handle.pending_events()):
136
            xevent = handle.next_event()
137
            if xevent.type == X.KeyPress:
138
                self.handle_keypress(xevent)
139
140
        # Necessary for proper function
141
        return True
142
143
    def handle_keypress(self, xevent):  # type: (XKeyPress) -> None
144
        """Dispatch C{XKeyPress} events to their callbacks."""
145
        keysig = (xevent.detail, xevent.state)
146
        if keysig not in self._keys:
147
            logging.error("Received an event for an unrecognized keybind: "
148
                          "%s, %s", xevent.detail, xevent.state)
149
            return
150
151
        # Display a meaningful debug message
152
        # FIXME: Only call this code if --debug
153
        # FIXME: Proper "index" arg for keycode_to_keysym
154
        keysym = self.xdisp.keycode_to_keysym(keysig[0], 0)
155
        kbstr = gtk.accelerator_name(keysym, keysig[1]) # pylint: disable=E1101
156
        logging.debug("Received keybind: %s", kbstr)
157
158
        # Call the associated callback
159
        self._keys[keysig]()
160
161
    def parse_accel(self, accel  # type: str
162
                    ):  # type: (...) -> Tuple[Optional[int], Optional[int]]
163
        """Convert an accelerator string into the form XGrabKey needs."""
164
165
        keysym, modmask = gtk.accelerator_parse(accel)
166
        if not gtk.accelerator_valid(keysym, modmask):  # pylint: disable=E1101
167
            logging.error("Invalid keybinding: %s", accel)
168
            return None, None
169
170
        if modmask > 2**16 - 1:
171
            logging.error("Modifier out of range for XGrabKey "
172
                          "(int(modmask) > 65535). "
173
                          "Did you use <Super> instead of <Mod4>?")
174
            return None, None
175
176
        # Convert to what XGrabKey expects
177
        keycode = self.xdisp.keysym_to_keycode(keysym)
178
        if isinstance(modmask, gtk.gdk.ModifierType):
179
            modmask = modmask.real
180
181
        return keycode, modmask
182
183
    @staticmethod
184
    def _vary_modmask(modmask, ignored):
185
        # type: (int, Sequence[int]) -> Iterator[int]
186
        """Generate all possible variations on C{modmask} that need to be
187
        taken into consideration if we can't properly ignore the modifiers in
188
        C{ignored}. (Typically NumLock and CapsLock)
189
190
        @param modmask: A bitfield to be combinatorically grown.
191
        @param ignored: Modifiers to be combined with C{modmask}.
192
193
        @type modmask: C{int} or C{gtk.gdk.ModifierType}
194
        @type ignored: C{list(int)}
195
196
        @rtype: generator of C{type(modmask)}
197
        """
198
199
        for ignored in powerset(ignored):
200
            imask = reduce(lambda x, y: x | y, ignored, 0)
201
            yield modmask | imask
202
203
def init(modmask,   # type: Optional[str]
204
         mappings,  # type: Dict[str, CommandCB]
205
         commands,  # type: CommandRegistry
206
         winman     # type: WindowManager
207
         ):         # type: (...) -> Optional[KeyBinder]
208
    """Initialize the keybinder and bind the requested mappings"""
209
    # Allow modmask to be empty for keybinds which don't share a common prefix
210
    if not modmask or modmask.lower() == 'none':
211
        modmask = ''
212
213
    try:
214
        keybinder = KeyBinder()
215
    except XInitError as err:
216
        logging.error("%s", err)
217
        return None
218
    else:
219
        # TODO: Take a mapping dict with pre-modmasked keys
220
        #       and pre-closured commands
221
        for key, func in mappings.items():
222
            def call(func=func):
223
                """Closure to resolve `func` and call it on a
224
                   `WindowManager` instance"""
225
                commands.call(func, winman)
226
227
            keybinder.bind(modmask + key, call)
228
    return keybinder
229