Test Failed
Push — master ( a5ee34...588b1f )
by Justin M.
34s queued 10s
created

pynvim.plugin.script_host   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 271
Duplicated Lines 0 %

Test Coverage

Coverage 24.28%

Importance

Changes 0
Metric Value
eloc 187
dl 0
loc 271
rs 9.36
c 0
b 0
f 0
ccs 42
cts 173
cp 0.2428
wmc 38

13 Methods

Rating   Name   Duplication   Size   Complexity  
A LegacyVim.eval() 0 3 1
A RedirectStream.writelines() 0 2 1
A ScriptHost.teardown() 0 9 1
A ScriptHost.setup() 0 18 3
A RedirectStream.write() 0 2 1
B ScriptHost.python_do_range() 0 53 8
A ScriptHost.python_eval() 0 4 1
A ScriptHost._set_current_range() 0 3 1
A ScriptHost.python_execute() 0 8 2
A ScriptHost.__init__() 0 22 1
A RedirectStream.__init__() 0 2 1
A ScriptHost.python_execute_file() 0 10 3
A ScriptHost.python_chdir() 0 4 1

3 Functions

Rating   Name   Duplication   Size   Complexity  
A num_to_str() 0 5 2
A discover_runtime_directories() 0 10 5
B path_hook() 0 51 6
1
"""Legacy python/python3-vim emulation."""
2 6
import imp
3 6
import io
4 6
import logging
5 6
import os
6 6
import sys
7
8 6
from .decorators import plugin, rpc_export
9 6
from ..api import Nvim, walk
10 6
from ..compat import IS_PYTHON3
11 6
from ..msgpack_rpc import ErrorResponse
12 6
from ..util import format_exc_skip
13
14 6
__all__ = ('ScriptHost',)


logger = logging.getLogger(__name__)
# Module-level logging shortcuts.  The `warn` name is kept for existing
# callers, but it is bound to `Logger.warning` because `Logger.warn` is a
# deprecated alias.
debug, info, warn = (logger.debug, logger.info, logger.warning,)

if IS_PYTHON3:
    # Python 3 has no `basestring` builtin; alias it so the isinstance()
    # check in ScriptHost.python_do_range works on both major versions.
    basestring = str

    if sys.version_info >= (3, 4):
        # Needed by the `find_spec` finder method built in path_hook().
        from importlib.machinery import PathFinder

    # Name of the version-specific runtime sub-directory that
    # discover_runtime_directories() looks for.
    PYTHON_SUBDIR = 'python3'
else:
    PYTHON_SUBDIR = 'python2'
