Completed
Pull Request — master (#199)
by Björn
03:00
created

MsgpackStream   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 49
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
dl 0
loc 49
ccs 28
cts 28
cp 1
rs 10
c 2
b 0
f 0
wmc 8

6 Methods

Rating  Name               Duplication  Size  Complexity
A       stop()             0            3     1
A       _on_data()         0            11    3
A       send()             0            4     1
A       __init__()         0            6     1
A       run()              0            9     1
A       threadsafe_call()  0            3     1
"""Msgpack handling in the event loop pipeline."""
import logging

from msgpack import Packer, Unpacker
The import msgpack could not be resolved.

This can be caused by one of the following:

1. Missing Dependencies

This error could indicate a configuration issue of Pylint. Make sure that your libraries are available by adding the necessary commands.

# .scrutinizer.yml
before_commands:
    - sudo pip install abc # Python2
    - sudo pip3 install abc # Python3
Tip: Pylint is currently not run inside a virtualenv, so when installing your modules make sure to use the command that matches your Python version.

2. Missing __init__.py files

This error could also result from missing __init__.py files in your module folders. Make sure that you place one file in each sub-folder.

from ..compat import unicode_errors_default

logger = logging.getLogger(__name__)
debug, info, warn = (logger.debug, logger.info, logger.warning,)


class MsgpackStream(object):

    """Two-way msgpack stream that wraps an event loop byte stream.

    This wraps the event loop interface for reading/writing bytes and
    exposes an interface for reading/writing msgpack documents.
    """

    def __init__(self, event_loop):
        """Wrap `event_loop` on a msgpack-aware interface."""
        self._event_loop = event_loop
        self._packer = Packer(unicode_errors=unicode_errors_default)
        self._unpacker = Unpacker()
        self._message_cb = None

    def threadsafe_call(self, fn):
        """Wrapper around `BaseEventLoop.threadsafe_call`."""
        self._event_loop.threadsafe_call(fn)

    def send(self, msg):
        """Queue `msg` for sending to Nvim."""
        debug('sent %s', msg)
        self._event_loop.send(self._packer.pack(msg))

    def run(self, message_cb):
        """Run the event loop to receive messages from Nvim.

        While the event loop is running, `message_cb` will be called whenever
        a message has been successfully parsed from the input stream.
        """
        self._message_cb = message_cb
        self._event_loop.run(self._on_data)
        self._message_cb = None

    def stop(self):
        """Stop the event loop."""
        self._event_loop.stop()

    def _on_data(self, data):
        # Buffer the new bytes, then deliver every complete message.
        self._unpacker.feed(data)
        while True:
            try:
                debug('waiting for message...')
                msg = next(self._unpacker)
                debug('received message: %s', msg)
                self._message_cb(msg)
            except StopIteration:
                # No complete message left in the buffer; wait for more data.
                debug('unpacker needs more data...')
                break
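
The feed-then-drain loop in `_on_data()` can be tried in isolation with a minimal sketch. It does not use msgpack: `FramedUnpacker`, `frame()` and `on_data()` are hypothetical names introduced here, and the wire format (a 4-byte length prefix followed by JSON) is an assumption chosen only so the example runs on the standard library. What it shares with the code above is the `feed()` / `next()` / `StopIteration` shape, which lets a message arrive split across several chunks.

```python
import json
import struct


class FramedUnpacker:
    """Hypothetical stand-in for a streaming unpacker (NOT msgpack).

    Each frame is a 4-byte big-endian length followed by a JSON payload.
    """

    def __init__(self):
        self._buf = bytearray()

    def feed(self, data):
        # Append raw bytes; they may hold a partial frame or several frames.
        self._buf.extend(data)

    def __iter__(self):
        return self

    def __next__(self):
        # The length header itself may not have fully arrived yet.
        if len(self._buf) < 4:
            raise StopIteration
        (size,) = struct.unpack_from(">I", self._buf)
        if len(self._buf) < 4 + size:
            raise StopIteration  # body incomplete, wait for more data
        payload = bytes(self._buf[4:4 + size])
        del self._buf[:4 + size]
        return json.loads(payload.decode("utf-8"))


def frame(msg):
    """Encode `msg` in the illustrative length-prefixed format."""
    body = json.dumps(msg).encode("utf-8")
    return struct.pack(">I", len(body)) + body


def on_data(unpacker, data, message_cb):
    """Feed one chunk, then deliver every message that is now complete."""
    unpacker.feed(data)
    while True:
        try:
            msg = next(unpacker)
        except StopIteration:
            break  # buffer holds no complete message
        message_cb(msg)


received = []
unpacker = FramedUnpacker()
# Two illustrative messages, concatenated as they would be on a byte stream.
wire = frame([0, 1, "example_request", []]) + frame([2, "example_event", []])
# Deliver the bytes in 5-byte chunks that ignore message boundaries.
for i in range(0, len(wire), 5):
    on_data(unpacker, wire[i:i + 5], received.append)

print(received)
```

One difference from `_on_data()` is deliberate: the sketch calls `message_cb` outside the `try` block, so a `StopIteration` raised by the callback itself would propagate instead of being read as "needs more data". Whether that matters for `MsgpackStream` depends on what its callbacks can raise.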