AsyncSession.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 2
dl 0
loc 12
rs 9.9
c 0
b 0
f 0
ccs 7
cts 7
cp 1
crap 1
1
"""Asynchronous msgpack-rpc handling in the event loop pipeline."""
2 6
import logging
3 6
from traceback import format_exc
4
5
6 6
logger = logging.getLogger(__name__)
7 6
debug, info, warn = (logger.debug, logger.info, logger.warning,)
8
9
10 6
class AsyncSession(object):
11
12
    """Asynchronous msgpack-rpc layer that wraps a msgpack stream.
13
14
    This wraps the msgpack stream interface for reading/writing msgpack
15
    documents and exposes an interface for sending and receiving msgpack-rpc
16
    requests and notifications.
17
    """
18
19 6
    def __init__(self, msgpack_stream):
20
        """Wrap `msgpack_stream` on a msgpack-rpc interface."""
21 6
        self._msgpack_stream = msgpack_stream
22 6
        self._next_request_id = 1
23 6
        self._pending_requests = {}
24 6
        self._request_cb = self._notification_cb = None
25 6
        self._handlers = {
26
            0: self._on_request,
27
            1: self._on_response,
28
            2: self._on_notification
29
        }
30 6
        self.loop = msgpack_stream.loop
31
32 6
    def threadsafe_call(self, fn):
33
        """Wrapper around `MsgpackStream.threadsafe_call`."""
34 6
        self._msgpack_stream.threadsafe_call(fn)
35
36 6
    def request(self, method, args, response_cb):
37
        """Send a msgpack-rpc request to Nvim.
38
39
        A msgpack-rpc with method `method` and argument `args` is sent to
40
        Nvim. The `response_cb` function is called with when the response
41
        is available.
42
        """
43 6
        request_id = self._next_request_id
44 6
        self._next_request_id = request_id + 1
45 6
        self._msgpack_stream.send([0, request_id, method, args])
46 6
        self._pending_requests[request_id] = response_cb
47
48 6
    def notify(self, method, args):
49
        """Send a msgpack-rpc notification to Nvim.
50
51
        A msgpack-rpc with method `method` and argument `args` is sent to
52
        Nvim. This will have the same effect as a request, but no response
53
        will be recieved
54
        """
55 6
        self._msgpack_stream.send([2, method, args])
56
57 6
    def run(self, request_cb, notification_cb):
58
        """Run the event loop to receive requests and notifications from Nvim.
59
60
        While the event loop is running, `request_cb` and `notification_cb`
61
        will be called whenever requests or notifications are respectively
62
        available.
63
        """
64 6
        self._request_cb = request_cb
65 6
        self._notification_cb = notification_cb
66 6
        self._msgpack_stream.run(self._on_message)
67 6
        self._request_cb = None
68 6
        self._notification_cb = None
69
70 6
    def stop(self):
71
        """Stop the event loop."""
72 6
        self._msgpack_stream.stop()
73
74 6
    def close(self):
75
        """Close the event loop."""
76
        self._msgpack_stream.close()
77
78 6
    def _on_message(self, msg):
79 6
        try:
80 6
            self._handlers.get(msg[0], self._on_invalid_message)(msg)
81
        except Exception:
82
            err_str = format_exc(5)
83
            warn(err_str)
84
            self._msgpack_stream.send([1, 0, err_str, None])
85
86 6
    def _on_request(self, msg):
87
        # request
88
        #   - msg[1]: id
89
        #   - msg[2]: method name
90
        #   - msg[3]: arguments
91 6
        debug('received request: %s, %s', msg[2], msg[3])
92 6
        self._request_cb(msg[2], msg[3], Response(self._msgpack_stream,
93
                                                  msg[1]))
94
95 6
    def _on_response(self, msg):
96
        # response to a previous request:
97
        #   - msg[1]: the id
98
        #   - msg[2]: error(if any)
99
        #   - msg[3]: result(if not errored)
100 6
        debug('received response: %s, %s', msg[2], msg[3])
101 6
        self._pending_requests.pop(msg[1])(msg[2], msg[3])
102
103 6
    def _on_notification(self, msg):
104
        # notification/event
105
        #   - msg[1]: event name
106
        #   - msg[2]: arguments
107 6
        debug('received notification: %s, %s', msg[1], msg[2])
108 6
        self._notification_cb(msg[1], msg[2])
109
110 6
    def _on_invalid_message(self, msg):
111
        error = 'Received invalid message %s' % msg
112
        warn(error)
113
        self._msgpack_stream.send([1, 0, error, None])
114
115
116 6
class Response(object):
117
118
    """Response to a msgpack-rpc request that came from Nvim.
119
120
    When Nvim sends a msgpack-rpc request, an instance of this class is
121
    created for remembering state required to send a response.
122
    """
123
124 6
    def __init__(self, msgpack_stream, request_id):
125
        """Initialize the Response instance."""
126 6
        self._msgpack_stream = msgpack_stream
127 6
        self._request_id = request_id
128
129 6
    def send(self, value, error=False):
130
        """Send the response.
131
132
        If `error` is True, it will be sent as an error.
133
        """
134 6
        if error:
135
            resp = [1, self._request_id, value, None]
136
        else:
137 6
            resp = [1, self._request_id, None, value]
138 6
        debug('sending response to request %d: %s', self._request_id, resp)
139
        self._msgpack_stream.send(resp)
140