29
30
31 6
@plugin
class ScriptHost(object):

    """Provides an environment for running python plugins created for Vim."""

    def __init__(self, nvim):
        """Initialize the legacy python-vim environment.

        Installs the import hook and stream redirection (see `setup`),
        creates the `__main__` module in which all user code is executed,
        registers a `LegacyVim` instance as `sys.modules['vim']`, and asks
        Nvim to notify this host about working-directory changes.
        """
        self.setup(nvim)
        # context where all code will run
        self.module = imp.new_module('__main__')
        nvim.script_context = self.module
        # it seems some plugins assume 'sys' is already imported, so do it now
        exec('import sys', self.module.__dict__)
        self.legacy_vim = LegacyVim.from_nvim(nvim)
        sys.modules['vim'] = self.legacy_vim

        # Handle DirChanged. #296
        nvim.command(
            'au DirChanged * call rpcnotify({}, "python_chdir", v:event.cwd)'
            .format(nvim.channel_id), async_=True)
        # XXX: Avoid race condition.
        # https://github.com/neovim/pynvim/pull/296#issuecomment-358970531
        # TODO(bfredl): when host initialization has been refactored,
        # to make __init__ safe again, the following should work:
        # os.chdir(nvim.eval('getcwd()', async_=False))
        nvim.command('call rpcnotify({}, "python_chdir", getcwd())'
                     .format(nvim.channel_id), async_=True)

    def setup(self, nvim):
        """Setup import hooks and global streams.

        This will add import hooks for importing modules from runtime
        directories and patch the sys module so 'print' calls will be
        forwarded to Nvim.
        """
        self.nvim = nvim
        info('install import hook/path')
        self.hook = path_hook(nvim)
        sys.path_hooks.append(self.hook)
        # Marker entry on sys.path; the hook only answers for this value.
        nvim.VIM_SPECIAL_PATH = '_vim_path_'
        sys.path.append(nvim.VIM_SPECIAL_PATH)
        info('redirect sys.stdout and sys.stderr')
        # Keep the originals so teardown() can put them back.
        self.saved_stdout = sys.stdout
        self.saved_stderr = sys.stderr
        sys.stdout = RedirectStream(lambda data: nvim.out_write(data))
        sys.stderr = RedirectStream(lambda data: nvim.err_write(data))

    def teardown(self):
        """Restore state modified from the `setup` call."""
        nvim = self.nvim
        info('uninstall import hook/path')
        sys.path.remove(nvim.VIM_SPECIAL_PATH)
        sys.path_hooks.remove(self.hook)
        info('restore sys.stdout and sys.stderr')
        sys.stdout = self.saved_stdout
        sys.stderr = self.saved_stderr

    @rpc_export('python_execute', sync=True)
    def python_execute(self, script, range_start, range_stop):
        """Handle the `python` ex command.

        Executes `script` in the shared `__main__` module after setting
        `vim.current.range` from `range_start`/`range_stop`.  Any
        `Exception` raised by the script is re-raised as `ErrorResponse`
        carrying the formatted traceback.
        """
        self._set_current_range(range_start, range_stop)
        try:
            exec(script, self.module.__dict__)
        except Exception:
            raise ErrorResponse(format_exc_skip(1))

    @rpc_export('python_execute_file', sync=True)
    def python_execute_file(self, file_path, range_start, range_stop):
        """Handle the `pyfile` ex command.

        Reads and compiles `file_path`, then executes it in the shared
        `__main__` module.  Any `Exception` raised while the script runs is
        re-raised as `ErrorResponse` carrying the formatted traceback.
        """
        self._set_current_range(range_start, range_stop)
        with open(file_path) as f:
            script = compile(f.read(), file_path, 'exec')
        # The file is closed before the script runs; only the compiled code
        # object is needed from here on.
        try:
            exec(script, self.module.__dict__)
        except Exception:
            raise ErrorResponse(format_exc_skip(1))

    @rpc_export('python_do_range', sync=True)
    def python_do_range(self, start, stop, code):
        """Handle the `pydo` ex command.

        `code` becomes the body of a temporary function taking
        `(line, linenr)` that is called for every line in the 1-based range
        `start`..`stop` of the current buffer.  A string result replaces the
        line, `None` leaves it untouched, and any other result type raises
        `TypeError` after the replacements collected so far in the batch
        have been written.
        """
        self._set_current_range(start, stop)
        nvim = self.nvim
        start -= 1
        fname = '_vim_pydo'

        # define the function
        function_def = 'def %s(line, linenr):\n %s' % (fname, code,)
        exec(function_def, self.module.__dict__)
        # get the function
        function = self.module.__dict__[fname]
        try:
            while start < stop:
                # Process batches of 5000 to avoid the overhead of making
                # multiple API calls for every line. Assuming an average
                # line length of 100 bytes, approximately 488 kilobytes will
                # be transferred per batch, which can be done very quickly
                # in a single API call.
                sstart = start
                sstop = min(start + 5000, stop)
                lines = nvim.current.buffer.api.get_lines(sstart, sstop, True)

                exception = None
                # `newlines` holds replacements for the consecutive buffer
                # lines starting at index `sstart`.
                newlines = []
                linenr = sstart + 1
                for line in lines:
                    result = function(line, linenr)
                    if result is None:
                        # Update earlier lines, and skip to the next
                        if newlines:
                            # `end` is exclusive, exactly like the final
                            # flush below; subtracting one here would
                            # replace one line too few and insert a stray
                            # line into the buffer.
                            end = sstart + len(newlines)
                            nvim.current.buffer.api.set_lines(sstart, end,
                                                              True, newlines)
                        sstart += len(newlines) + 1
                        newlines = []
                    elif isinstance(result, basestring):
                        newlines.append(result)
                    else:
                        exception = TypeError('pydo should return a string '
                                              + 'or None, found %s instead'
                                              % result.__class__.__name__)
                        break
                    linenr += 1

                start = sstop
                if newlines:
                    end = sstart + len(newlines)
                    nvim.current.buffer.api.set_lines(sstart, end, True,
                                                      newlines)
                if exception:
                    raise exception
        finally:
            # delete the function, also when the user code raised
            self.module.__dict__.pop(fname, None)

    @rpc_export('python_eval', sync=True)
    def python_eval(self, expr):
        """Handle the `pyeval` vim function."""
        return eval(expr, self.module.__dict__)

    @rpc_export('python_chdir', sync=False)
    def python_chdir(self, cwd):
        """Handle working directory changes."""
        os.chdir(cwd)

    def _set_current_range(self, start, stop):
        """Point `vim.current.range` at `start`..`stop` of the current buffer."""
        current = self.legacy_vim.current
        current.range = current.buffer.range(start, stop)
