Test Failed
Pull Request — master (#421)
by Daniel
01:18
created

pynvim.plugin.host   D

Complexity

Total Complexity 58

Size/Duplication

Total Lines 268
Duplicated Lines 0 %

Test Coverage

Coverage 25.14%

Importance

Changes 0
Metric Value
eloc 206
dl 0
loc 268
rs 4.5599
c 0
b 0
f 0
ccs 46
cts 183
cp 0.2514
wmc 58

17 Methods

Rating   Name   Duplication   Size   Complexity  
A Host._copy_attributes() 0 5 3
A Host.shutdown() 0 4 1
A Host._unload() 0 13 5
A Host.start() 0 6 2
A Host._configure_nvim_for() 0 7 2
A Host._wrap_function() 0 16 5
A Host._on_notification() 0 13 3
A Host._on_specs_request() 0 6 3
C Host._discover_functions() 0 43 10
A Host._missing_handler_error() 0 8 3
A Host._on_error_event() 0 6 1
A Host.__init__() 0 17 2
A Host._on_async_err() 0 3 1
B Host._load() 0 33 7
A Host._wrap_delayed_function() 0 19 4
A Host._on_request() 0 14 3
A Host._discover_classes() 0 5 3

How to fix   Complexity   

Complexity

Complex classes like pynvim.plugin.host often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Implements a Nvim host for python plugins."""
2 6
import imp
3 6
import inspect
4 6
import logging
5 6
import os
6 6
import os.path
7 6
import re
8 6
from functools import partial
9 6
from traceback import format_exc
10
11 6
from . import script_host
12 6
from ..api import decode_if_bytes, walk
13 6
from ..compat import IS_PYTHON3, find_module
14 6
from ..msgpack_rpc import ErrorResponse
15 6
from ..util import format_exc_skip, get_client_info
16
17 6
# Public API of this module. The trailing comma is required: ``('Host')`` is
# just the string 'Host', which a star-import would iterate character by
# character.
__all__ = ('Host',)

logger = logging.getLogger(__name__)
# Short aliases for the module logger's methods.
error, debug, info, warn = (logger.error, logger.debug, logger.info,
                            logger.warning,)

# Method table passed to ``get_client_info`` when the host announces itself
# to nvim (see ``Host._load``).
host_method_spec = {"poll": {}, "specs": {"nargs": 1}, "shutdown": {}}
24
25
26 6
class Host(object):

    """Nvim host for python plugins.

    Takes care of loading/unloading plugins and routing msgpack-rpc
    requests/notifications to the appropriate handlers.
    """

    def __init__(self, nvim):
        """Create a host bound to the given nvim session object.

        Sets up empty plugin bookkeeping and registers the built-in
        handlers: the ``nvim_error_event`` notification and the ``poll``,
        ``specs`` and ``shutdown`` requests.
        """
        self.nvim = nvim
        # plugin path -> list of rpc specs collected from its handlers
        self._specs = {}
        # plugin path -> {'handlers': [...], 'module': module}
        self._loaded = {}
        # plugin path -> formatted error raised while loading that plugin
        self._load_errors = {}
        self._notification_handlers = {
            'nvim_error_event': self._on_error_event
        }
        self._request_handlers = {
            'poll': lambda: 'ok',
            'specs': self._on_specs_request,
            'shutdown': self.shutdown
        }

        # Decode per default for Python3
        self._decode_default = IS_PYTHON3

    def _on_async_err(self, msg):
        """Write ``msg`` (an uncaught python exception) to nvim's error output.

        The write is issued with ``async_=True``.
        """
        self.nvim.err_write(msg, async_=True)

    def _on_error_event(self, kind, msg):
        """Handle the ``nvim_error_event`` notification.

        This is an error from nvim due to an async request, like
        ``nvim.command(..., async_=True)``. ``kind`` is accepted but not
        used; ``msg`` is decoded if it is bytes and written to nvim's error
        output.
        """
        # Nothing in this class assigns ``self.name``, so reading it directly
        # raised AttributeError. Fall back to the class name while still
        # honouring a ``name`` provided by a subclass or set by the caller.
        name = getattr(self, 'name', type(self).__name__)
        errmsg = "{}: Async request caused an error:\n{}\n".format(
            name, decode_if_bytes(msg))
        self.nvim.err_write(errmsg, async_=True)

    def start(self, plugins):
        """Start listening for msgpack-rpc requests and notifications.

        ``plugins`` is the collection of plugin paths; it is handed to
        ``_load`` from the callback passed as third argument to
        ``run_loop``.
        """
        self.nvim.run_loop(self._on_request,
                           self._on_notification,
                           lambda: self._load(plugins),
                           err_cb=self._on_async_err)

    def shutdown(self):
        """Shutdown the host: unload all plugins, then stop the loop."""
        self._unload()
        self.nvim.stop_loop()

    def _wrap_delayed_function(self, cls, delayed_handlers, name, sync,
                               module_handlers, path, *args):
        """Instantiate plugin class ``cls`` on first use and forward the call.

        The placeholder handlers registered for the class
        (``delayed_handlers``) are removed, an instance is created, its
        bound handlers are discovered and registered in their place, and the
        call that triggered all this is dispatched to the real handler
        registered under ``name``.

        Returns whatever the real handler returns.
        """
        # delete the delayed handlers to be sure
        for handler in delayed_handlers:
            method_name = handler._nvim_registered_name
            if handler._nvim_rpc_sync:
                del self._request_handlers[method_name]
            else:
                del self._notification_handlers[method_name]
        # create an instance of the plugin and pass the nvim object
        plugin = cls(self._configure_nvim_for(cls))

        # discover handlers in the plugin instance
        self._discover_functions(plugin, module_handlers, path, False)

        # Propagate the handler's result; without the ``return`` the very
        # first sync request to a lazily created plugin answered ``None``.
        if sync:
            return self._request_handlers[name](*args)
        return self._notification_handlers[name](*args)

    def _wrap_function(self, fn, sync, decode, nvim_bind, name, *args):
        """Call ``fn`` with ``args``, converting failures per handler kind.

        When ``decode`` is truthy the arguments are first passed through
        ``walk(decode_if_bytes, ...)``. When ``nvim_bind`` is not None it is
        prepended to the arguments.

        For sync handlers an exception is re-raised as ``ErrorResponse``;
        for async handlers it is reported through ``_on_async_err`` and
        ``None`` is returned.
        """
        if decode:
            args = walk(decode_if_bytes, args, decode)
        if nvim_bind is not None:
            # ``*args`` arrives as a tuple, which has no ``insert``; build a
            # new list so binding works whether or not decoding replaced it.
            args = [nvim_bind] + list(args)
        try:
            return fn(*args)
        except Exception:
            if sync:
                msg = ("error caught in request handler '{} {}':\n{}"
                       .format(name, args, format_exc_skip(1)))
                raise ErrorResponse(msg)
            else:
                msg = ("error caught in async handler '{} {}'\n{}\n"
                       .format(name, args, format_exc_skip(1)))
                self._on_async_err(msg + "\n")

    def _on_request(self, name, args):
        """Handle a msgpack-rpc request.

        Looks up the handler registered under ``name`` and returns its
        result. Raises ``ErrorResponse`` when no handler is registered.
        """
        if IS_PYTHON3:
            name = decode_if_bytes(name)
        handler = self._request_handlers.get(name, None)
        if not handler:
            msg = self._missing_handler_error(name, 'request')
            error(msg)
            raise ErrorResponse(msg)

        debug('calling request handler for "%s", args: "%s"', name, args)
        rv = handler(*args)
        debug("request handler for '%s %s' returns: %s", name, args, rv)
        return rv

    def _on_notification(self, name, args):
        """Handle a msgpack-rpc notification.

        A missing handler is logged and reported through ``_on_async_err``
        instead of raising.
        """
        if IS_PYTHON3:
            name = decode_if_bytes(name)
        handler = self._notification_handlers.get(name, None)
        if not handler:
            msg = self._missing_handler_error(name, 'notification')
            error(msg)
            self._on_async_err(msg + "\n")
            return

        debug('calling notification handler for "%s", args: "%s"', name, args)
        handler(*args)

    def _missing_handler_error(self, name, kind):
        """Build the error message for an unregistered handler.

        If ``name`` looks like ``<path>:<x>:<y>`` and loading ``<path>``
        failed earlier, the recorded load error is appended so the user sees
        why the handler is missing.
        """
        msg = 'no {} handler registered for "{}"'.format(kind, name)
        pathmatch = re.match(r'(.+):[^:]+:[^:]+', name)
        if pathmatch:
            loader_error = self._load_errors.get(pathmatch.group(1))
            if loader_error is not None:
                msg = msg + "\n" + loader_error
        return msg

    def _load(self, plugins):
        """Import each plugin in ``plugins`` and register its handlers.

        Failures are logged and remembered in ``_load_errors`` keyed by
        plugin path; they do not abort loading of the remaining plugins.
        Afterwards the client info is sent to nvim, with kind
        ``script-host`` when the only plugin is ``script_host.py`` and
        ``rplugin-host`` otherwise.
        """
        has_script = False
        for path in plugins:
            if path in self._loaded:
                error('{} is already loaded'.format(path))
                continue
            try:
                if path == "script_host.py":
                    module = script_host
                    has_script = True
                else:
                    directory, name = os.path.split(os.path.splitext(path)[0])
                    fobj, pathname, descr = find_module(name, [directory])
                    try:
                        module = imp.load_module(name, fobj, pathname, descr)
                    finally:
                        # The caller owns the file handed back by
                        # find_module; it may be None (e.g. for packages).
                        if fobj is not None:
                            fobj.close()
                handlers = []
                self._discover_classes(module, handlers, path)
                self._discover_functions(module, handlers, path, False)
                if not handlers:
                    error('{} exports no handlers'.format(path))
                    continue
                self._loaded[path] = {'handlers': handlers, 'module': module}
            except Exception as e:
                err = ('Encountered {} loading plugin at {}: {}\n{}'
                       .format(type(e).__name__, path, e, format_exc(5)))
                error(err)
                self._load_errors[path] = err

        kind = ("script-host" if len(plugins) == 1 and has_script
                else "rplugin-host")
        self.nvim.api.set_client_info(
            *get_client_info(kind, 'host', host_method_spec),
            async_=True)

    def _unload(self):
        """Unregister the handlers of every loaded plugin and reset state.

        Handlers carrying ``_nvim_shutdown_hook`` are called instead of
        being unregistered.
        """
        for plugin in self._loaded.values():
            for handler in plugin['handlers']:
                method_name = handler._nvim_registered_name
                if hasattr(handler, '_nvim_shutdown_hook'):
                    handler()
                elif handler._nvim_rpc_sync:
                    # After lazy instantiation the list holds both the
                    # delayed and the real wrapper under the same name, so
                    # the entry may already be gone.
                    self._request_handlers.pop(method_name, None)
                else:
                    self._notification_handlers.pop(method_name, None)
        self._specs = {}
        self._loaded = {}

    def _discover_classes(self, module, handlers, plugin_path):
        """Register delayed handlers for every plugin class in ``module``.

        A class counts as a plugin when its ``_nvim_plugin`` attribute is
        truthy.
        """
        for _, cls in inspect.getmembers(module, inspect.isclass):
            if getattr(cls, '_nvim_plugin', False):
                # discover handlers on the class; instantiation is delayed
                self._discover_functions(cls, handlers, plugin_path, True)

    def _discover_functions(self, obj, handlers, plugin_path, delay):
        """Wrap and register every rpc handler found on ``obj``.

        A member is a handler when it has ``_nvim_rpc_method_name``. With
        ``delay`` true (``obj`` is a plugin class) the registered wrapper
        instantiates the class on first call; otherwise the wrapper calls
        the function directly. Each wrapper is appended to ``handlers`` and
        any ``_nvim_rpc_spec`` found is stored under ``plugin_path`` in
        ``_specs``.

        Raises ``Exception`` when a handler name is already registered.
        """
        def predicate(o):
            return hasattr(o, '_nvim_rpc_method_name')

        # Wrappers created in this call; the delayed wrappers share this
        # list so the first one invoked can remove all of them.
        cls_handlers = []
        specs = []
        objdecode = getattr(obj, '_nvim_decode', self._decode_default)
        for _, fn in inspect.getmembers(obj, predicate):
            method = fn._nvim_rpc_method_name
            if fn._nvim_prefix_plugin_path:
                method = '{}:{}'.format(plugin_path, method)
            sync = fn._nvim_rpc_sync
            if delay:
                fn_wrapped = partial(self._wrap_delayed_function, obj,
                                     cls_handlers, method, sync,
                                     handlers, plugin_path)
            else:
                decode = getattr(fn, '_nvim_decode', objdecode)
                nvim_bind = None
                if fn._nvim_bind:
                    nvim_bind = self._configure_nvim_for(fn)

                fn_wrapped = partial(self._wrap_function, fn,
                                     sync, decode, nvim_bind, method)
            self._copy_attributes(fn, fn_wrapped)
            fn_wrapped._nvim_registered_name = method
            # register in the rpc handler dict
            self._register_handler(method, fn_wrapped, sync)
            if hasattr(fn, '_nvim_rpc_spec'):
                specs.append(fn._nvim_rpc_spec)
            handlers.append(fn_wrapped)
            cls_handlers.append(fn_wrapped)
        if specs:
            self._specs[plugin_path] = specs

    def _register_handler(self, method, handler, sync):
        """Store ``handler`` under ``method``, refusing duplicate names.

        Sync handlers go to the request table, the others to the
        notification table.
        """
        if sync:
            registry, label = self._request_handlers, 'Request'
        else:
            registry, label = self._notification_handlers, 'Notification'
        if method in registry:
            raise Exception('{} handler for "{}" is already registered'
                            .format(label, method))
        registry[method] = handler

    def _copy_attributes(self, fn, fn2):
        """Copy every ``_nvim_*`` attribute listed by ``dir(fn)`` to ``fn2``."""
        for attr in dir(fn):
            if attr.startswith('_nvim_'):
                setattr(fn2, attr, getattr(fn, attr))

    def _on_specs_request(self, path):
        """Handle the ``specs`` request for the plugin at ``path``.

        If loading that plugin failed, the recorded error is written to
        nvim's output first. Returns the specs collected for ``path``, or
        ``0`` when there are none.
        """
        if IS_PYTHON3:
            path = decode_if_bytes(path)
        if path in self._load_errors:
            self.nvim.out_write(self._load_errors[path] + '\n')
        return self._specs.get(path, 0)

    def _configure_nvim_for(self, obj):
        """Return the nvim object to hand to ``obj``.

        Uses ``obj._nvim_decode`` (falling back to the host default); when
        it is truthy the result of ``nvim.with_decode(decode)`` is returned,
        otherwise the host's own nvim object.
        """
        nvim = self.nvim
        decode = getattr(obj, '_nvim_decode', self._decode_default)
        if decode:
            nvim = nvim.with_decode(decode)
        return nvim
268