Completed
Push — master ( 9bf9ff...63635d )
by timothy
01:03
created

rate_limited()   F

Complexity

Conditions 14

Size

Total Lines 52

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 14
c 1
b 0
f 0
dl 0
loc 52
rs 3.011

3 Methods

Rating   Name   Duplication   Size   Complexity  
F decorate() 0 38 13
F rate_limited_function() 0 35 12
A run_func() 0 5 1

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like rate_limited() (here a function with nested helper functions rather than a class) often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
'''
2
some of these should go into neovim python client
3
'''
4
from __future__ import print_function
5
6
import os
7
import time
8
from subprocess import *
9
from threading import Thread
10
import shlex
11
import string
12
import random
13
from functools import wraps
14
15
from neovim import attach
16
17
def attach_socket(path=None):
    '''Attach to nvim over a socket and return the attached client.

    path: socket of an nvim instance that is already listening.  When it
        is empty or None, ``nvim`` is started on a background thread with
        ``NVIM_LISTEN_ADDRESS=/tmp/nvim`` and that socket is attached
        instead.

    NOTE(review): in the no-path case the attach is attempted immediately
    after the thread is started, without waiting for the socket to exist
    -- confirm whether callers rely on this or whether a retry is needed.
    '''
    def open_nvim():
        # Hand the listen address over through the environment of the
        # child instead of a shell command line, so no shell is spawned.
        env = dict(os.environ, NVIM_LISTEN_ADDRESS='/tmp/nvim')
        proc = Popen(['nvim'], env=env, stdin=PIPE, stdout=PIPE)
        # Blocks this worker thread until nvim exits.
        proc.communicate()

    # Falling back to os.environ.get('NVIM_LISTEN_ADDRESS') was tried here
    # and found unreliable, so the path must be passed in explicitly.
    if not path:
        print('threading')
        t = Thread(target=open_nvim)
        t.start()
        # TODO: make this use a random path
        return attach('socket', path='/tmp/nvim')
    else:
        print('attaching socket')
        return attach('socket', path=path)
35
36
37
def attach_child():
    '''Attach to a new ``nvim --embed`` child process and return the
    attached client.'''
    return attach('child', argv=['nvim', '--embed'])
39
40
41
def attach_headless(nvim_args=None, path=None):
    '''Start ``nvim --headless`` listening on a socket and attach to it.

    nvim_args: optional list of extra command-line arguments for nvim.
    path: socket path nvim should listen on; when empty, ``/tmp/nvim``
        followed by eight random letters is used.

    Sets ``NVIM_LISTEN_ADDRESS`` in this process's environment (which the
    child inherits) and returns the attached client.

    Raises RuntimeError if nvim exits before its socket could be attached.
    '''
    if not path:
        path = '/tmp/nvim' + rand_str(8)
    os.environ['NVIM_LISTEN_ADDRESS'] = path

    cmd = ['nvim', '--headless']
    if nvim_args:
        cmd.extend(nvim_args)

    # One descriptor backs stdin *and* stdout/stderr, so it has to be
    # opened read/write; a read-only one makes every write by nvim fail.
    dnull = os.open(os.devnull, os.O_RDWR)
    try:
        proc = Popen(cmd, stdin=dnull, stdout=dnull, stderr=dnull)
    finally:
        # The child holds its own copies; ours is no longer needed.
        os.close(dnull)

    # Retry only while nvim is still running.  Once it has exited (with
    # any return code) the socket will never appear, so stop polling.
    while proc.poll() is None:
        try:
            return attach('socket', path=path)
        except IOError:
            # Socket not yet ready
            time.sleep(0.05)

    raise RuntimeError(
        'nvim exited with code {0} before socket {1!r} was ready'.format(
            proc.returncode, path))
64
65
66
def rand_str(length):
    '''Return a string of ``length`` random ASCII letters.

    A ``length`` of zero or less gives the empty string.  The letters come
    from the ``random`` module, so the result is not suitable for secrets.
    '''
    return ''.join(random.choice(string.ascii_letters)
                   for _ in range(length))
