Test Failed
Push — master ( db4166...efa4d0 )
by Thomas
11:36
created

exabgp.application.control   F

Complexity

Total Complexity 62

Size/Duplication

Total Lines 298
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 233
dl 0
loc 298
rs 3.44
c 0
b 0
f 0
wmc 62

8 Methods

Rating   Name   Duplication   Size   Complexity  
A Control.__init__() 0 4 1
B Control.read_on() 0 14 8
A Control.terminate() 0 8 2
A Control.init() 0 14 3
A Control.cleanup() 0 9 3
A Control.run() 0 16 4
A Control.no_buffer() 0 4 1
F Control.loop() 0 121 22

4 Functions

Rating   Name   Duplication   Size   Complexity  
A env() 0 7 3
A main() 0 9 3
B named_pipe() 0 31 6
B check_fifo() 0 21 6

How to fix   Complexity   

Complexity

Complex classes like exabgp.application.control often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import os
2
import sys
3
import fcntl
4
import stat
5
import time
6
import signal
7
import select
8
import socket
9
import traceback
10
from collections import deque
11
12
from exabgp.reactor.network.error import error
13
14
kb = 1024
15
mb = kb * 1024
16
17
18
def named_pipe(root, pipename='exabgp'):
19
    locations = [
20
        '/run/exabgp/',
21
        '/run/%d/' % os.getuid(),
22
        '/run/',
23
        '/var/run/exabgp/',
24
        '/var/run/%d/' % os.getuid(),
25
        '/var/run/',
26
        root + '/run/exabgp/',
27
        root + '/run/%d/' % os.getuid(),
28
        root + '/run/',
29
        root + '/var/run/exabgp/',
30
        root + '/var/run/%d/' % os.getuid(),
31
        root + '/var/run/',
32
    ]
33
    for location in locations:
34
        cli_in = location + pipename + '.in'
35
        cli_out = location + pipename + '.out'
36
37
        try:
38
            if not stat.S_ISFIFO(os.stat(cli_in).st_mode):
39
                continue
40
            if not stat.S_ISFIFO(os.stat(cli_out).st_mode):
41
                continue
42
        except KeyboardInterrupt:
43
            raise
44
        except Exception:
45
            continue
46
        os.environ['exabgp_cli_pipe'] = location
47
        return [location]
48
    return locations
49
50
51
def env(app, section, name, default):
52
    r = os.environ.get('%s.%s.%s' % (app, section, name), None)
53
    if r is None:
54
        r = os.environ.get('%s_%s_%s' % (app, section, name), None)
55
    if r is None:
56
        return default
57
    return r
58
59
60
def check_fifo(name):
61
    try:
62
        if not stat.S_ISFIFO(os.stat(name).st_mode):
63
            sys.stdout.write('error: a file exist which is not a named pipe (%s)\n' % os.path.abspath(name))
64
            return False
65
66
        if not os.access(name, os.R_OK):
67
            sys.stdout.write(
68
                'error: a named pipe exists and we can not read/write to it (%s)\n' % os.path.abspath(name)
69
            )
70
            return False
71
        return True
72
    except OSError:
73
        sys.stdout.write('error: could not create the named pipe %s\n' % os.path.abspath(name))
74
        return False
75
    except IOError:
76
        sys.stdout.write('error: could not access/delete the named pipe %s\n' % os.path.abspath(name))
77
        sys.stdout.flush()
78
    except socket.error:
79
        sys.stdout.write('error: could not write on the named pipe %s\n' % os.path.abspath(name))
80
        sys.stdout.flush()
81
82
83
class Control(object):
84
    terminating = False
85
86
    def __init__(self, location):
87
        self.send = location + env('exabgp', 'api', 'pipename', 'exabgp') + '.out'
88
        self.recv = location + env('exabgp', 'api', 'pipename', 'exabgp') + '.in'
89
        self.r_pipe = None
90
91
    def init(self):
92
        # obviously this is vulnerable to race conditions ... if an attacker can create fifo in the folder
93
94
        if not check_fifo(self.recv):
95
            self.terminate()
96
            sys.exit(1)
97
98
        if not check_fifo(self.send):
99
            self.terminate()
100
            sys.exit(1)
101
102
        signal.signal(signal.SIGINT, self.terminate)
103
        signal.signal(signal.SIGTERM, self.terminate)
104
        return True
105
106
    def cleanup(self):
107
        def _close(pipe):
108
            if self.r_pipe:
109
                try:
110
                    os.close(pipe)
111
                except (OSError, IOError, TypeError):
112
                    pass
113
114
        _close(self.r_pipe)
115
116
    def terminate(self, ignore=None, me=None):
117
        # if the named pipe is open, and remove_fifo called
118
        # do not ignore a second signal
119
        if self.terminating:
120
            sys.exit(1)
121
        self.terminating = True
122
123
        self.cleanup()
124
125
    def read_on(self, reading):
126
        sleep_time = 1000
127
128
        poller = select.poll()
129
        for io in reading:
130
            poller.register(io, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR)
131
132
        ready = []
133
        for io, event in poller.poll(sleep_time):
134
            if event & select.POLLIN or event & select.POLLPRI:
135
                ready.append(io)
