Test Failed
Push — master ( a5ee34...588b1f )
by Justin M.
34s queued 10s
created

pynvim.msgpack_rpc.msgpack_stream   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 65
Duplicated Lines 0 %

Test Coverage

Coverage 97.14%

Importance

Changes 0
Metric Value
eloc 36
dl 0
loc 65
rs 10
c 0
b 0
f 0
ccs 34
cts 35
cp 0.9714
wmc 9

7 Methods

Rating   Name   Duplication   Size   Complexity  
A MsgpackStream.run() 0 9 1
A MsgpackStream.close() 0 3 1
A MsgpackStream.stop() 0 3 1
A MsgpackStream.threadsafe_call() 0 3 1
A MsgpackStream.send() 0 4 1
A MsgpackStream._on_data() 0 11 3
A MsgpackStream.__init__() 0 6 1
1
"""Msgpack handling in the event loop pipeline."""
2 6
import logging
3
4 6
from msgpack import Packer, Unpacker
5
6 6
from ..compat import unicode_errors_default
7
8 6
logger = logging.getLogger(__name__)
9 6
debug, info, warn = (logger.debug, logger.info, logger.warning,)
10
11
12 6
class MsgpackStream(object):
13
14
    """Two-way msgpack stream that wraps a event loop byte stream.
15
16
    This wraps the event loop interface for reading/writing bytes and
17
    exposes an interface for reading/writing msgpack documents.
18
    """
19
20 6
    def __init__(self, event_loop):
21
        """Wrap `event_loop` on a msgpack-aware interface."""
22 6
        self.loop = event_loop
23 6
        self._packer = Packer(unicode_errors=unicode_errors_default)
24 6
        self._unpacker = Unpacker()
25 6
        self._message_cb = None
26
27 6
    def threadsafe_call(self, fn):
28
        """Wrapper around `BaseEventLoop.threadsafe_call`."""
29 6
        self.loop.threadsafe_call(fn)
30
31 6
    def send(self, msg):
32
        """Queue `msg` for sending to Nvim."""
33 6
        debug('sent %s', msg)
34 6
        self.loop.send(self._packer.pack(msg))
35
36 6
    def run(self, message_cb):
37
        """Run the event loop to receive messages from Nvim.
38
39
        While the event loop is running, `message_cb` will be called whenever
40
        a message has been successfully parsed from the input stream.
41
        """
42 6
        self._message_cb = message_cb
43 6
        self.loop.run(self._on_data)
44 6
        self._message_cb = None
45
46 6
    def stop(self):
47
        """Stop the event loop."""
48 6
        self.loop.stop()
49
50 6
    def close(self):
51
        """Close the event loop."""
52
        self.loop.close()
53
54 6
    def _on_data(self, data):
55 6
        self._unpacker.feed(data)
56 6
        while True:
57 6
            try:
58 6
                debug('waiting for message...')
59 6
                msg = next(self._unpacker)
60 6
                debug('received message: %s', msg)
61 6
                self._message_cb(msg)
62 6
            except StopIteration:
63 6
                debug('unpacker needs more data...')
64
                break
65