72
73
74
def _stringify_key(key, state):
    '''Return ``key`` wrapped in angle brackets, prefixed by its modifier.

    state: 'Shift', 'Ctrl' or 'Alt' add the prefix ``S-``, ``C-`` or
        ``A-`` respectively; any other value adds no prefix.

    Example: ``_stringify_key('x', 'Ctrl')`` gives ``'<C-x>'``.
    '''
    modifiers = (('Shift', 'S'), ('Ctrl', 'C'), ('Alt', 'A'))
    send = [prefix for name, prefix in modifiers if state == name]
    send.append(key)
    return '<' + '-'.join(send) + '>'
84
85
86
def _split_color(n):
    '''Split a packed ``0xRRGGBB`` integer into an ``(r, g, b)`` tuple,
    each channel masked to the range 0-255.'''
    return tuple((n >> shift) & 0xff for shift in (16, 8, 0))
88
89
90
def _invert_color(r, g, b):
    '''Return the ``(r, g, b)`` tuple with every channel replaced by
    ``255 - channel``.'''
    return tuple(255 - channel for channel in (r, g, b))
92
93
94
def _stringify_color(r, g, b):
    '''Pack the three channels into one integer and format it as ``#``
    followed by at least six lowercase hex digits, e.g. ``'#ff0010'``.'''
    packed = (r << 16) + (g << 8) + b
    return '#{0:06x}'.format(packed)
96
97
98
def debug_echo(func):
    '''Decorator for methods: when the instance's ``debug_echo`` attribute
    is truthy, print the method name and its arguments before calling it.

    The wrapped method is always executed and its result returned; the
    echo is extra output only.  An instance without a ``debug_echo``
    attribute is treated as having the echo switched off.

    NOTE(review): an earlier docstring said the function would not execute
    while echoing, but the code has always executed it -- confirm which
    behaviour is intended.
    '''
    @wraps(func)
    def deco(*args, **kwargs):
        # args[0] is ``self`` for a method.  A missing attribute, or a call
        # without any positional argument, means "do not echo".
        debug = bool(args) and getattr(args[0], 'debug_echo', False)
        if debug:
            to_print = args[1:] if len(args) > 1 else []
            fields = [func.__name__, repr(to_print)]
            if kwargs:
                # The call's keyword arguments are echoed as data; passing
                # them on to print() would raise TypeError.
                fields.append(repr(kwargs))
            print(*fields)

        return func(*args, **kwargs)
    return deco
117
118
119
def rate_limited(max_per_second, mode='wait', delay_first_call=False):
    """
    Decorator that keeps a function from being called more often than
    ``max_per_second`` times per second.

    mode selects what happens to a call that arrives too early:

    'wait' (default): sleep for the rest of the interval, then call.

    'kill': ignore the call and return None.

    'refresh_timer': ignore the call, return None and restart the
    interval from now, so the function only runs again after a full
    interval without calls.

    Any other mode ignores early calls the same way 'kill' does.

    set delay_first_call to True to rate-limit the first call as well;
    its interval is measured from the moment the function is decorated.
    Later intervals are measured from the end of the previous call that
    actually ran.

    Raises ZeroDivisionError if max_per_second is 0.
    """
    # Only ``Thread`` is imported from threading at module level.
    import threading

    lock = threading.Lock()
    min_interval = 1.0 / float(max_per_second)

    def decorate(func):
        # One-element list so the nested function can rebind the value.
        # None means "no reference point yet": the next call runs at once.
        last_time_called = [
            time.perf_counter() if delay_first_call else None]

        @wraps(func)
        def rate_limited_function(*args, **kwargs):
            # The lock guards only the throttling decision (including the
            # sleep in 'wait' mode).  ``with`` releases it on every path,
            # and always before ``func`` itself runs.
            with lock:
                previous = last_time_called[0]
                if previous is not None:
                    elapsed = time.perf_counter() - previous
                    left_to_wait = min_interval - elapsed
                    if left_to_wait > 0:
                        if mode == 'refresh_timer':
                            print('Ref timer')
                            # Restart the interval from this attempt.
                            last_time_called[0] = time.perf_counter()
                            return None
                        if mode != 'wait':
                            # 'kill' (and any unknown mode): drop the call.
                            return None
                        time.sleep(left_to_wait)

            ret = func(*args, **kwargs)
            last_time_called[0] = time.perf_counter()
            return ret
        return rate_limited_function
    return decorate
171
172