Completed
Push — master ( d75605...cff8bf )
by Thomas
12:23
created

exabgp.application.control.Control.init()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 1
dl 0
loc 14
rs 9.9
c 0
b 0
f 0
1
"""
2
control.py
3
4
Created by Thomas Mangin on 2015-01-13.
5
Copyright (c) 2015-2017 Exa Networks. All rights reserved.
6
License: 3-clause BSD. (See the COPYRIGHT file)
7
"""
8
9
import os
10
import sys
11
import fcntl
12
import stat
13
import time
14
import signal
15
import select
16
import socket
17
import traceback
18
from collections import deque
19
20
from exabgp.util import str_ascii
21
from exabgp.reactor.network.error import error
22
23
# size multipliers in bytes; mb is used to bound the relay backlog in Control.loop
kb = 2 ** 10
mb = kb * kb
25
26
27
def env (app, section, name, default):
	"""
	Look up a configuration value in the process environment.

	The dotted form 'app.section.name' is tried first, then the
	underscore form 'app_section_name'.

	returns: the value of the first variable found, otherwise `default`
	"""
	for template in ('%s.%s.%s', '%s_%s_%s'):
		value = os.environ.get(template % (app,section,name))
		if value is not None:
			return value
	return default
34
35
36
def check_fifo (name):
	"""
	Check that `name` is an existing named pipe which this process can read.

	A diagnostic is written to stdout, and flushed, for every failure.

	name: path of the named pipe to validate
	returns: True when the path is a readable FIFO, False in every other case
	"""
	location = os.path.abspath(name)

	def failed (message):
		# report the problem and always give the caller an explicit False
		sys.stdout.write(message % location)
		sys.stdout.flush()
		return False

	try:
		if not stat.S_ISFIFO(os.stat(name).st_mode):
			return failed('error: a file exist which is not a named pipe (%s)\n')

		# only read access is tested here, even though the message mentions write
		if not os.access(name,os.R_OK):
			return failed('error: a named pipe exists and we can not read/write to it (%s)\n')
		return True
	except OSError:
		# raised by os.stat when the path is missing or can not be examined
		return failed('error: could not create the named pipe %s\n')
	except IOError:
		return failed('error: could not access/delete the named pipe %s\n')
	except socket.error:
		return failed('error: could not write on the named pipe %s\n')
