Test Failed
Pull Request — master (#421)
by Daniel
01:18
created

pynvim.plugin.host.Host._wrap_delayed_function()   A

Complexity

Conditions 4

Size

Total Lines 19
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 16.0213

Importance

Changes 0
Metric Value
cc 4
eloc 12
nop 8
dl 0
loc 19
rs 9.8
c 0
b 0
f 0
ccs 1
cts 11
cp 0.0909
crap 16.0213

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Implements a Nvim host for python plugins."""
2 6
import imp
3 6
import inspect
4 6
import logging
5 6
import os
6 6
import os.path
7 6
import re
8 6
from functools import partial
9 6
from traceback import format_exc
10
11 6
from . import script_host
12 6
from ..api import decode_if_bytes, walk
13 6
from ..compat import IS_PYTHON3, find_module
14 6
from ..msgpack_rpc import ErrorResponse
15 6
from ..util import format_exc_skip, get_client_info
16
17 6
# Public names exported by ``from <module> import *``.  The trailing comma
# is required: ``('Host')`` is just the string 'Host', which a star-import
# would iterate character by character.
__all__ = ('Host',)

# Module logger plus short aliases for its level methods.
logger = logging.getLogger(__name__)
error, debug, info, warn = (logger.error, logger.debug, logger.info,
                            logger.warning,)

# Method table passed to get_client_info() when the host registers itself
# with nvim (see Host._load).
host_method_spec = {"poll": {}, "specs": {"nargs": 1}, "shutdown": {}}
24
25
26 6
class Host(object):
27
28
    """Nvim host for python plugins.
29
30
    Takes care of loading/unloading plugins and routing msgpack-rpc
31
    requests/notifications to the appropriate handlers.
32
    """
33
34 6
    def __init__(self, nvim):
        """Create a host for *nvim* with only the built-in handlers installed.

        Handlers exported by plugins are added to the same tables when the
        plugins are discovered.
        """
        # Decode per default for Python3
        self._decode_default = IS_PYTHON3
        self.nvim = nvim

        # Per-plugin bookkeeping, all keyed by plugin path.
        self._specs, self._loaded, self._load_errors = {}, {}, {}

        # Built-in notification handlers.
        builtin_notifications = {
            'nvim_error_event': self._on_error_event
        }
        # Built-in request handlers.
        builtin_requests = {
            'poll': lambda: 'ok',
            'specs': self._on_specs_request,
            'shutdown': self.shutdown
        }
        self._notification_handlers = builtin_notifications
        self._request_handlers = builtin_requests
51
52 6
    def _on_async_err(self, msg):
53
        # uncaught python exception
54
        self.nvim.err_write(msg, async_=True)
55
56 6
    def _on_error_event(self, kind, msg):
        """Report an error nvim raised in response to an async request.

        This is the handler for the 'nvim_error_event' notification, e.g.
        after ``nvim.command(..., async_=True)`` failed.

        :param kind: error kind sent by nvim (not used in the message).
        :param msg: error text; decoded with ``decode_if_bytes``.
        """
        # Nothing in this class sets ``name``; fall back to the class name so
        # that reporting an error cannot itself fail with AttributeError.
        # NOTE(review): a subclass or caller may assign ``name`` - it is
        # still honoured when present.
        host_name = getattr(self, 'name', type(self).__name__)
        errmsg = "{}: Async request caused an error:\n{}\n".format(
            host_name, decode_if_bytes(msg))
        self.nvim.err_write(errmsg, async_=True)
62
63 6
    def start(self, plugins):
64
        """Start listening for msgpack-rpc requests and notifications."""
65
        self.nvim.run_loop(self._on_request,
66
                           self._on_notification,
67
                           lambda: self._load(plugins),
68
                           err_cb=self._on_async_err)
69
70 6
    def shutdown(self):
71
        """Shutdown the host."""
72
        self._unload()
73
        self.nvim.stop_loop()
74
75 6
    def _wrap_delayed_function(self, cls, delayed_handlers, name, sync,
76
                               module_handlers, path, *args):
77
        # delete the delayed handlers to be sure
78
        for handler in delayed_handlers:
79
            method_name = handler._nvim_registered_name
80
            if handler._nvim_rpc_sync:
81
                del self._request_handlers[method_name]
82
            else:
83
                del self._notification_handlers[method_name]
84
        # create an instance of the plugin and pass the nvim object
85
        plugin = cls(self._configure_nvim_for(cls))
86
87
        # discover handlers in the plugin instance
88
        self._discover_functions(plugin, module_handlers, path, False)
89
90
        if sync:
91
            self._request_handlers[name](*args)
92
        else:
93
            self._notification_handlers[name](*args)
94
95 6
    def _wrap_function(self, fn, sync, decode, nvim_bind, name, *args):
96
        if decode:
97
            args = walk(decode_if_bytes, args, decode)
98
        if nvim_bind is not None:
99
            args.insert(0, nvim_bind)
100
        try:
101
            return fn(*args)
102
        except Exception:
103
            if sync:
104
                msg = ("error caught in request handler '{} {}':\n{}"
105
                       .format(name, args, format_exc_skip(1)))
106
                raise ErrorResponse(msg)
107
            else:
108
                msg = ("error caught in async handler '{} {}'\n{}\n"
109
                       .format(name, args, format_exc_skip(1)))
110
                self._on_async_err(msg + "\n")
111
112 6
    def _on_request(self, name, args):
        """Dispatch a msgpack-rpc request to its handler and return the result.

        Logs and raises ErrorResponse when no handler is registered for
        *name*.
        """
        if IS_PYTHON3:
            name = decode_if_bytes(name)

        handler = self._request_handlers.get(name, None)
        if handler:
            debug('calling request handler for "%s", args: "%s"', name, args)
            result = handler(*args)
            debug("request handler for '%s %s' returns: %s",
                  name, args, result)
            return result

        msg = self._missing_handler_error(name, 'request')
        error(msg)
        raise ErrorResponse(msg)
126
127 6
    def _on_notification(self, name, args):
        """Dispatch a msgpack-rpc notification to its handler.

        When no handler is registered for *name* the problem is logged and
        reported through ``_on_async_err``; nothing is raised.
        """
        if IS_PYTHON3:
            name = decode_if_bytes(name)

        handler = self._notification_handlers.get(name, None)
        if handler:
            debug('calling notification handler for "%s", args: "%s"',
                  name, args)
            handler(*args)
            return

        msg = self._missing_handler_error(name, 'notification')
        error(msg)
        self._on_async_err(msg + "\n")
140
141 6
    def _missing_handler_error(self, name, kind):
142
        msg = 'no {} handler registered for "{}"'.format(kind, name)
143
        pathmatch = re.match(r'(.+):[^:]+:[^:]+', name)
144
        if pathmatch:
145
            loader_error = self._load_errors.get(pathmatch.group(1))
146
            if loader_error is not None:
147
                msg = msg + "\n" + loader_error
148
        return msg
149
150 6
    def _load(self, plugins):
        """Import the given plugins and register the handlers they export.

        For every path in *plugins*: skip it when already loaded, import the
        module ("script_host.py" maps to the bundled ``script_host`` module),
        discover its handlers and remember them in ``self._loaded``.  Any
        exception raised while doing so is logged and stored in
        ``self._load_errors`` under the plugin path.  Finally the host
        announces itself to nvim through ``set_client_info``.

        :param plugins: sequence of plugin file paths.
        """
        has_script = False
        for path in plugins:
            if path in self._loaded:
                error('{} is already loaded'.format(path))
                continue
            try:
                if path == "script_host.py":
                    module = script_host
                    has_script = True
                else:
                    directory, name = os.path.split(os.path.splitext(path)[0])
                    fileobj, pathname, descr = find_module(name, [directory])
                    try:
                        module = imp.load_module(name, fileobj, pathname,
                                                 descr)
                    finally:
                        # load_module() leaves closing the file to its
                        # caller, even when loading fails.
                        if fileobj is not None:
                            fileobj.close()
                handlers = []
                self._discover_classes(module, handlers, path)
                self._discover_functions(module, handlers, path, False)
                if not handlers:
                    error('{} exports no handlers'.format(path))
                    continue
                self._loaded[path] = {'handlers': handlers, 'module': module}
            except Exception as e:
                err = ('Encountered {} loading plugin at {}: {}\n{}'
                       .format(type(e).__name__, path, e, format_exc(5)))
                error(err)
                self._load_errors[path] = err

        # A host that was asked to load only the script host is reported as
        # "script-host"; everything else is an "rplugin-host".
        kind = ("script-host" if len(plugins) == 1 and has_script
                else "rplugin-host")
        self.nvim.api.set_client_info(
            *get_client_info(kind, 'host', host_method_spec),
            async_=True)
183
184 6
    def _unload(self):
185
        for path, plugin in self._loaded.items():
186
            handlers = plugin['handlers']
187
            for handler in handlers:
188
                method_name = handler._nvim_registered_name
189
                if hasattr(handler, '_nvim_shutdown_hook'):
190
                    handler()
191
                elif handler._nvim_rpc_sync:
192
                    del self._request_handlers[method_name]
193
                else:
194
                    del self._notification_handlers[method_name]
195
        self._specs = {}
196
        self._loaded = {}
197
198 6
    def _discover_classes(self, module, handlers, plugin_path):
199
        for _, cls in inspect.getmembers(module, inspect.isclass):
200
            if getattr(cls, '_nvim_plugin', False):
201
                # discover handlers in the plugin instance
202
                self._discover_functions(cls, handlers, plugin_path, True)
203
204 6
    def _discover_functions(self, obj, handlers, plugin_path, delay):
205
        def predicate(o):
206
            return hasattr(o, '_nvim_rpc_method_name')
207
208
        cls_handlers = []
209
        specs = []
210
        objdecode = getattr(obj, '_nvim_decode', self._decode_default)
211
        for _, fn in inspect.getmembers(obj, predicate):
212
            method = fn._nvim_rpc_method_name
213
            if fn._nvim_prefix_plugin_path:
214
                method = '{}:{}'.format(plugin_path, method)
215
            sync = fn._nvim_rpc_sync
216
            if delay:
217
                fn_wrapped = partial(self._wrap_delayed_function, obj,
218
                                     cls_handlers, method, sync,
219
                                     handlers, plugin_path)
220
            else:
221
                decode = getattr(fn, '_nvim_decode', objdecode)
222
                nvim_bind = None
223
                if fn._nvim_bind:
224
                    nvim_bind = self._configure_nvim_for(fn)
225
226
                fn_wrapped = partial(self._wrap_function, fn,
227
                                     sync, decode, nvim_bind, method)
228
            self._copy_attributes(fn, fn_wrapped)
229
            fn_wrapped._nvim_registered_name = method
230
            # register in the rpc handler dict
231
            if sync:
232
                if method in self._request_handlers:
233
                    raise Exception(('Request handler for "{}" is '
234
                                    + 'already registered').format(method))
235
                self._request_handlers[method] = fn_wrapped
236
            else:
237
                if method in self._notification_handlers:
238
                    raise Exception(('Notification handler for "{}" is '
239
                                    + 'already registered').format(method))
240
                self._notification_handlers[method] = fn_wrapped
241
            if hasattr(fn, '_nvim_rpc_spec'):
242
                specs.append(fn._nvim_rpc_spec)
243
            handlers.append(fn_wrapped)
244
            cls_handlers.append(fn_wrapped)
245
        if specs:
246
            self._specs[plugin_path] = specs
247
248 6
    def _copy_attributes(self, fn, fn2):
249
        # Copy _nvim_* attributes from the original function
250
        for attr in dir(fn):
251
            if attr.startswith('_nvim_'):
252
                setattr(fn2, attr, getattr(fn, attr))
253
254 6
    def _on_specs_request(self, path):
        """Return the specs recorded for the plugin at *path*, or 0 if none.

        When loading that plugin failed, the recorded error is first written
        to nvim's output.
        """
        if IS_PYTHON3:
            path = decode_if_bytes(path)
        try:
            load_failure = self._load_errors[path]
        except KeyError:
            pass
        else:
            self.nvim.out_write(load_failure + '\n')
        return self._specs.get(path, 0)
260
261 6
    def _configure_nvim_for(self, obj):
262
        # Configure a nvim instance for obj (checks encoding configuration)
263
        nvim = self.nvim
264
        decode = getattr(obj, '_nvim_decode', self._decode_default)
265
        if decode:
266
            nvim = nvim.with_decode(decode)
267
        return nvim
268