Completed
Push — master ( 320cb7...016099 )
by Björn
67:09 queued 42:08
created

AsyncSession.run()   A

Complexity

Conditions 1

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 12
ccs 6
cts 6
cp 1
crap 1
rs 9.4285
c 0
b 0
f 0
1
"""Asynchronous msgpack-rpc handling in the event loop pipeline."""
2 5
import logging
3 5
from traceback import format_exc
4
5
6 5
# Module-level logger, plus shorthand aliases for its debug/info/warning
# methods so call sites can write `debug(...)` / `warn(...)` directly.
logger = logging.getLogger(__name__)
debug, info, warn = (logger.debug, logger.info, logger.warning,)
8
9
10 5
class AsyncSession(object):

    """Asynchronous msgpack-rpc layer that wraps a msgpack stream.

    This wraps the msgpack stream interface for reading/writing msgpack
    documents and exposes an interface for sending and receiving msgpack-rpc
    requests and notifications.
    """

    def __init__(self, msgpack_stream):
        """Wrap `msgpack_stream` on a msgpack-rpc interface."""
        self._msgpack_stream = msgpack_stream
        self._next_request_id = 1
        # Maps the id of each request sent to the callback awaiting its
        # response.
        self._pending_requests = {}
        self._request_cb = self._notification_cb = None
        # Dispatch table keyed by the message type found in msg[0].
        self._handlers = {
            0: self._on_request,
            1: self._on_response,
            2: self._on_notification
        }

    def threadsafe_call(self, fn):
        """Wrapper around `MsgpackStream.threadsafe_call`."""
        self._msgpack_stream.threadsafe_call(fn)

    def request(self, method, args, response_cb):
        """Send a msgpack-rpc request to Nvim.

        A msgpack-rpc with method `method` and argument `args` is sent to
        Nvim. The `response_cb` function is called with the error and the
        result, in that order, when the response is available.
        """
        request_id = self._next_request_id
        self._next_request_id = request_id + 1
        self._msgpack_stream.send([0, request_id, method, args])
        # Registered only after a successful send, so a failing send leaves
        # no orphaned callback behind.
        self._pending_requests[request_id] = response_cb

    def notify(self, method, args):
        """Send a msgpack-rpc notification to Nvim.

        A msgpack-rpc with method `method` and argument `args` is sent to
        Nvim. This will have the same effect as a request, but no response
        will be received.
        """
        self._msgpack_stream.send([2, method, args])

    def run(self, request_cb, notification_cb):
        """Run the event loop to receive requests and notifications from Nvim.

        While the event loop is running, `request_cb` and `notification_cb`
        will be called whenever requests or notifications are respectively
        available. Both callbacks are cleared again when the loop ends,
        whether it returns normally or raises.
        """
        self._request_cb = request_cb
        self._notification_cb = notification_cb
        try:
            self._msgpack_stream.run(self._on_message)
        finally:
            # Drop the callbacks even when the stream's run() raises, so a
            # failed loop does not leave stale callbacks installed.
            self._request_cb = None
            self._notification_cb = None

    def stop(self):
        """Stop the event loop."""
        self._msgpack_stream.stop()

    def close(self):
        """Close the event loop."""
        self._msgpack_stream.close()

    def _on_message(self, msg):
        """Dispatch `msg` to the handler registered for its type (msg[0]).

        Unknown types go to `_on_invalid_message`.
        """
        try:
            self._handlers.get(msg[0], self._on_invalid_message)(msg)
        except Exception:
            # NOTE(review): static analysis flags this broad catch. It is
            # kept as-is: whatever the handler raises is logged and sent
            # back as an error response (id 0) instead of propagating into
            # the stream that invoked this callback.
            err_str = format_exc(5)
            warn(err_str)
            self._msgpack_stream.send([1, 0, err_str, None])

    def _on_request(self, msg):
        """Pass an incoming request to `request_cb` with a `Response`."""
        # request
        #   - msg[1]: id
        #   - msg[2]: method name
        #   - msg[3]: arguments
        debug('received request: %s, %s', msg[2], msg[3])
        self._request_cb(msg[2], msg[3], Response(self._msgpack_stream,
                                                  msg[1]))

    def _on_response(self, msg):
        """Call, and forget, the callback registered for this response id."""
        # response to a previous request:
        #   - msg[1]: the id
        #   - msg[2]: error(if any)
        #   - msg[3]: result(if not errored)
        debug('received response: %s, %s', msg[2], msg[3])
        self._pending_requests.pop(msg[1])(msg[2], msg[3])

    def _on_notification(self, msg):
        """Pass an incoming notification to `notification_cb`."""
        # notification/event
        #   - msg[1]: event name
        #   - msg[2]: arguments
        debug('received notification: %s, %s', msg[1], msg[2])
        self._notification_cb(msg[1], msg[2])

    def _on_invalid_message(self, msg):
        """Log a message of unknown type and report it as an error."""
        error = 'Received invalid message %s' % msg
        warn(error)
        self._msgpack_stream.send([1, 0, error, None])
113
114
115
class Response(object):

    """Response to a msgpack-rpc request that came from Nvim.

    When Nvim sends a msgpack-rpc request, an instance of this class is
    created for remembering state required to send a response.
    """

    def __init__(self, msgpack_stream, request_id):
        """Initialize the Response instance.

        `msgpack_stream` is the stream the reply is written to and
        `request_id` is the id echoed back in that reply.
        """
        self._msgpack_stream = msgpack_stream
        self._request_id = request_id

    def send(self, value, error=False):
        """Send the response.

        The reply has the form `[1, request_id, error, result]`. If `error`
        is True, `value` is sent in the error slot; otherwise it is sent in
        the result slot. The unused slot is None.
        """
        if error:
            resp = [1, self._request_id, value, None]
        else:
            resp = [1, self._request_id, None, value]
        debug('sending response to request %d: %s', self._request_id, resp)
        self._msgpack_stream.send(resp)
139