136
            elif event & select.POLLHUP or event & select.POLLERR or event & select.POLLNVAL:
137
                sys.exit(1)
138
        return ready
139
140
    def no_buffer(self, fd):
141
        mfl = fcntl.fcntl(fd, fcntl.F_GETFL)
142
        mfl |= os.O_SYNC
143
        fcntl.fcntl(fd, fcntl.F_SETFL, mfl)
144
145
    def loop(self):
146
        try:
147
            self.r_pipe = os.open(self.recv, os.O_RDWR | os.O_NONBLOCK | os.O_EXCL)
148
        except OSError:
149
            self.terminate()
150
151
        standard_in = sys.stdin.fileno()
152
        standard_out = sys.stdout.fileno()
153
154
        def monitor(function):
155
            def wrapper(*args):
156
                # print >> sys.stderr, "%s(%s)" % (function.func_name,','.join([str(_).replace('\n','\\n') for _ in args]))
157
                r = function(*args)
158
                # print >> sys.stderr, "%s -> %s" % (function.func_name,str(r))
159
                return r
160
161
            return wrapper
162
163
        @monitor
164
        def std_reader(number):
165
            try:
166
                return os.read(standard_in, number)
167
            except OSError as exc:
168
                if exc.errno in error.block:
169
                    return ''
170
                sys.exit(1)
171
172
        @monitor
173
        def std_writer(line):
174
            try:
175
                return os.write(standard_out, line)
176
            except OSError as exc:
177
                if exc.errno in error.block:
178
                    return 0
179
                sys.exit(1)
180
181
        @monitor
182
        def fifo_reader(number):
183
            try:
184
                return os.read(self.r_pipe, number)
185
            except OSError as exc:
186
                if exc.errno in error.block:
187
                    return ''
188
                sys.exit(1)
189
190
        @monitor
191
        def fifo_writer(line):
192
            pipe, nb = None, 0
193
            try:
194
                pipe = os.open(self.send, os.O_WRONLY | os.O_NONBLOCK | os.O_EXCL)
195
                self.no_buffer(pipe)
196
            except OSError:
197
                time.sleep(0.05)
198
                return 0
199
            if pipe is not None:
200
                try:
201
                    nb = os.write(pipe, line)
202
                except OSError:
203
                    pass
204
                try:
205
                    os.close(pipe)
206
                except OSError:
207
                    pass
208
            return nb
209
210
        read = {
211
            standard_in: std_reader,
212
            self.r_pipe: fifo_reader,
213
        }
214
215
        write = {
216
            standard_in: fifo_writer,
217
            self.r_pipe: std_writer,
218
        }
219
220
        backlog = {
221
            standard_in: deque(),
222
            self.r_pipe: deque(),
223
        }
224
225
        store = {
226
            standard_in: b'',
227
            self.r_pipe: b'',
228
        }
229
230
        def consume(source):
231
            if not backlog[source] and b'\n' not in store[source]:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable store does not seem to be defined.
Loading history...
232
                store[source] += read[source](1024)
233
            else:
234
                backlog[source].append(read[source](1024))
235
                # assuming a route takes 80 chars, 100 Mb is over 1Millions routes
236
                # something is really wrong if it was not consummed
237
                if len(backlog) > 100 * mb:
238
                    sys.stderr.write('using too much memory - exiting')
239
                    sys.exit(1)
240
241
        reading = [standard_in, self.r_pipe]
242
243
        while True:
244
            ready = self.read_on(reading)
245
246
            # command from user
247
            if self.r_pipe in ready:
248
                consume(self.r_pipe)
249
            if standard_in in ready:
250
                consume(standard_in)
251
252
            for source in reading:
253
                while b'\n' in store[source]:
254
                    line, _ = store[source].split(b'\n', 1)
255
                    # sys.stderr.write(str(line).replace('\n','\\n') + '\n')
256
                    # sys.stderr.flush()
257
                    sent = write[source](line + b'\n')
258
                    # sys.stderr.write('sent %d\n' % sent)
259
                    # sys.stderr.flush()
260
                    if sent:
261
                        store[source] = store[source][sent:]
262
                        continue
263
                    break
264
                if backlog[source]:
265
                    store[source] += backlog[source].popleft()
266
267
    def run(self):
268
        if not self.init():
269
            sys.exit(1)
270
        try:
271
            self.loop()
272
        except KeyboardInterrupt:
273
            self.cleanup()
274
            sys.exit(0)
275
        except Exception as exc:
276
            sys.stderr.write(str(exc))
277
            sys.stderr.write('\n\n')
278
            sys.stderr.flush()
279
            traceback.print_exc(file=sys.stderr)
280
            sys.stderr.flush()
281
            self.cleanup()
282
            sys.exit(1)
283
284
285
def main(location=''):
286
    if not location:
287
        location = os.environ.get('exabgp_cli_pipe', '')
288
    if not location:
289
        sys.stderr.write("usage %s %s\n" % (sys.executable, ' '.join(sys.argv)))
290
        sys.stderr.write("run with 'env exabgp_cli_pipe=<location>' if you are trying to mess with ExaBGP's intenals")
291
        sys.stderr.flush()
292
        sys.exit(1)
293
    Control(location).run()
294
295
296
if __name__ == '__main__':
297
    main()
298