Completed
Pull Request — master (#72)
by Björn
18:06 queued 16:50
created

MsgpackStream.poll_fd()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 3
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
c 0
b 0
f 0
1
"""Msgpack handling in the event loop pipeline."""
2 6
import logging
3
4 6
from msgpack import Packer, Unpacker
0 ignored issues
show
Configuration introduced by
The import msgpack could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: We currently do not use virtualenv to run Pylint, so when installing your modules make sure to use the pip command that matches your Python version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place an __init__.py file in each sub-folder.

Loading history...
5
6 6
from ..compat import unicode_errors_default
7
8 6
logger = logging.getLogger(__name__)
debug, info, warn = (logger.debug, logger.info, logger.warning,)


class MsgpackStream(object):

    """Two-way msgpack stream that wraps an event loop byte stream.

    This wraps the event loop interface for reading/writing bytes and
    exposes an interface for reading/writing msgpack documents.
    """

    def __init__(self, event_loop):
        """Wrap `event_loop` on a msgpack-aware interface."""
        self._event_loop = event_loop
        self._packer = Packer(unicode_errors=unicode_errors_default)
        self._unpacker = Unpacker()
        # Only set while `run` is executing; `None` otherwise.
        self._message_cb = None

    def threadsafe_call(self, fn):
        """Wrapper around `BaseEventLoop.threadsafe_call`."""
        self._event_loop.threadsafe_call(fn)

    def poll_fd(self, fd, on_readable, on_writable):
        """Wrapper around `BaseEventLoop.poll_fd`.

        Returns whatever the wrapped event loop's `poll_fd` returns.
        """
        return self._event_loop.poll_fd(fd, on_readable, on_writable)

    def send(self, msg):
        """Queue `msg` for sending to Nvim.

        The message is serialized with the msgpack packer and the resulting
        bytes are handed to the event loop.
        """
        debug('sent %s', msg)
        self._event_loop.send(self._packer.pack(msg))

    def run(self, message_cb):
        """Run the event loop to receive messages from Nvim.

        While the event loop is running, `message_cb` will be called whenever
        a message has been successfully parsed from the input stream.
        """
        self._message_cb = message_cb
        try:
            self._event_loop.run(self._on_data)
        finally:
            # Always drop the callback, even if the event loop raised, so a
            # stale callback never outlives the `run` call that installed it.
            self._message_cb = None

    def stop(self):
        """Stop the event loop."""
        self._event_loop.stop()

    def _on_data(self, data):
        """Feed `data` to the unpacker and dispatch every complete message.

        Each fully parsed message is passed to the callback installed by
        `run`. The loop ends once the unpacker has no complete message left.
        """
        self._unpacker.feed(data)
        while True:
            debug('waiting for message...')
            # Only the `next` call is guarded: a StopIteration raised by the
            # callback must not be mistaken for "unpacker needs more data".
            try:
                msg = next(self._unpacker)
            except StopIteration:
                debug('unpacker needs more data...')
                break
            debug('received message: %s', msg)
            self._message_cb(msg)
65