Completed
Pull Request — master (#72)
by Björn
18:06 queued 16:50
created

AsyncSession.poll_fd()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 3
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
c 0
b 0
f 0
1
"""Asynchronous msgpack-rpc handling in the event loop pipeline."""
2 6
import logging
3 6
from traceback import format_exc
4
5
6 6
logger = logging.getLogger(__name__)
# Module-level shorthands bound to this module's logger, so the classes
# below can log with plain `debug(...)` / `warn(...)` calls.
debug, info, warn = (logger.debug, logger.info, logger.warning,)
8
9
10 6
class AsyncSession(object):

    """Asynchronous msgpack-rpc layer that wraps a msgpack stream.

    This wraps the msgpack stream interface for reading/writing msgpack
    documents and exposes an interface for sending and receiving msgpack-rpc
    requests and notifications.
    """

    def __init__(self, msgpack_stream):
        """Wrap `msgpack_stream` on a msgpack-rpc interface."""
        self._msgpack_stream = msgpack_stream
        self._next_request_id = 1
        # Maps the id of every request sent so far to the callback that
        # will receive its response.
        self._pending_requests = {}
        self._request_cb = self._notification_cb = None
        # Dispatch table keyed by the message type stored in msg[0].
        self._handlers = {
            0: self._on_request,
            1: self._on_response,
            2: self._on_notification
        }

    def threadsafe_call(self, fn):
        """Wrapper around `MsgpackStream.threadsafe_call`."""
        self._msgpack_stream.threadsafe_call(fn)

    def poll_fd(self, fd, on_readable, on_writable):
        """Wrapper around `BaseEventLoop.poll_fd`."""
        return self._msgpack_stream.poll_fd(fd, on_readable, on_writable)

    def request(self, method, args, response_cb):
        """Send a msgpack-rpc request to Nvim.

        A msgpack-rpc with method `method` and argument `args` is sent to
        Nvim. The `response_cb` function is called when the response is
        available; it receives the error (if any) and the result.
        """
        request_id = self._next_request_id
        self._next_request_id = request_id + 1
        self._msgpack_stream.send([0, request_id, method, args])
        self._pending_requests[request_id] = response_cb

    def notify(self, method, args):
        """Send a msgpack-rpc notification to Nvim.

        A msgpack-rpc with method `method` and argument `args` is sent to
        Nvim. This will have the same effect as a request, but no response
        will be received.
        """
        self._msgpack_stream.send([2, method, args])

    def run(self, request_cb, notification_cb):
        """Run the event loop to receive requests and notifications from Nvim.

        While the event loop is running, `request_cb` and `notification_cb`
        will be called whenever requests or notifications are respectively
        available. Both callbacks are cleared again once the loop returns,
        including when it exits by raising.
        """
        self._request_cb = request_cb
        self._notification_cb = notification_cb
        try:
            self._msgpack_stream.run(self._on_message)
        finally:
            # Never leave stale callbacks installed, even if the stream's
            # loop exits with an exception.
            self._request_cb = None
            self._notification_cb = None

    def stop(self):
        """Stop the event loop."""
        self._msgpack_stream.stop()

    def _on_message(self, msg):
        """Dispatch an incoming message to the handler for its type.

        Messages whose type (`msg[0]`) has no registered handler go to
        `_on_invalid_message`.
        """
        try:
            self._handlers.get(msg[0], self._on_invalid_message)(msg)
        except Exception:
            # Deliberately broad: whatever goes wrong while handling a
            # message is logged and reported to the peer as an error
            # response (with id 0) instead of propagating out of the
            # stream's message callback.
            err_str = format_exc(5)
            warn(err_str)
            self._msgpack_stream.send([1, 0, err_str, None])

    def _on_request(self, msg):
        # request
        #   - msg[1]: id
        #   - msg[2]: method name
        #   - msg[3]: arguments
        debug('received request: %s, %s', msg[2], msg[3])
        self._request_cb(msg[2], msg[3], Response(self._msgpack_stream,
                                                  msg[1]))

    def _on_response(self, msg):
        # response to a previous request:
        #   - msg[1]: the id
        #   - msg[2]: error(if any)
        #   - msg[3]: result(if not errored)
        debug('received response: %s, %s', msg[2], msg[3])
        self._pending_requests.pop(msg[1])(msg[2], msg[3])

    def _on_notification(self, msg):
        # notification/event
        #   - msg[1]: event name
        #   - msg[2]: arguments
        debug('received notification: %s, %s', msg[1], msg[2])
        self._notification_cb(msg[1], msg[2])

    def _on_invalid_message(self, msg):
        # Unknown message type: log it and answer with an error response.
        error = 'Received invalid message %s' % msg
        warn(error)
        self._msgpack_stream.send([1, 0, error, None])
113
114
115 6
class Response(object):

    """Response to a msgpack-rpc request that came from Nvim.

    When Nvim sends a msgpack-rpc request, an instance of this class is
    created for remembering state required to send a response.
    """

    def __init__(self, msgpack_stream, request_id):
        """Initialize the Response instance.

        `msgpack_stream` is the stream the response will be written to and
        `request_id` is the id of the request being answered.
        """
        self._msgpack_stream = msgpack_stream
        self._request_id = request_id

    def send(self, value, error=False):
        """Send the response.

        If `error` is True, it will be sent as an error: `value` goes in
        the error slot of the response message and the result slot is
        None. Otherwise `value` is sent as the result and the error slot
        is None.
        """
        if error:
            resp = [1, self._request_id, value, None]
        else:
            resp = [1, self._request_id, None, value]
        debug('sending response to request %d: %s', self._request_id, resp)
        self._msgpack_stream.send(resp)
139