1 | import os |
||
2 | import sys |
||
3 | import fcntl |
||
4 | import stat |
||
5 | import time |
||
6 | import signal |
||
7 | import select |
||
8 | import socket |
||
9 | import traceback |
||
10 | from collections import deque |
||
11 | |||
12 | from exabgp.reactor.network.error import error |
||
13 | |||
14 | kb = 1024 |
||
15 | mb = kb * 1024 |
||
16 | |||
17 | |||
18 | def named_pipe(root, pipename='exabgp'): |
||
19 | locations = [ |
||
20 | '/run/exabgp/', |
||
21 | '/run/%d/' % os.getuid(), |
||
22 | '/run/', |
||
23 | '/var/run/exabgp/', |
||
24 | '/var/run/%d/' % os.getuid(), |
||
25 | '/var/run/', |
||
26 | root + '/run/exabgp/', |
||
27 | root + '/run/%d/' % os.getuid(), |
||
28 | root + '/run/', |
||
29 | root + '/var/run/exabgp/', |
||
30 | root + '/var/run/%d/' % os.getuid(), |
||
31 | root + '/var/run/', |
||
32 | ] |
||
33 | for location in locations: |
||
34 | cli_in = location + pipename + '.in' |
||
35 | cli_out = location + pipename + '.out' |
||
36 | |||
37 | try: |
||
38 | if not stat.S_ISFIFO(os.stat(cli_in).st_mode): |
||
39 | continue |
||
40 | if not stat.S_ISFIFO(os.stat(cli_out).st_mode): |
||
41 | continue |
||
42 | except Exception: |
||
43 | continue |
||
44 | os.environ['exabgp_cli_pipe'] = location |
||
45 | return [location] |
||
46 | return locations |
||
47 | |||
48 | |||
49 | def env(app, section, name, default): |
||
50 | r = os.environ.get('%s.%s.%s' % (app, section, name), None) |
||
51 | if r is None: |
||
52 | r = os.environ.get('%s_%s_%s' % (app, section, name), None) |
||
53 | if r is None: |
||
54 | return default |
||
55 | return r |
||
56 | |||
57 | |||
58 | def check_fifo(name): |
||
59 | try: |
||
60 | if not stat.S_ISFIFO(os.stat(name).st_mode): |
||
61 | sys.stdout.write('error: a file exist which is not a named pipe (%s)\n' % os.path.abspath(name)) |
||
62 | return False |
||
63 | |||
64 | if not os.access(name, os.R_OK): |
||
65 | sys.stdout.write( |
||
66 | 'error: a named pipe exists and we can not read/write to it (%s)\n' % os.path.abspath(name) |
||
67 | ) |
||
68 | return False |
||
69 | return True |
||
70 | except OSError: |
||
71 | sys.stdout.write('error: could not create the named pipe %s\n' % os.path.abspath(name)) |
||
72 | return False |
||
73 | except IOError: |
||
74 | sys.stdout.write('error: could not access/delete the named pipe %s\n' % os.path.abspath(name)) |
||
75 | sys.stdout.flush() |
||
76 | except socket.error: |
||
77 | sys.stdout.write('error: could not write on the named pipe %s\n' % os.path.abspath(name)) |
||
78 | sys.stdout.flush() |
||
79 | |||
80 | |||
81 | class Control(object): |
||
82 | terminating = False |
||
83 | |||
84 | def __init__(self, location): |
||
85 | self.send = location + env('exabgp', 'api', 'pipename', 'exabgp') + '.out' |
||
86 | self.recv = location + env('exabgp', 'api', 'pipename', 'exabgp') + '.in' |
||
87 | self.r_pipe = None |
||
88 | |||
89 | def init(self): |
||
90 | # obviously this is vulnerable to race conditions ... if an attacker can create fifo in the folder |
||
91 | |||
92 | if not check_fifo(self.recv): |
||
93 | self.terminate() |
||
94 | sys.exit(1) |
||
95 | |||
96 | if not check_fifo(self.send): |
||
97 | self.terminate() |
||
98 | sys.exit(1) |
||
99 | |||
100 | signal.signal(signal.SIGINT, self.terminate) |
||
101 | signal.signal(signal.SIGTERM, self.terminate) |
||
102 | return True |
||
103 | |||
104 | def cleanup(self): |
||
105 | def _close(pipe): |
||
106 | if self.r_pipe: |
||
107 | try: |
||
108 | os.close(pipe) |
||
109 | except (OSError, IOError, TypeError): |
||
110 | pass |
||
111 | |||
112 | _close(self.r_pipe) |
||
113 | |||
114 | def terminate(self, ignore=None, me=None): |
||
115 | # if the named pipe is open, and remove_fifo called |
||
116 | # do not ignore a second signal |
||
117 | if self.terminating: |
||
118 | sys.exit(1) |
||
119 | self.terminating = True |
||
120 | |||
121 | self.cleanup() |
||
122 | |||
123 | def read_on(self, reading): |
||
124 | sleep_time = 1000 |
||
125 | |||
126 | poller = select.poll() |
||
127 | for io in reading: |
||
128 | poller.register(io, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR) |
||
129 | |||
130 | ready = [] |
||
131 | for io, event in poller.poll(sleep_time): |
||
132 | if event & select.POLLIN or event & select.POLLPRI: |
||
133 | ready.append(io) |
||
134 | elif event & select.POLLHUP or event & select.POLLERR or event & select.POLLNVAL: |
||
135 | sys.exit(1) |
||
136 | return ready |
||
137 | |||
138 | def no_buffer(self, fd): |
||
139 | mfl = fcntl.fcntl(fd, fcntl.F_GETFL) |
||
140 | mfl |= os.O_SYNC |
||
141 | fcntl.fcntl(fd, fcntl.F_SETFL, mfl) |
||
142 | |||
143 | def loop(self): |
||
144 | try: |
||
145 | self.r_pipe = os.open(self.recv, os.O_RDWR | os.O_NONBLOCK | os.O_EXCL) |
||
146 | except OSError: |
||
147 | self.terminate() |
||
148 | |||
149 | standard_in = sys.stdin.fileno() |
||
150 | standard_out = sys.stdout.fileno() |
||
151 | |||
152 | def monitor(function): |
||
153 | def wrapper(*args): |
||
154 | # print >> sys.stderr, "%s(%s)" % (function.func_name,','.join([str(_).replace('\n','\\n') for _ in args])) |
||
155 | r = function(*args) |
||
156 | # print >> sys.stderr, "%s -> %s" % (function.func_name,str(r)) |
||
157 | return r |
||
158 | |||
159 | return wrapper |
||
160 | |||
161 | @monitor |
||
162 | def std_reader(number): |
||
163 | try: |
||
164 | return os.read(standard_in, number) |
||
165 | except OSError as exc: |
||
166 | if exc.errno in error.block: |
||
167 | return '' |
||
168 | sys.exit(1) |
||
169 | |||
170 | @monitor |
||
171 | def std_writer(line): |
||
172 | try: |
||
173 | return os.write(standard_out, line) |
||
174 | except OSError as exc: |
||
175 | if exc.errno in error.block: |
||
176 | return 0 |
||
177 | sys.exit(1) |
||
178 | |||
179 | @monitor |
||
180 | def fifo_reader(number): |
||
181 | try: |
||
182 | return os.read(self.r_pipe, number) |
||
183 | except OSError as exc: |
||
184 | if exc.errno in error.block: |
||
185 | return '' |
||
186 | sys.exit(1) |
||
187 | |||
188 | @monitor |
||
189 | def fifo_writer(line): |
||
190 | pipe, nb = None, 0 |
||
191 | try: |
||
192 | pipe = os.open(self.send, os.O_WRONLY | os.O_NONBLOCK | os.O_EXCL) |
||
193 | self.no_buffer(pipe) |
||
194 | except OSError: |
||
195 | time.sleep(0.05) |
||
196 | return 0 |
||
197 | if pipe is not None: |
||
198 | try: |
||
199 | nb = os.write(pipe, line) |
||
200 | except OSError: |
||
201 | pass |
||
202 | try: |
||
203 | os.close(pipe) |
||
204 | except OSError: |
||
205 | pass |
||
206 | return nb |
||
207 | |||
208 | read = { |
||
209 | standard_in: std_reader, |
||
210 | self.r_pipe: fifo_reader, |
||
211 | } |
||
212 | |||
213 | write = { |
||
214 | standard_in: fifo_writer, |
||
215 | self.r_pipe: std_writer, |
||
216 | } |
||
217 | |||
218 | backlog = { |
||
219 | standard_in: deque(), |
||
220 | self.r_pipe: deque(), |
||
221 | } |
||
222 | |||
223 | store = { |
||
224 | standard_in: b'', |
||
225 | self.r_pipe: b'', |
||
226 | } |
||
227 | |||
228 | def consume(source): |
||
229 | if not backlog[source] and b'\n' not in store[source]: |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
Loading history...
|
|||
230 | store[source] += read[source](1024) |
||
231 | else: |
||
232 | backlog[source].append(read[source](1024)) |
||
233 | # assuming a route takes 80 chars, 100 Mb is over 1Millions routes |
||
234 | # something is really wrong if it was not consummed |
||
235 | if len(backlog) > 100 * mb: |
||
236 | sys.stderr.write('using too much memory - exiting') |
||
237 | sys.exit(1) |
||
238 | |||
239 | reading = [standard_in, self.r_pipe] |
||
240 | |||
241 | while True: |
||
242 | ready = self.read_on(reading) |
||
243 | |||
244 | # command from user |
||
245 | if self.r_pipe in ready: |
||
246 | consume(self.r_pipe) |
||
247 | if standard_in in ready: |
||
248 | consume(standard_in) |
||
249 | |||
250 | for source in reading: |
||
251 | while b'\n' in store[source]: |
||
252 | line, _ = store[source].split(b'\n', 1) |
||
253 | # sys.stderr.write(str(line).replace('\n','\\n') + '\n') |
||
254 | # sys.stderr.flush() |
||
255 | sent = write[source](line + b'\n') |
||
256 | # sys.stderr.write('sent %d\n' % sent) |
||
257 | # sys.stderr.flush() |
||
258 | if sent: |
||
259 | store[source] = store[source][sent:] |
||
260 | continue |
||
261 | break |
||
262 | if backlog[source]: |
||
263 | store[source] += backlog[source].popleft() |
||
264 | |||
265 | def run(self): |
||
266 | if not self.init(): |
||
267 | sys.exit(1) |
||
268 | try: |
||
269 | self.loop() |
||
270 | except KeyboardInterrupt: |
||
271 | self.cleanup() |
||
272 | sys.exit(0) |
||
273 | except Exception as exc: |
||
274 | sys.stderr.write(str(exc)) |
||
275 | sys.stderr.write('\n\n') |
||
276 | sys.stderr.flush() |
||
277 | traceback.print_exc(file=sys.stderr) |
||
278 | sys.stderr.flush() |
||
279 | self.cleanup() |
||
280 | sys.exit(1) |
||
281 | |||
282 | |||
283 | def main(location=''): |
||
284 | if not location: |
||
285 | location = os.environ.get('exabgp_cli_pipe', '') |
||
286 | if not location: |
||
287 | sys.stderr.write("usage %s %s\n" % (sys.executable, ' '.join(sys.argv))) |
||
288 | sys.stderr.write("run with 'env exabgp_cli_pipe=<location>' if you are trying to mess with ExaBGP's intenals") |
||
289 | sys.stderr.flush() |
||
290 | sys.exit(1) |
||
291 | Control(location).run() |
||
292 | |||
293 | |||
294 | if __name__ == '__main__': |
||
295 | main() |
||
296 |