Completed
Push — master ( efa4d0...b751a0 )
by Thomas
10:41
created

exabgp.application.pipe.Control.no_buffer()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
import os
2
import sys
3
import fcntl
4
import stat
5
import time
6
import signal
7
import select
8
import socket
9
import traceback
10
from collections import deque
11
12
from exabgp.reactor.network.error import error
13
14
# byte-size units: one kibibyte and one mebibyte
kb = 1 << 10
mb = kb << 10
16
17
18
def named_pipe(root, pipename='exabgp'):
    """Locate the directory holding the CLI named pipes.

    The candidate directories are the ``/run`` and ``/var/run`` trees (their
    ``exabgp/`` sub-directory, their per-uid sub-directory, then the tree
    itself), followed by the same six locations below ``root``.  A candidate
    matches when both ``<pipename>.in`` and ``<pipename>.out`` exist in it
    and are FIFOs.

    Returns a one-element list with the first matching directory (which is
    also exported as the ``exabgp_cli_pipe`` environment variable), or the
    complete list of candidates when nothing matches.
    """
    user_dir = '%d/' % os.getuid()

    candidates = []
    for prefix in ('', root):
        for base in ('/run/', '/var/run/'):
            candidates.extend(
                (
                    prefix + base + 'exabgp/',
                    prefix + base + user_dir,
                    prefix + base,
                )
            )

    def is_fifo(path):
        return stat.S_ISFIFO(os.stat(path).st_mode)

    for directory in candidates:
        inbound = directory + pipename + '.in'
        outbound = directory + pipename + '.out'
        try:
            found = is_fifo(inbound) and is_fifo(outbound)
        except Exception:
            # any failure while inspecting the pipes rules this directory out
            found = False
        if found:
            os.environ['exabgp_cli_pipe'] = directory
            return [directory]

    return candidates
47
48
49
def env(app, section, name, default):
    """Read a setting from the process environment.

    The dotted key ``app.section.name`` is tried first, then the underscore
    form ``app_section_name``.  The first one that is set wins (an empty
    string counts as set); ``default`` is returned when neither exists.
    """
    for template in ('%s.%s.%s', '%s_%s_%s'):
        value = os.environ.get(template % (app, section, name), None)
        if value is not None:
            return value
    return default
56
57
58
def check_fifo(name):
    """Check that ``name`` is an existing, readable named pipe.

    Returns True when ``name`` is a FIFO for which ``os.access`` grants read
    permission.  In every other case a one-line error is written to stdout
    and False is returned, so the result is always a bool.
    """

    def report(message):
        # the absolute path is only computed when there is something to report
        sys.stdout.write(message % os.path.abspath(name))
        sys.stdout.flush()
        return False

    try:
        if not stat.S_ISFIFO(os.stat(name).st_mode):
            return report('error: a file exist which is not a named pipe (%s)\n')

        if not os.access(name, os.R_OK):
            return report('error: a named pipe exists and we can not read/write to it (%s)\n')

        return True
    except OSError:
        return report('error: could not create the named pipe %s\n')
    except IOError:
        # distinct from OSError only on Python 2
        return report('error: could not access/delete the named pipe %s\n')
    except socket.error:
        # distinct from OSError only on Python 2
        return report('error: could not write on the named pipe %s\n')