175
176
177 6
class RedirectStream(io.IOBase):

    """Write-only stream that hands everything written to a callback."""

    def __init__(self, redirect_handler):
        """Remember `redirect_handler`, the callable that receives the data."""
        self.redirect_handler = redirect_handler

    def write(self, data):
        """Pass `data` to the redirect handler unchanged."""
        self.redirect_handler(data)

    def writelines(self, seq):
        """Join the items of `seq` with newlines and pass them on at once."""
        self.redirect_handler('\n'.join(seq))
186
187
188 6
# Types that num_to_str() converts to strings.  Python 2 has a separate
# `long` integer type that has to be listed explicitly.
if IS_PYTHON3:
    num_types = (int, float)
else:
    num_types = (int, long, float)  # noqa: F821
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable long does not seem to be defined.
Loading history...
192
193
194 6
def num_to_str(obj):
    """Return `str(obj)` for instances of `num_types`, else `obj` unchanged."""
    if isinstance(obj, num_types):
        return str(obj)
    else:
        return obj
199
200
201 6
class LegacyVim(Nvim):

    """`Nvim` subclass whose `eval` returns numbers as strings."""

    def eval(self, expr):
        """Evaluate `expr` via `vim_eval` and stringify numeric results.

        `num_to_str` is applied to the result through `walk`.
        """
        obj = self.request("vim_eval", expr)
        return walk(num_to_str, obj)
205
206
207
# Copied/adapted from :help if_pyth.
def path_hook(nvim):
    """Create a `sys.path_hooks` entry serving Nvim's runtime directories.

    The returned callable yields a finder class when it is asked about
    `nvim.VIM_SPECIAL_PATH` and raises `ImportError` for every other path
    entry.
    """
    def _get_paths():
        # Directories to search; empty when nvim reports that the calling
        # thread is invalid.
        if nvim._thread_invalid():
            return []
        return discover_runtime_directories(nvim)

    def _find_module(fullname, oldtail, path):
        # Resolve `oldtail` (the not-yet-resolved tail of `fullname`) one
        # dotted component at a time and return the imp.find_module()
        # result for the last component.
        idx = oldtail.find('.')
        if idx > 0:
            name = oldtail[:idx]
            tail = oldtail[idx + 1:]
            fmr = imp.find_module(name, path)
            try:
                # The parent has to be *loaded* to obtain its __path__;
                # imp.find_module() neither accepts these arguments nor
                # returns a module object.
                module = imp.load_module(fullname[:-len(oldtail)] + name,
                                         *fmr)
            finally:
                # imp leaves closing the file to the caller.
                if fmr[0] is not None:
                    fmr[0].close()
            return _find_module(fullname, tail, module.__path__)
        else:
            return imp.find_module(fullname, path)

    class VimModuleLoader(object):
        def __init__(self, module):
            # `module` is the value returned by _find_module(), i.e. an
            # imp.find_module() result.
            self.module = module

        def load_module(self, fullname, path=None):
            fobj = self.module[0]
            try:
                # Check sys.modules, required for reload (see PEP302).
                try:
                    return sys.modules[fullname]
                except KeyError:
                    pass
                return imp.load_module(fullname, *self.module)
            finally:
                # imp leaves closing the file to the caller.
                if fobj is not None:
                    fobj.close()

    class VimPathFinder(object):
        @staticmethod
        def find_module(fullname, path=None):
            """Method for Python 2.7 and 3.3."""
            try:
                return VimModuleLoader(
                    _find_module(fullname, fullname, path or _get_paths()))
            except ImportError:
                return None

        @staticmethod
        def find_spec(fullname, target=None):
            """Method for Python 3.4+."""
            return PathFinder.find_spec(fullname, _get_paths(), target)

    def hook(path):
        if path == nvim.VIM_SPECIAL_PATH:
            return VimPathFinder
        else:
            raise ImportError

    return hook
259
260
261 6
def discover_runtime_directories(nvim):
    """Return the python directories that exist under Nvim's runtime paths.

    For every existing entry of `nvim.list_runtime_paths()` the
    sub-directories `pythonx` and `PYTHON_SUBDIR` are checked, in that
    order, and the ones that exist are collected.
    """
    rv = []
    for rtp in nvim.list_runtime_paths():
        if not os.path.exists(rtp):
            continue
        for subdir in ['pythonx', PYTHON_SUBDIR]:
            path = os.path.join(rtp, subdir)
            if os.path.exists(path):
                rv.append(path)
    return rv
271