Test Failed
Pull Request — master (#421)
by Daniel
01:18
created

pynvim.msgpack_rpc.session   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 240
Duplicated Lines 0 %

Test Coverage

Coverage 77.14%

Importance

Changes 0
Metric Value
eloc 150
dl 0
loc 240
rs 9.84
c 0
b 0
f 0
ccs 108
cts 140
cp 0.7714
wmc 32

15 Methods

Rating   Name   Duplication   Size   Complexity  
A Session._blocking_request() 0 11 1
A Session._enqueue_notification_and_stop() 0 3 1
A Session.close() 0 3 1
A Session.__init__() 0 9 1
A Session.threadsafe_call() 0 14 2
A Session._on_notification() 0 14 2
A Session._enqueue_notification() 0 2 1
A Session._on_request() 0 21 3
A Session._yielding_request() 0 11 1
A Session._enqueue_request_and_stop() 0 3 1
B Session.run() 0 40 6
A Session._enqueue_request() 0 2 1
A Session.stop() 0 3 1
A Session.next_message() 0 14 4
B Session.request() 0 39 6
1
"""Synchronous msgpack-rpc session layer."""
2 6
import logging
3 6
import threading
4 6
from collections import deque
5 6
from traceback import format_exc
6
7 6
import greenlet
8
9 6
from ..compat import check_async
10
11 6
# Module-level logger plus short aliases for the logging calls used below.
logger = logging.getLogger(__name__)
error, debug, info, warn = (
    logger.error,
    logger.debug,
    logger.info,
    logger.warning,
)
14
15
16 6
class Session(object):

    """Msgpack-rpc session layer that uses coroutines for a synchronous API.

    This class provides the public msgpack-rpc API required by this library.
    It uses the greenlet module to handle requests and notifications coming
    from Nvim with a synchronous API.
    """

    def __init__(self, async_session):
        """Wrap `async_session` on a synchronous msgpack-rpc interface."""
        self._async_session = async_session
        # Handlers are only installed while `run()` is executing.
        self._request_cb = self._notification_cb = None
        # Requests/notifications received while no handler could process
        # them; consumed by `next_message()` and at the start of `run()`.
        self._pending_messages = deque()
        self._is_running = False
        self._setup_exception = None
        self.loop = async_session.loop
        self._loop_thread = None

    @staticmethod
    def error_wrapper(err):
        """Turn an error payload into something `request()` can raise.

        This is only a fallback: `request()` calls `self.error_wrapper`,
        so assigning `session.error_wrapper = fn` on an instance (or
        overriding it in a subclass) replaces this default.

        Exceptions are returned unchanged; any other payload is wrapped in
        a plain `Exception` carrying the payload as its argument.
        """
        if isinstance(err, BaseException):
            return err
        return Exception(err)

    def threadsafe_call(self, fn, *args, **kwargs):
        """Wrapper around `AsyncSession.threadsafe_call`.

        `fn(*args, **kwargs)` is executed inside a new greenlet; any
        `Exception` it raises is logged as a warning instead of propagating.
        """
        def handler():
            try:
                fn(*args, **kwargs)
            except Exception:
                warn("error caught while executing async callback\n%s\n",
                     format_exc())

        def greenlet_wrapper():
            gr = greenlet.greenlet(handler)
            gr.switch()

        self._async_session.threadsafe_call(greenlet_wrapper)

    def next_message(self):
        """Block until a message(request or notification) is available.

        If any messages were previously enqueued, return the first in queue.
        If not, run the event loop until one is received.

        Returns the queued message tuple, or None when the loop stopped
        without enqueueing anything. Raises `Exception` if the event loop is
        already running.
        """
        if self._is_running:
            raise Exception('Event loop already running')
        if self._pending_messages:
            return self._pending_messages.popleft()
        self._async_session.run(self._enqueue_request_and_stop,
                                self._enqueue_notification_and_stop)
        if self._pending_messages:
            return self._pending_messages.popleft()
        return None

    def request(self, method, *args, **kwargs):
        """Send a msgpack-rpc request and block until a response is received.

        If the event loop is running, this method must have been called by a
        request or notification handler running on a greenlet. In that case,
        send the request and yield to the parent greenlet until a response is
        available.

        When the event loop is not running, it will perform a blocking request
        like this:
        - Send the request
        - Run the loop until the response is available
        - Put requests/notifications received while waiting into a queue

        If the `async_` flag is present and True, an asynchronous notification
        is sent instead. This will never block, and the return value or error
        is ignored.

        Raises `ValueError` for unsupported keyword arguments, `IOError` when
        no response was obtained (EOF), and the result of
        `self.error_wrapper(err)` when the response carries an error.
        """
        async_ = check_async(kwargs.pop('async_', None), kwargs, False)
        if async_:
            self._async_session.notify(method, args)
            return

        if kwargs:
            raise ValueError("request got unsupported keyword argument(s): {}"
                             .format(', '.join(kwargs)))

        if self._is_running:
            v = self._yielding_request(method, args)
        else:
            v = self._blocking_request(method, args)
        if not v:
            # EOF
            raise IOError('EOF')
        err, rv = v
        if err:
            info("Received error: %s", err)
            raise self.error_wrapper(err)
        return rv

    def run(self, request_cb, notification_cb, setup_cb=None):
        """Run the event loop to receive requests and notifications from Nvim.

        Like `AsyncSession.run()`, but `request_cb` and `notification_cb` are
        inside greenlets.

        If `setup_cb` is given it is invoked in its own greenlet before the
        loop starts; an exception raised by it is logged and re-raised.
        Session state (`_is_running`, callbacks, loop thread) is always reset
        on exit, including when an exception propagates, so the session stays
        usable afterwards.
        """
        self._request_cb = request_cb
        self._notification_cb = notification_cb
        self._is_running = True
        self._setup_exception = None
        self._loop_thread = threading.current_thread()

        def on_setup():
            try:
                setup_cb()
            except Exception as e:
                self._setup_exception = e
                self.stop()

        try:
            if setup_cb:
                # Create a new greenlet to handle the setup function
                gr = greenlet.greenlet(on_setup)
                gr.switch()

            if self._setup_exception:
                error('Setup error: %s', self._setup_exception)
                raise self._setup_exception

            # Process all pending requests and notifications
            while self._pending_messages:
                msg = self._pending_messages.popleft()
                getattr(self, '_on_{}'.format(msg[0]))(*msg[1:])
            self._async_session.run(self._on_request, self._on_notification)
        finally:
            # Without this, a failure above would leave `_is_running` set and
            # make every later `next_message()` call raise.
            self._is_running = False
            self._request_cb = None
            self._notification_cb = None
            self._loop_thread = None

        # The setup greenlet may have recorded its failure only after the
        # loop started running, so check again once the loop has exited.
        if self._setup_exception:
            raise self._setup_exception

    def stop(self):
        """Stop the event loop."""
        self._async_session.stop()

    def close(self):
        """Close the event loop."""
        self._async_session.close()

    def _yielding_request(self, method, args):
        """Send a request and yield to the parent greenlet until answered.

        Returns whatever the current greenlet is switched back with; the
        response callback switches back with `(err, rv)`.
        """
        gr = greenlet.getcurrent()
        parent = gr.parent

        def response_cb(err, rv):
            debug('response is available for greenlet %s, switching back', gr)
            gr.switch(err, rv)

        self._async_session.request(method, args, response_cb)
        debug('yielding from greenlet %s to wait for response', gr)
        return parent.switch()

    def _blocking_request(self, method, args):
        """Send a request and run the event loop until the response arrives.

        Requests and notifications received meanwhile are queued. Returns
        `[err, rv]`, or an empty list if the loop ended without a response.
        """
        result = []

        def response_cb(err, rv):
            result.extend([err, rv])
            self.stop()

        self._async_session.request(method, args, response_cb)
        self._async_session.run(self._enqueue_request,
                                self._enqueue_notification)
        return result

    def _enqueue_request_and_stop(self, name, args, response):
        """Queue an incoming request, then stop the event loop."""
        self._enqueue_request(name, args, response)
        self.stop()

    def _enqueue_notification_and_stop(self, name, args):
        """Queue an incoming notification, then stop the event loop."""
        self._enqueue_notification(name, args)
        self.stop()

    def _enqueue_request(self, name, args, response):
        """Append a `('request', name, args, response)` tuple to the queue."""
        self._pending_messages.append(('request', name, args, response,))

    def _enqueue_notification(self, name, args):
        """Append a `('notification', name, args)` tuple to the queue."""
        self._pending_messages.append(('notification', name, args,))

    def _on_request(self, name, args, response):
        """Handle an incoming request inside a new greenlet.

        The return value of the request callback is sent as the response.
        An `ErrorResponse` sends its first argument verbatim as the error;
        any other `Exception` sends its repr plus a short traceback.
        """
        def handler():
            try:
                rv = self._request_cb(name, args)
                debug('greenlet %s finished executing, '
                      'sending %s as response', gr, rv)
                response.send(rv)
            except ErrorResponse as err:
                warn("error response from request '%s %s': %s", name,
                     args, format_exc())
                response.send(err.args[0], error=True)
            except Exception as err:
                warn("error caught while processing request '%s %s': %s", name,
                     args, format_exc())
                response.send(repr(err) + "\n" + format_exc(5), error=True)
            debug('greenlet %s is now dying...', gr)

        # Create a new greenlet to handle the request
        gr = greenlet.greenlet(handler)
        debug('received rpc request, greenlet %s will handle it', gr)
        gr.switch()

    def _on_notification(self, name, args):
        """Handle an incoming notification inside a new greenlet.

        Any `Exception` raised by the notification callback is logged as a
        warning and otherwise ignored.
        """
        def handler():
            try:
                self._notification_cb(name, args)
                debug('greenlet %s finished executing', gr)
            except Exception:
                warn("error caught while processing notification '%s %s': %s",
                     name, args, format_exc())

            debug('greenlet %s is now dying...', gr)

        gr = greenlet.greenlet(handler)
        debug('received rpc notification, greenlet %s will handle it', gr)
        gr.switch()
228
229
230 6
class ErrorResponse(BaseException):

    """Raise this in a request handler to respond with a given error message.

    Unlike when other exceptions are caught, this gives full control of the
    error response sent. When "ErrorResponse(msg)" is caught "msg" will be
    sent verbatim as the error response. No traceback will be appended.
    """

    # Derives from BaseException, so `except Exception` clauses do not
    # catch it.
240