79
80
81
class Control(object):
    """Relay lines between standard input/output and a pair of named pipes.

    Complete lines read from stdin are written to the ``.out`` pipe, and
    complete lines read from the ``.in`` pipe are written to stdout.
    """

    # set by the first terminate(); a second request then exits immediately
    terminating = False

    def __init__(self, location):
        """Compute the pipe paths under ``location``.

        ``location`` is used as a plain string prefix.  The pipe base name
        comes from the ``exabgp.api.pipename`` environment setting and
        defaults to ``exabgp``.
        """
        pipename = env('exabgp', 'api', 'pipename', 'exabgp')
        self.send = location + pipename + '.out'
        self.recv = location + pipename + '.in'
        # file descriptor of the opened ``.in`` pipe, None while closed
        self.r_pipe = None

    def init(self):
        """Validate both pipes and install the SIGINT/SIGTERM handlers.

        Exits the process with status 1 when either pipe fails check_fifo();
        returns True otherwise.
        """
        # obviously this is vulnerable to race conditions ... if an attacker can create fifo in the folder

        for pipe in (self.recv, self.send):
            if not check_fifo(pipe):
                self.terminate()
                sys.exit(1)

        signal.signal(signal.SIGINT, self.terminate)
        signal.signal(signal.SIGTERM, self.terminate)
        return True

    def cleanup(self):
        """Close the read pipe if it is open; safe to call more than once."""
        # forget the descriptor first so a later call can not close it again
        pipe, self.r_pipe = self.r_pipe, None
        if pipe is None:
            return
        try:
            os.close(pipe)
        except (OSError, IOError, TypeError):
            pass

    def terminate(self, ignore=None, me=None):
        """Release the read pipe; usable directly as a signal handler.

        The two parameters receive the signal handler arguments and are not
        used.  A second call exits the process with status 1.
        """
        # if the named pipe is open, and remove_fifo called
        # do not ignore a second signal
        if self.terminating:
            sys.exit(1)
        self.terminating = True

        self.cleanup()

    def read_on(self, reading):
        """Return the descriptors from ``reading`` that have data to read.

        Polls for at most one second.  Exits the process with status 1 when
        a descriptor reports hang-up, error or invalid without also being
        readable.
        """
        sleep_time = 1000  # poll() timeout, in milliseconds

        mask = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR
        readable = select.POLLIN | select.POLLPRI
        broken = select.POLLHUP | select.POLLERR | select.POLLNVAL

        poller = select.poll()
        for io in reading:
            poller.register(io, mask)

        ready = []
        for io, event in poller.poll(sleep_time):
            if event & readable:
                ready.append(io)
            elif event & broken:
                sys.exit(1)
        return ready

    def no_buffer(self, fd):
        """Add O_SYNC to the status flags of ``fd`` using fcntl F_SETFL."""
        # NOTE(review): fcntl(2) says Linux F_SETFL only changes a few flags
        # and O_SYNC is not among them - confirm this has the intended effect
        mfl = fcntl.fcntl(fd, fcntl.F_GETFL)
        mfl |= os.O_SYNC
        fcntl.fcntl(fd, fcntl.F_SETFL, mfl)

    def loop(self):
        """Open the read pipe and relay lines in both directions forever.

        Only leaves through sys.exit() or an exception: the process exits
        with status 1 when the read pipe can not be opened, when a read or
        write fails with an error that is not a would-block, when polling
        reports a broken descriptor, or when too much data is queued.
        """
        try:
            self.r_pipe = os.open(self.recv, os.O_RDWR | os.O_NONBLOCK | os.O_EXCL)
        except OSError:
            # nothing can be relayed without the read pipe
            sys.stderr.write('error: could not open the named pipe %s\n' % os.path.abspath(self.recv))
            sys.stderr.flush()
            self.terminate()
            sys.exit(1)

        # keep the descriptor in a local: cleanup() resets self.r_pipe, and
        # the tables below must keep using the value they were keyed with
        r_pipe = self.r_pipe
        standard_in = sys.stdin.fileno()
        standard_out = sys.stdout.fileno()

        def std_reader(number):
            try:
                return os.read(standard_in, number)
            except OSError as exc:
                if exc.errno in error.block:
                    # bytes, like os.read(), so it can be added to the store
                    return b''
                sys.exit(1)

        def std_writer(line):
            try:
                return os.write(standard_out, line)
            except OSError as exc:
                if exc.errno in error.block:
                    return 0
                sys.exit(1)

        def fifo_reader(number):
            try:
                return os.read(r_pipe, number)
            except OSError as exc:
                if exc.errno in error.block:
                    return b''
                sys.exit(1)

        def fifo_writer(line):
            # the write pipe is opened and closed around every write
            try:
                pipe = os.open(self.send, os.O_WRONLY | os.O_NONBLOCK | os.O_EXCL)
            except OSError:
                time.sleep(0.05)
                return 0

            sent = 0
            try:
                self.no_buffer(pipe)
            except OSError:
                time.sleep(0.05)
            else:
                try:
                    sent = os.write(pipe, line)
                except OSError:
                    pass
            finally:
                # always release the descriptor, even if no_buffer() failed
                try:
                    os.close(pipe)
                except OSError:
                    pass
            return sent

        # data read from a source is written to the opposite side
        read = {
            standard_in: std_reader,
            r_pipe: fifo_reader,
        }

        write = {
            standard_in: fifo_writer,
            r_pipe: std_writer,
        }

        # chunks waiting behind a store which still holds a complete line
        backlog = {
            standard_in: deque(),
            r_pipe: deque(),
        }

        # number of bytes currently held in each backlog
        queued = {
            standard_in: 0,
            r_pipe: 0,
        }

        # data being split into lines and written out
        store = {
            standard_in: b'',
            r_pipe: b'',
        }

        def consume(source):
            if not backlog[source] and b'\n' not in store[source]:
                store[source] += read[source](1024)
                return

            chunk = read[source](1024)
            backlog[source].append(chunk)
            queued[source] += len(chunk)
            # assuming a route takes 80 chars, 100 Mb is over 1Millions routes
            # something is really wrong if it was not consummed
            if sum(queued.values()) > 100 * mb:
                sys.stderr.write('using too much memory - exiting')
                sys.exit(1)

        reading = [standard_in, r_pipe]

        while True:
            ready = self.read_on(reading)

            # command from user
            if r_pipe in ready:
                consume(r_pipe)
            if standard_in in ready:
                consume(standard_in)

            for source in reading:
                while b'\n' in store[source]:
                    line, _ = store[source].split(b'\n', 1)
                    sent = write[source](line + b'\n')
                    if not sent:
                        # the destination is not ready, retry on the next pass
                        break
                    # a short write leaves the remainder of the line queued
                    store[source] = store[source][sent:]
                if backlog[source]:
                    chunk = backlog[source].popleft()
                    queued[source] -= len(chunk)
                    store[source] += chunk

    def run(self):
        """Initialise and run the relay, then exit the process.

        Exits with status 0 on KeyboardInterrupt.  Any other exception is
        reported with its traceback on stderr before exiting with status 1.
        """
        if not self.init():
            sys.exit(1)
        try:
            self.loop()
        except KeyboardInterrupt:
            self.cleanup()
            sys.exit(0)
        except Exception as exc:
            sys.stderr.write(str(exc))
            sys.stderr.write('\n\n')
            sys.stderr.flush()
            traceback.print_exc(file=sys.stderr)
            sys.stderr.flush()
            self.cleanup()
            sys.exit(1)
281
282
283
def main(location=''):
    """Run the pipe relay for the given pipe directory.

    When ``location`` is empty the ``exabgp_cli_pipe`` environment variable
    is used instead.  If that is empty too, a usage message is written to
    stderr and the process exits with status 1.
    """
    location = location or os.environ.get('exabgp_cli_pipe', '')

    if location:
        Control(location).run()
        return

    usage = "usage %s %s\n" % (sys.executable, ' '.join(sys.argv))
    sys.stderr.write(usage)
    sys.stderr.write("run with 'env exabgp_cli_pipe=<location>' if you are trying to mess with ExaBGP's intenals")
    sys.stderr.flush()
    sys.exit(1)
292
293
294
# start the relay only when this file is executed as a script
if __name__ == '__main__':
    main()
296