exabgp.application.pipe   F
last analyzed

Complexity

Total Complexity 61

Size/Duplication

Total Lines 294
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 231
dl 0
loc 294
rs 3.52
c 0
b 0
f 0
wmc 61

4 Functions

Rating   Name   Duplication   Size   Complexity  
A env() 0 7 3
B named_pipe() 0 29 5
B check_fifo() 0 21 6
A main() 0 9 3

8 Methods

Rating   Name   Duplication   Size   Complexity  
A Control.no_buffer() 0 4 1
A Control.__init__() 0 4 1
A Control.init() 0 14 3
A Control.cleanup() 0 9 3
B Control.read_on() 0 14 8
A Control.terminate() 0 8 2
A Control.run() 0 16 4
F Control.loop() 0 119 22

How to fix   Complexity   

Complexity

Complex classes like exabgp.application.pipe often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import os
2
import sys
3
import fcntl
4
import stat
5
import time
6
import signal
7
import select
8
import socket
9
import traceback
10
from collections import deque
11
12
from exabgp.reactor.network.error import error
13
14
kb = 1024
15
mb = kb * 1024
16
17
18
def named_pipe(root, pipename='exabgp'):
19
    locations = [
20
        '/run/exabgp/',
21
        '/run/%d/' % os.getuid(),
22
        '/run/',
23
        '/var/run/exabgp/',
24
        '/var/run/%d/' % os.getuid(),
25
        '/var/run/',
26
        root + '/run/exabgp/',
27
        root + '/run/%d/' % os.getuid(),
28
        root + '/run/',
29
        root + '/var/run/exabgp/',
30
        root + '/var/run/%d/' % os.getuid(),
31
        root + '/var/run/',
32
    ]
33
    for location in locations:
34
        cli_in = location + pipename + '.in'
35
        cli_out = location + pipename + '.out'
36
37
        try:
38
            if not stat.S_ISFIFO(os.stat(cli_in).st_mode):
39
                continue
40
            if not stat.S_ISFIFO(os.stat(cli_out).st_mode):
41
                continue
42
        except Exception:
43
            continue
44
        os.environ['exabgp_cli_pipe'] = location
45
        return [location]
46
    return locations
47
48
49
def env(app, section, name, default):
50
    r = os.environ.get('%s.%s.%s' % (app, section, name), None)
51
    if r is None:
52
        r = os.environ.get('%s_%s_%s' % (app, section, name), None)
53
    if r is None:
54
        return default
55
    return r
56
57
58
def check_fifo(name):
59
    try:
60
        if not stat.S_ISFIFO(os.stat(name).st_mode):
61
            sys.stdout.write('error: a file exist which is not a named pipe (%s)\n' % os.path.abspath(name))
62
            return False
63
64
        if not os.access(name, os.R_OK):
65
            sys.stdout.write(
66
                'error: a named pipe exists and we can not read/write to it (%s)\n' % os.path.abspath(name)
67
            )
68
            return False
69
        return True
70
    except OSError:
71
        sys.stdout.write('error: could not create the named pipe %s\n' % os.path.abspath(name))
72
        return False
73
    except IOError:
74
        sys.stdout.write('error: could not access/delete the named pipe %s\n' % os.path.abspath(name))
75
        sys.stdout.flush()
76
    except socket.error:
77
        sys.stdout.write('error: could not write on the named pipe %s\n' % os.path.abspath(name))
78
        sys.stdout.flush()
79
80
81
class Control(object):
82
    terminating = False
83
84
    def __init__(self, location):
85
        self.send = location + env('exabgp', 'api', 'pipename', 'exabgp') + '.out'
86
        self.recv = location + env('exabgp', 'api', 'pipename', 'exabgp') + '.in'
87
        self.r_pipe = None
88
89
    def init(self):
90
        # obviously this is vulnerable to race conditions ... if an attacker can create fifo in the folder
91
92
        if not check_fifo(self.recv):
93
            self.terminate()
94
            sys.exit(1)
95
96
        if not check_fifo(self.send):
97
            self.terminate()
98
            sys.exit(1)
99
100
        signal.signal(signal.SIGINT, self.terminate)
101
        signal.signal(signal.SIGTERM, self.terminate)
102
        return True
103
104
    def cleanup(self):
105
        def _close(pipe):
106
            if self.r_pipe:
107
                try:
108
                    os.close(pipe)
109
                except (OSError, IOError, TypeError):
110
                    pass
111
112
        _close(self.r_pipe)
113
114
    def terminate(self, ignore=None, me=None):
115
        # if the named pipe is open, and remove_fifo called
116
        # do not ignore a second signal
117
        if self.terminating:
118
            sys.exit(1)
119
        self.terminating = True
120
121
        self.cleanup()
122
123
    def read_on(self, reading):
124
        sleep_time = 1000
125
126
        poller = select.poll()
127
        for io in reading:
128
            poller.register(io, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR)
129
130
        ready = []
131
        for io, event in poller.poll(sleep_time):
132
            if event & select.POLLIN or event & select.POLLPRI:
133
                ready.append(io)
