Completed
Pull Request — master (#221)
by
unknown
23:54
created

Buffers   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 33
Duplicated Lines 0 %

Test Coverage

Coverage 92.31%

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 33
ccs 12
cts 13
cp 0.9231
rs 10
wmc 7
1
"""Main Nvim interface."""
2 5
import functools
3 5
import os
4 5
import sys
5
from traceback import format_stack
6 5
7
from msgpack import ExtType
8 5
9
from .buffer import Buffer
10 5
from .common import (Remote, RemoteApi, RemoteMap, RemoteSequence,
11 5
                     decode_if_bytes, walk)
12
from .tabpage import Tabpage
13 5
from .window import Window
14 5
from ..compat import IS_PYTHON3
15 5
from ..util import Version, format_exc_skip
16 5
17
# A one-element tuple needs the trailing comma; without it the parentheses
# are only grouping and `__all__` would be the plain string 'Nvim', which
# makes `from <module> import *` look up the names 'N', 'v', 'i' and 'm'.
__all__ = ('Nvim',)


# Module-level reference to `os.chdir`; `Nvim.chdir` calls it through this
# name before asking Nvim to change its own current directory.
os_chdir = os.chdir
21 5
22
23
class Nvim(object):

    """Class that represents a remote Nvim instance.

    This class is the main entry point to the Nvim remote API, it is a
    wrapper around Session instances.

    The constructor of this class must not be called directly. Instead, the
    `from_session` class method should be used to create the first instance
    from a raw `Session` instance.

    Subsequent instances for the same session can be created by calling the
    `with_decode` instance method to change the decoding behavior or
    `SubClass.from_nvim(nvim)` where `SubClass` is a subclass of `Nvim`, which
    is useful for having multiple `Nvim` objects that behave differently
    without one affecting the other.
    """

    @classmethod
    def from_session(cls, session):
        """Create a new Nvim instance for a Session instance.

        This method must be called to create the first Nvim instance, since it
        queries Nvim metadata for type information and sets a SessionHook for
        creating specialized objects from Nvim remote handles.
        """
        # Errors reported by the session are wrapped into NvimError using
        # the second element of the error value.
        session.error_wrapper = lambda e: NvimError(e[1])
        channel_id, metadata = session.request(b'vim_get_api_info')

        if IS_PYTHON3:
            # decode all metadata strings for python3
            metadata = walk(decode_if_bytes, metadata)

        # Map the type ids announced by Nvim to the local wrapper classes.
        types = {
            metadata['types']['Buffer']['id']: Buffer,
            metadata['types']['Window']['id']: Window,
            metadata['types']['Tabpage']['id']: Tabpage,
        }

        return cls(session, channel_id, metadata, types)

    @classmethod
    def from_nvim(cls, nvim):
        """Create a new Nvim instance from an existing instance."""
        return cls(nvim._session, nvim.channel_id, nvim.metadata,
                   nvim.types, nvim._decode, nvim._err_cb)

    def __init__(self, session, channel_id, metadata, types,
                 decode=False, err_cb=None):
        """Initialize a new Nvim instance. This method is module-private.

        `session` is the object requests are sent through, `channel_id` and
        `metadata` are the values returned by the API info request, `types`
        maps remote type ids to wrapper classes, `decode` is the default
        decoding setting applied to results and `err_cb` is the callable
        used to report errors caught in handlers.
        """
        self._session = session
        self.channel_id = channel_id
        self.metadata = metadata
        # Metadata without a "version" entry is treated as API level 0.
        version = metadata.get("version", {"api_level": 0})
        self.version = Version(**version)
        self.types = types
        self.api = RemoteApi(self, 'nvim_')
        self.vars = RemoteMap(self, 'nvim_get_var', 'nvim_set_var')
        self.vvars = RemoteMap(self, 'nvim_get_vvar', None)
        self.options = RemoteMap(self, 'nvim_get_option', 'nvim_set_option')
        self.buffers = Buffers(self)
        self.windows = RemoteSequence(self, 'nvim_list_wins')
        self.tabpages = RemoteSequence(self, 'nvim_list_tabpages')
        self.current = Current(self)
        self.session = CompatibilitySession(self)
        self.funcs = Funcs(self)
        self.error = NvimError
        self._decode = decode
        self._err_cb = err_cb

    def _from_nvim(self, obj, decode=None):
        """Convert a single value received from Nvim.

        `ExtType` values are turned into an instance of the class registered
        in `self.types` for their code. Any other value is passed through
        `decode_if_bytes` when `decode` is truthy and returned unchanged
        otherwise. When `decode` is None the instance default is used.
        """
        if decode is None:
            decode = self._decode
        if type(obj) is ExtType:
            cls = self.types[obj.code]
            return cls(self, (obj.code, obj.data))
        if decode:
            obj = decode_if_bytes(obj, decode)
        return obj

    def _to_nvim(self, obj):
        """Convert a single value before it is sent to Nvim.

        `Remote` objects are replaced by an `ExtType` built from their
        `code_data`; every other value is returned unchanged.
        """
        if isinstance(obj, Remote):
            return ExtType(*obj.code_data)
        return obj

    def request(self, name, *args, **kwargs):
        r"""Send an API request or notification to nvim.

        It is rarely needed to call this function directly, as most API
        functions have python wrapper functions. The `api` object can
        also be used to call API functions as methods:

            vim.api.err_write('ERROR\n', async=True)
            vim.current.buffer.api.get_mark('.')

        is equivalent to

            vim.request('nvim_err_write', 'ERROR\n', async=True)
            vim.request('nvim_buf_get_mark', vim.current.buffer, '.')


        Normally a blocking request will be sent.  If the `async` flag is
        present and True, an asynchronous notification is sent instead. This
        will never block, and the return value or error is ignored.

        The `decode` keyword argument, when given, overrides the instance
        default for converting the result; all remaining keyword arguments
        are forwarded to the session.
        """
        decode = kwargs.pop('decode', self._decode)
        args = walk(self._to_nvim, args)
        res = self._session.request(name, *args, **kwargs)
        return walk(self._from_nvim, res, decode=decode)

    def next_message(self):
        """Block until a message(request or notification) is available.

        If any messages were previously enqueued, return the first in queue.
        If not, run the event loop until one is received.

        Returns None when the session yields a falsy message.
        """
        msg = self._session.next_message()
        if msg:
            return walk(self._from_nvim, msg)

    def run_loop(self, request_cb, notification_cb,
                 setup_cb=None, err_cb=None):
        """Run the event loop to receive requests and notifications from Nvim.

        This should not be called from a plugin running in the host, which
        already runs the loop and dispatches events to plugins.

        `request_cb` and `notification_cb` are called with the converted
        `(name, args)` of each incoming message; the result of `request_cb`
        is converted back before being handed to the session. `setup_cb` is
        passed to the session unchanged. `err_cb` receives a message string
        for every exception raised by a handler (the exception is re-raised
        afterwards); it defaults to `sys.stderr.write` and is stored on the
        instance.
        """
        if err_cb is None:
            err_cb = sys.stderr.write
        self._err_cb = err_cb

        def filter_request_cb(name, args):
            # Convert incoming values, run the handler and convert its
            # result back into something that can be sent to Nvim.
            name = self._from_nvim(name)
            args = walk(self._from_nvim, args)
            try:
                result = request_cb(name, args)
            except Exception as exc:
                msg = ("error caught in request handler '{} {}': {}\n{}\n\n"
                       .format(name, args, exc, format_exc_skip(1)))
                self._err_cb(msg)
                raise
            return walk(self._to_nvim, result)

        def filter_notification_cb(name, args):
            # Same as above, but notifications produce no result.
            name = self._from_nvim(name)
            args = walk(self._from_nvim, args)
            try:
                notification_cb(name, args)
            except Exception as exc:
                msg = ("error caught in notification handler '{} {}': {}\n{}\n\n"  # noqa
                       .format(name, args, exc, format_exc_skip(1)))
                self._err_cb(msg)
                raise

        self._session.run(filter_request_cb, filter_notification_cb, setup_cb)

    def stop_loop(self):
        """Stop the event loop being started with `run_loop`."""
        self._session.stop()

    def with_decode(self, decode=True):
        """Return a new Nvim instance for this session using `decode`."""
        return Nvim(self._session, self.channel_id,
                    self.metadata, self.types, decode, self._err_cb)

    def ui_attach(self, width, height, rgb):
        """Register as a remote UI.

        After this method is called, the client will receive redraw
        notifications.
        """
        return self.request('ui_attach', width, height, rgb)

    def ui_detach(self):
        """Unregister as a remote UI."""
        return self.request('ui_detach')

    def ui_try_resize(self, width, height):
        """Notify nvim that the client window has resized.

        If possible, nvim will send a redraw request to resize.
        """
        return self.request('ui_try_resize', width, height)

    def subscribe(self, event):
        """Subscribe to a Nvim event."""
        return self.request('nvim_subscribe', event)

    def unsubscribe(self, event):
        """Unsubscribe from a Nvim event."""
        return self.request('nvim_unsubscribe', event)

    def command(self, string, **kwargs):
        """Execute a single ex command."""
        return self.request('nvim_command', string, **kwargs)

    def command_output(self, string):
        """Execute a single ex command and return the output."""
        return self.request('nvim_command_output', string)

    def eval(self, string, **kwargs):
        """Evaluate a vimscript expression."""
        return self.request('nvim_eval', string, **kwargs)

    def call(self, name, *args, **kwargs):
        """Call a vimscript function.

        The positional arguments are sent as one tuple; keyword arguments
        are forwarded to `request`.
        """
        return self.request('nvim_call_function', name, args, **kwargs)

    def strwidth(self, string):
        """Return the number of display cells `string` occupies.

        Tab is counted as one cell.
        """
        return self.request('nvim_strwidth', string)

    def list_runtime_paths(self):
        """Return a list of paths contained in the 'runtimepath' option."""
        return self.request('nvim_list_runtime_paths')

    def foreach_rtp(self, cb):
        """Invoke `cb` for each path in 'runtimepath'.

        Call the given callable for each path in 'runtimepath' until either
        the callable returns something other than None, an exception is
        raised by it, or there are no more paths.

        Returns the first non-None value returned by `cb`. Returns None when
        every call returned None or when `cb` raised; in the latter case the
        exception is swallowed and iteration simply stops.
        """
        for path in self.request('nvim_list_runtime_paths'):
            try:
                result = cb(path)
            except Exception:
                # Best effort: an error in the callback only ends the walk.
                return None
            if result is not None:
                return result
        return None

    def chdir(self, dir_path):
        """Run os.chdir, then all appropriate vim stuff."""
        os_chdir(dir_path)
        return self.request('nvim_set_current_dir', dir_path)

    def feedkeys(self, keys, options='', escape_csi=True):
        """Push `keys` to Nvim user input buffer.

        Options can be a string with the following character flags:
        - 'm': Remap keys. This is default.
        - 'n': Do not remap keys.
        - 't': Handle keys as if typed; otherwise they are handled as if coming
               from a mapping. This matters for undo, opening folds, etc.
        """
        return self.request('nvim_feedkeys', keys, options, escape_csi)

    def input(self, bytes):
        """Push `bytes` to Nvim low level input buffer.

        Unlike `feedkeys()`, this uses the lowest level input buffer and the
        call is not deferred. It returns the number of bytes actually
        written(which can be less than what was requested if the buffer is
        full).
        """
        return self.request('nvim_input', bytes)

    def replace_termcodes(self, string, from_part=False, do_lt=True,
                          special=True):
        r"""Replace any terminal code strings by byte sequences.

        The returned sequences are Nvim's internal representation of keys,
        for example:

        <esc> -> '\x1b'
        <cr>  -> '\r'
        <c-l> -> '\x0c'
        <up>  -> '\x80ku'

        The returned sequences can be used as input to `feedkeys`.
        """
        return self.request('nvim_replace_termcodes', string,
                            from_part, do_lt, special)

    def out_write(self, msg, **kwargs):
        """Print `msg` as a normal message."""
        return self.request('nvim_out_write', msg, **kwargs)

    def err_write(self, msg, **kwargs):
        """Print `msg` as an error message."""
        return self.request('nvim_err_write', msg, **kwargs)

    def quit(self, quit_command='qa!'):
        """Send a quit command to Nvim.

        By default, the quit command is 'qa!' which will make Nvim quit without
        saving anything.
        """
        try:
            self.command(quit_command)
        except IOError:
            # sending a quit command will raise an IOError because the
            # connection is closed before a response is received. Safe to
            # ignore it.
            pass

    def new_highlight_source(self):
        """Return new src_id for use with Buffer.add_highlight."""
        return self.current.buffer.add_highlight("", 0, src_id=0)

    def async_call(self, fn, *args, **kwargs):
        """Schedule `fn` to be called by the event loop soon.

        This function is thread-safe, and is the only way code not
        on the main thread could interact with nvim api objects.

        This function can also be called in a synchronous
        event handler, just before it returns, to defer execution
        that shouldn't block neovim.
        """
        # Capture where the call was requested (without this frame) so it
        # can be included in the error report if `fn` fails later.
        call_point = ''.join(format_stack(None, 5)[:-1])

        def handler():
            try:
                fn(*args, **kwargs)
            except Exception as err:
                msg = ("error caught while executing async callback:\n"
                       "{!r}\n{}\n \nthe call was requested at\n{}"
                       .format(err, format_exc_skip(1), call_point))
                self._err_cb(msg)
                raise
        self._session.threadsafe_call(handler)
348 5
349
350
class Buffers(object):

    """Collection-like view of the buffers of a remote Nvim instance.

    Every operation that needs the buffer list calls the `list_bufs`
    function of the Nvim `api` object again; nothing is cached here.

    Conforms to *python-buffers*.
    """

    def __init__(self, nvim):
        """Remember the buffer-listing callable of the Nvim object `nvim`."""
        self._fetch_buffers = nvim.api.list_bufs

    def __len__(self):
        """Return how many buffers the listing call reports."""
        listed = self._fetch_buffers()
        return len(listed)

    def __getitem__(self, number):
        """Return the buffer whose `number` equals `number`.

        Raises KeyError when no listed buffer has that number.
        """
        candidates = (buf for buf in self._fetch_buffers()
                      if buf.number == number)
        for found in candidates:
            return found
        raise KeyError(number)

    def __contains__(self, b):
        """Tell whether `b` is a Buffer object that reports itself valid."""
        if not isinstance(b, Buffer):
            return False
        return b.valid

    def __iter__(self):
        """Return an iterator over a freshly fetched buffer list."""
        listed = self._fetch_buffers()
        return iter(listed)
383 5
384
385
class CompatibilitySession(object):

    """Small adapter kept for API compatibility.

    It only exposes `threadsafe_call`, which is the `async_call` attribute
    of the Nvim object it was created with.
    """

    def __init__(self, nvim):
        """Expose `nvim.async_call` under the name `threadsafe_call`."""
        scheduler = nvim.async_call
        self.threadsafe_call = scheduler
391 5
392
393
class Current(object):

    """Helper class for emulating vim.current from python-vim.

    Each property reads or writes the corresponding "current" object by
    issuing a request through the object given to the constructor.
    """

    def __init__(self, session):
        """Store the request-capable object; `range` starts out as None."""
        self.range = None
        self._session = session

    @property
    def line(self):
        """Result of the `nvim_get_current_line` request."""
        send = self._session.request
        return send('nvim_get_current_line')

    @line.setter
    def line(self, line):
        send = self._session.request
        return send('nvim_set_current_line', line)

    @property
    def buffer(self):
        """Result of the `nvim_get_current_buf` request."""
        send = self._session.request
        return send('nvim_get_current_buf')

    @buffer.setter
    def buffer(self, buffer):
        send = self._session.request
        return send('nvim_set_current_buf', buffer)

    @property
    def window(self):
        """Result of the `nvim_get_current_win` request."""
        send = self._session.request
        return send('nvim_get_current_win')

    @window.setter
    def window(self, window):
        send = self._session.request
        return send('nvim_set_current_win', window)

    @property
    def tabpage(self):
        """Result of the `nvim_get_current_tabpage` request."""
        send = self._session.request
        return send('nvim_get_current_tabpage')

    @tabpage.setter
    def tabpage(self, tabpage):
        send = self._session.request
        return send('nvim_set_current_tabpage', tabpage)
432 5
433
434
class Funcs(object):

    """Helper class for functional vimscript interface.

    Looking up any attribute that is not otherwise present yields a callable
    that forwards to the `call` method of the wrapped Nvim object with the
    attribute name as first argument.
    """

    def __init__(self, nvim):
        """Keep the Nvim object whose `call` method will be used."""
        self._nvim = nvim

    def __getattr__(self, name):
        """Return `nvim.call` with `name` pre-bound as first argument."""
        dispatcher = self._nvim.call
        return functools.partial(dispatcher, name)
443 5
444
445
class NvimError(Exception):

    """Exception type this module uses to wrap errors from the session."""
447