55
56
57
class Control (object):
	"""
	Relay lines between this process' stdin/stdout and a pair of named pipes.

	Lines read from stdin are written to the '<pipename>.out' pipe and
	lines read from the '<pipename>.in' pipe are written to stdout.
	"""

	# set by terminate() so that a second termination request exits at once
	terminating = False

	def __init__ (self, location):
		"""
		location: prefix prepended verbatim to the names of the two pipes

		The pipe base name comes from the exabgp.api.pipename environment
		setting and defaults to 'exabgp'.
		"""
		pipename = env('exabgp','api','pipename','exabgp')
		self.send = location + pipename + '.out'
		self.recv = location + pipename + '.in'
		# file descriptor of the pipe opened by loop(), None while closed
		self.r_pipe = None

	def init (self):
		"""
		Validate both named pipes and install the SIGINT/SIGTERM handlers.

		returns: True; the process exits with status 1 if a pipe is unusable
		"""
		# obviously this is vulnerable to race conditions ... if an attacker can create fifo in the folder
		for pipe in (self.recv, self.send):
			if not check_fifo(pipe):
				self.terminate()
				sys.exit(1)

		signal.signal(signal.SIGINT, self.terminate)
		signal.signal(signal.SIGTERM, self.terminate)
		return True

	def cleanup (self):
		"""
		Close the read pipe if it is open.

		The descriptor is forgotten before it is closed so that calling
		cleanup() again can not close an unrelated, reused descriptor.
		"""
		pipe, self.r_pipe = self.r_pipe, None
		# compare with None: 0 is a valid file descriptor
		if pipe is None:
			return
		try:
			os.close(pipe)
		except (OSError,IOError,TypeError):
			pass

	def terminate (self,ignore=None,me=None):
		"""
		Release the read pipe; usable directly as a signal handler.

		ignore, me: the signal number and stack frame passed by the signal module (unused)
		exits with status 1 when called a second time
		"""
		# if the named pipe is open, and remove_fifo called
		# do not ignore a second signal
		if self.terminating:
			sys.exit(1)
		self.terminating = True

		self.cleanup()

	def read_on (self,reading):
		"""
		Wait up to one second for data on the descriptors in `reading`.

		reading: iterable of file descriptors to poll
		returns: the list of descriptors which have data to read
		exits with status 1 when a descriptor without readable data reports
		a hang-up, an error or is invalid
		"""
		sleep_time = 1000  # poll timeout, in milliseconds

		readable = select.POLLIN | select.POLLPRI
		# POLLRDHUP is not defined on every platform: 0 matches no event
		broken = select.POLLHUP | getattr(select,'POLLRDHUP',0) | select.POLLERR | select.POLLNVAL

		poller = select.poll()
		for io in reading:
			poller.register(io, readable | select.POLLHUP | select.POLLNVAL | select.POLLERR)

		ready = []
		for io, event in poller.poll(sleep_time):
			if event & readable:
				ready.append(io)
			elif event & broken:
				sys.exit(1)
		return ready

	def no_buffer (self,fd):
		"""
		Add O_SYNC to the file status flags of `fd`.

		raises: OSError (IOError on Python 2) if fcntl fails
		"""
		# NOTE(review): Linux fcntl(2) lists the flags F_SETFL can change and
		# O_SYNC is not among them - confirm this has the intended effect
		mfl = fcntl.fcntl(fd, fcntl.F_GETFL)
		mfl |= os.O_SYNC
		fcntl.fcntl(fd, fcntl.F_SETFL, mfl)

	def loop (self):
		"""
		Open the read pipe and relay complete lines forever.

		stdin is forwarded to the write pipe and the read pipe to stdout.
		Data which can not be forwarded yet is kept in memory; the process
		exits with status 1 when more than 100 Mb is waiting for one source,
		when the read pipe can not be opened, or on an unrecoverable I/O error.
		"""
		try:
			self.r_pipe = os.open(self.recv, os.O_RDWR | os.O_NONBLOCK | os.O_EXCL)
		except OSError as exc:
			# nothing can be relayed without the pipe: stop here instead of
			# failing later when None is used as a file descriptor
			sys.stderr.write('error: could not open the named pipe %s (%s)\n' % (self.recv,exc))
			sys.stderr.flush()
			self.terminate()
			sys.exit(1)

		# keep the descriptor locally: cleanup() resets self.r_pipe to None
		r_pipe = self.r_pipe
		standard_in = sys.stdin.fileno()
		standard_out = sys.stdout.fileno()

		def reader (fd):
			# build a function reading up to `number` bytes from fd
			def read_some (number):
				try:
					return os.read(fd,number)
				except OSError as exc:
					if exc.errno in error.block:
						# nothing available right now; bytes, like the stored data
						return b''
					sys.exit(1)
			return read_some

		def std_writer (line):
			# returns the number of bytes written to stdout, 0 if it would block
			try:
				return os.write(standard_out,line)
			except OSError as exc:
				if exc.errno in error.block:
					return 0
				sys.exit(1)

		def fifo_writer (line):
			# the write pipe is opened for each line and always closed afterwards
			# returns the number of bytes written, 0 when nothing could be sent
			pipe = None
			try:
				try:
					pipe = os.open(self.send, os.O_WRONLY | os.O_NONBLOCK | os.O_EXCL)
					self.no_buffer(pipe)
				except OSError:
					# the pipe can not be used right now: back off, the caller retries
					time.sleep(0.05)
					return 0
				try:
					return os.write(pipe,line)
				except OSError:
					return 0
			finally:
				if pipe is not None:
					try:
						os.close(pipe)
					except OSError:
						pass

		read = {
			standard_in: reader(standard_in),
			r_pipe: reader(r_pipe),
		}

		write = {
			standard_in: fifo_writer,
			r_pipe: std_writer,
		}

		# chunks read while a complete line was still waiting to be sent
		backlog = {
			standard_in: deque(),
			r_pipe: deque(),
		}

		# number of bytes held in each backlog
		pending = {
			standard_in: 0,
			r_pipe: 0,
		}

		# data currently being split into lines and sent
		store = {
			standard_in: b'',
			r_pipe: b'',
		}

		def consume (source):
			data = read[source](1024)
			if not backlog[source] and b'\n' not in store[source]:
				store[source] += data
				return
			if not data:
				return
			backlog[source].append(data)
			pending[source] += len(data)
			# assuming a route takes 80 chars, 100 Mb is over 1Millions routes
			# something is really wrong if it was not consumed
			if pending[source] > 100*mb:
				sys.stderr.write('using too much memory - exiting')
				sys.exit(1)

		reading = [standard_in, r_pipe]

		while True:
			ready = self.read_on(reading)

			# command from user
			if r_pipe in ready:
				consume(r_pipe)
			if standard_in in ready:
				consume(standard_in)

			for source in reading:
				while b'\n' in store[source]:
					line = store[source].split(b'\n',1)[0]
					sent = write[source](line + b'\n')
					if not sent:
						break
					# a partial write leaves the end of the line for the next pass
					store[source] = store[source][sent:]
				if backlog[source]:
					chunk = backlog[source].popleft()
					pending[source] -= len(chunk)
					store[source] += chunk

	def run (self):
		"""
		Check the pipes then relay data until the process exits.

		exits with status 0 on KeyboardInterrupt and with status 1, after
		printing the traceback on stderr, on any other exception
		"""
		if not self.init():
			sys.exit(1)
		try:
			self.loop()
		except KeyboardInterrupt:
			self.cleanup()
			sys.exit(0)
		except Exception as exc:
			sys.stderr.write(str(exc))
			sys.stderr.write('\n\n')
			sys.stderr.flush()
			traceback.print_exc(file=sys.stderr)
			sys.stderr.flush()
			self.cleanup()
			sys.exit(1)
257
258
259
def main (location=''):
	"""
	Start the relay for the pipes found under `location`.

	location: pipe location prefix; when empty the exabgp_cli_pipe
	environment variable is used instead

	exits with status 1, after printing a usage message on stderr, when
	no location is available
	"""
	pipe_location = location or os.environ.get('exabgp_cli_pipe','')

	if pipe_location:
		Control(pipe_location).run()
		return

	command = ' '.join(sys.argv)
	sys.stderr.write("usage %s %s\n" % (sys.executable,command))
	sys.stderr.write("run with 'env exabgp_cli_pipe=<location>' if you are trying to mess with ExaBGP's intenals")
	sys.stderr.flush()
	sys.exit(1)
268
269
270
# only start the relay when this module is executed as a script
if __name__ == "__main__":
	main()
272