134
            elif event & select.POLLHUP or event & select.POLLERR or event & select.POLLNVAL:
135
                sys.exit(1)
136
        return ready
137
138
    def no_buffer(self, fd):
139
        mfl = fcntl.fcntl(fd, fcntl.F_GETFL)
140
        mfl |= os.O_SYNC
141
        fcntl.fcntl(fd, fcntl.F_SETFL, mfl)
142
143
    def loop(self):
144
        try:
145
            self.r_pipe = os.open(self.recv, os.O_RDWR | os.O_NONBLOCK | os.O_EXCL)
146
        except OSError:
147
            self.terminate()
148
149
        standard_in = sys.stdin.fileno()
150
        standard_out = sys.stdout.fileno()
151
152
        def monitor(function):
153
            def wrapper(*args):
154
                r = function(*args)
155
                return r
156
157
            return wrapper
158
159
        @monitor
160
        def std_reader(number):
161
            try:
162
                return os.read(standard_in, number)
163
            except OSError as exc:
164
                if exc.errno in error.block:
165
                    return ''
166
                sys.exit(1)
167
168
        @monitor
169
        def std_writer(line):
170
            try:
171
                return os.write(standard_out, line)
172
            except OSError as exc:
173
                if exc.errno in error.block:
174
                    return 0
175
                sys.exit(1)
176
177
        @monitor
178
        def fifo_reader(number):
179
            try:
180
                return os.read(self.r_pipe, number)
181
            except OSError as exc:
182
                if exc.errno in error.block:
183
                    return ''
184
                sys.exit(1)
185
186
        @monitor
187
        def fifo_writer(line):
188
            pipe, nb = None, 0
189
            try:
190
                pipe = os.open(self.send, os.O_WRONLY | os.O_NONBLOCK | os.O_EXCL)
191
                self.no_buffer(pipe)
192
            except OSError:
193
                time.sleep(0.05)
194
                return 0
195
            if pipe is not None:
196
                try:
197
                    nb = os.write(pipe, line)
198
                except OSError:
199
                    pass
200
                try:
201
                    os.close(pipe)
202
                except OSError:
203
                    pass
204
            return nb
205
206
        read = {
207
            standard_in: std_reader,
208
            self.r_pipe: fifo_reader,
209
        }
210
211
        write = {
212
            standard_in: fifo_writer,
213
            self.r_pipe: std_writer,
214
        }
215
216
        backlog = {
217
            standard_in: deque(),
218
            self.r_pipe: deque(),
219
        }
220
221
        store = {
222
            standard_in: b'',
223
            self.r_pipe: b'',
224
        }
225
226
        def consume(source):
227
            if not backlog[source] and b'\n' not in store[source]:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable store does not seem to be defined.
Loading history...
228
                store[source] += read[source](1024)
229
            else:
230
                backlog[source].append(read[source](1024))
231
                # assuming a route takes 80 chars, 100 Mb is over 1Millions routes
232
                # something is really wrong if it was not consummed
233
                if len(backlog) > 100 * mb:
234
                    sys.stderr.write('using too much memory - exiting')
235
                    sys.exit(1)
236
237
        reading = [standard_in, self.r_pipe]
238
239
        while True:
240
            ready = self.read_on(reading)
241
242
            # command from user
243
            if self.r_pipe in ready:
244
                consume(self.r_pipe)
245
            if standard_in in ready:
246
                consume(standard_in)
247
248
            for source in reading:
249
                while b'\n' in store[source]:
250
                    line, _ = store[source].split(b'\n', 1)
251
                    # sys.stderr.write(str(line).replace('\n','\\n') + '\n')
252
                    # sys.stderr.flush()
253
                    sent = write[source](line + b'\n')
254
                    # sys.stderr.write('sent %d\n' % sent)
255
                    # sys.stderr.flush()
256
                    if sent:
257
                        store[source] = store[source][sent:]
258
                        continue
259
                    break
260
                if backlog[source]:
261
                    store[source] += backlog[source].popleft()
262
263
    def run(self):
264
        if not self.init():
265
            sys.exit(1)
266
        try:
267
            self.loop()
268
        except KeyboardInterrupt:
269
            self.cleanup()
270
            sys.exit(0)
271
        except Exception as exc:
272
            sys.stderr.write(str(exc))
273
            sys.stderr.write('\n\n')
274
            sys.stderr.flush()
275
            traceback.print_exc(file=sys.stderr)
276
            sys.stderr.flush()
277
            self.cleanup()
278
            sys.exit(1)
279
280
281
def main(location=''):
282
    if not location:
283
        location = os.environ.get('exabgp_cli_pipe', '')
284
    if not location:
285
        sys.stderr.write("usage %s %s\n" % (sys.executable, ' '.join(sys.argv)))
286
        sys.stderr.write("run with 'env exabgp_cli_pipe=<location>' if you are trying to mess with ExaBGP's intenals")
287
        sys.stderr.flush()
288
        sys.exit(1)
289
    Control(location).run()
290
291
292
if __name__ == '__main__':
293
    main()
294