Completed
Push — master ( 0dc7c6...640170 )
by Thomas
15:16
created

exabgp.application.control.Control.init()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 1
dl 0
loc 14
rs 9.9
c 0
b 0
f 0
1
"""
2
control.py
3
4
Created by Thomas Mangin on 2015-01-13.
5
Copyright (c) 2015-2017 Exa Networks. All rights reserved.
6
License: 3-clause BSD. (See the COPYRIGHT file)
7
"""
8
9
import os
10
import sys
11
import fcntl
12
import stat
13
import time
14
import signal
15
import select
16
import socket
17
import traceback
18
from collections import deque
19
20
from exabgp.util import str_ascii
21
from exabgp.reactor.network.error import error
22
23
# byte-size units: one kilobyte and one megabyte, expressed in bytes
kb = 1 << 10
mb = kb << 10
25
26
27
def env (app, section, name, default):
	"""
	Look up a setting in the process environment.

	The dotted key 'app.section.name' is tried first, then the underscore
	form 'app_section_name'. The first one which is set wins, otherwise
	default is returned unchanged.
	"""
	for template in ('%s.%s.%s', '%s_%s_%s'):
		value = os.environ.get(template % (app,section,name))
		if value is not None:
			return value
	return default
34
35
36
def check_fifo (name):
	"""
	Report whether name is an existing named pipe this process can read.

	Returns True when it is. In every other case an explanation is written to
	standard output and False is returned (never None), so callers can rely
	on a strict boolean.
	"""
	def refuse (message):
		# every failure path reports, flushes and answers False the same way
		sys.stdout.write(message % os.path.abspath(name))
		sys.stdout.flush()
		return False

	try:
		if not stat.S_ISFIFO(os.stat(name).st_mode):
			return refuse('error: a file exist which is not a named pipe (%s)\n')

		if not os.access(name,os.R_OK):
			return refuse('error: a named pipe exists and we can not read/write to it (%s)\n')

		return True
	except OSError:
		# raised by os.stat when the path is missing or can not be inspected
		return refuse('error: could not create the named pipe %s\n')
	except IOError:
		# distinct from OSError on Python 2 only
		return refuse('error: could not access/delete the named pipe %s\n')
	except socket.error:
		# distinct from OSError on Python 2 only
		return refuse('error: could not write on the named pipe %s\n')
55
56
57
class Control (object):
	"""
	Relay between the standard streams of this process and two named pipes.

	Lines read from the '.in' named pipe are written to standard output, and
	lines read from standard input are written to the '.out' named pipe.
	"""

	# set by the first call to terminate(); a second call exits the process
	terminating = False

	def __init__ (self, location):
		"""
		Compute the path of both named pipes.

		location is the path prefix to which the pipe name is appended. The
		name comes from the exabgp api pipename environment setting and
		defaults to 'exabgp'.
		"""
		pipename = env('exabgp','api','pipename','exabgp')
		self.send = location + pipename + '.out'
		self.recv = location + pipename + '.in'
		# descriptor of the '.in' pipe, opened by loop()
		self.r_pipe = None

	def init (self):
		"""
		Validate both named pipes and install the signal handlers.

		Exits the process with status 1 when either pipe fails check_fifo,
		otherwise returns True.
		"""
		# obviously this is vulnerable to race conditions ... if an attacker can create fifo in the folder
		for name in (self.recv, self.send):
			if not check_fifo(name):
				self.terminate()
				sys.exit(1)

		signal.signal(signal.SIGINT, self.terminate)
		signal.signal(signal.SIGTERM, self.terminate)
		return True

	def cleanup (self):
		"""Close the descriptor of the reading pipe, ignoring any failure."""
		# compare with None: descriptor 0 is valid but falsy
		if self.r_pipe is None:
			return
		try:
			os.close(self.r_pipe)
		except (OSError,IOError,TypeError):
			pass

	def terminate (self,ignore=None,me=None):
		"""
		Signal handler: clean up on the first call, exit on the second.

		The two optional parameters receive the signal number and the stack
		frame when called by the signal module; both are unused.
		"""
		# if the named pipe is open, and remove_fifo called
		# do not ignore a second signal
		if self.terminating:
			sys.exit(1)
		self.terminating = True

		self.cleanup()

	def read_on (self,reading):
		"""
		Wait up to one second for input on the descriptors in reading.

		reading is a list (or tuple/set) of file descriptors; a single
		descriptor is accepted too. Returns the list of descriptors which
		have data to read, empty when the wait timed out.

		Raises KeyboardInterrupt when a descriptor was hung up, and exits
		the process with status 1 when poll reports an error or an invalid
		descriptor.
		"""
		sleep_time = 1000  # milliseconds, the unit expected by poll()

		if not isinstance(reading,(list,tuple,set,frozenset)):
			reading = [reading]

		watched = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR

		poller = select.poll()
		# register() takes one descriptor per call, it does not accept a list
		for fd in reading:
			poller.register(fd, watched)

		ready = []
		for fd, event in poller.poll(sleep_time):
			if event & (select.POLLIN | select.POLLPRI):
				ready.append(fd)
			elif event & select.POLLHUP:
				raise KeyboardInterrupt()
			elif event & (select.POLLERR | select.POLLNVAL):
				sys.exit(1)
		return ready

	def no_buffer (self,fd):
		"""Add O_SYNC to the file status flags of the descriptor fd."""
		mfl = fcntl.fcntl(fd, fcntl.F_GETFL)
		mfl |= os.O_SYNC
		fcntl.fcntl(fd, fcntl.F_SETFL, mfl)

	def loop (self):
		"""
		Forward complete lines between the standard streams and the pipes.

		Runs until the process exits or an exception (such as the
		KeyboardInterrupt raised by read_on) propagates to the caller.
		Exits with status 1 when the reading pipe can not be opened, when a
		read or write fails with a non-blocking-related error, or when more
		than 100 Mb of input is waiting to be forwarded for one source.
		"""
		try:
			self.r_pipe = os.open(self.recv, os.O_RDWR | os.O_NONBLOCK | os.O_EXCL)
		except OSError:
			# nothing can be relayed without the reading pipe
			self.terminate()
			sys.exit(1)

		standard_in = sys.stdin.fileno()
		standard_out = sys.stdout.fileno()

		def monitor (function):
			# pass-through decorator: a single place to add stderr tracing
			# of every read and write when debugging
			def wrapper (*args):
				r = function(*args)
				return r
			return wrapper

		@monitor
		def std_reader (number):
			try:
				return os.read(standard_in,number)
			except OSError as exc:
				if exc.errno in error.block:
					# bytes, so it can be appended to the bytes buffers
					return b''
				sys.exit(1)

		@monitor
		def std_writer (line):
			try:
				return os.write(standard_out,line)
			except OSError as exc:
				if exc.errno in error.block:
					return 0
				sys.exit(1)

		@monitor
		def fifo_reader (number):
			try:
				return os.read(self.r_pipe,number)
			except OSError as exc:
				if exc.errno in error.block:
					# bytes, so it can be appended to the bytes buffers
					return b''
				sys.exit(1)

		@monitor
		def fifo_writer (line):
			try:
				pipe = os.open(self.send, os.O_WRONLY | os.O_NONBLOCK | os.O_EXCL)
			except OSError:
				# the pipe can not be opened right now, most likely because
				# nobody is reading it yet: pause before the next attempt
				time.sleep(0.05)
				return 0

			nb = 0
			try:
				self.no_buffer(pipe)
				nb = os.write(pipe,line)
			except OSError:
				# report nothing written, the caller keeps the line and retries
				pass
			finally:
				# always release the descriptor, even when the write failed
				try:
					os.close(pipe)
				except OSError:
					pass
			return nb

		# how to read from each source ...
		read = {
			standard_in: std_reader,
			self.r_pipe: fifo_reader,
		}

		# ... and where what was read from that source must be written
		write = {
			standard_in: fifo_writer,
			self.r_pipe: std_writer,
		}

		# chunks read while a complete line was still waiting to be sent
		backlog = {
			standard_in: deque(),
			self.r_pipe: deque(),
		}

		# number of bytes currently held in each backlog
		queued = {
			standard_in: 0,
			self.r_pipe: 0,
		}

		# data being split into lines and forwarded
		store = {
			standard_in: b'',
			self.r_pipe: b'',
		}

		def consume (source):
			if not backlog[source] and b'\n' not in store[source]:
				store[source] += read[source](1024)
				return

			data = read[source](1024)
			backlog[source].append(data)
			queued[source] += len(data)
			# assuming a route takes 80 chars, 100 Mb is over 1 million routes
			# something is really wrong if it was not consumed
			if queued[source] > 100*mb:
				sys.stderr.write('using too much memory - exiting')
				sys.exit(1)

		reading = [standard_in, self.r_pipe]

		while True:
			ready = self.read_on(reading)

			# command from user
			if self.r_pipe in ready:
				consume(self.r_pipe)
			if standard_in in ready:
				consume(standard_in)

			for source in reading:
				while b'\n' in store[source]:
					line,_ = store[source].split(b'\n',1)
					sent = write[source](line + b'\n')
					if not sent:
						# the destination is not ready, try again next round
						break
					# a partial write leaves the rest of the line in the store
					store[source] = store[source][sent:]
				if backlog[source]:
					chunk = backlog[source].popleft()
					queued[source] -= len(chunk)
					store[source] += chunk

	def run (self):
		"""
		Check the pipes, then relay until interrupted.

		Always ends the process: status 0 after a KeyboardInterrupt, status 1
		after any other exception (reported with its traceback on stderr) or
		when init() fails.
		"""
		if not self.init():
			sys.exit(1)
		try:
			self.loop()
		except KeyboardInterrupt:
			self.cleanup()
			sys.exit(0)
		except Exception as exc:
			sys.stderr.write(str(exc))
			sys.stderr.write('\n\n')
			sys.stderr.flush()
			traceback.print_exc(file=sys.stderr)
			sys.stderr.flush()
			self.cleanup()
			sys.exit(1)
258
259
260
def main (location=''):
	"""
	Run the control relay for the named pipes found under location.

	When location is empty the exabgp_cli_pipe environment variable is used
	instead. If that is empty too, a usage message is written to stderr and
	the process exits with status 1.
	"""
	location = location or os.environ.get('exabgp_cli_pipe','')

	if location:
		Control(location).run()
		return

	usage = "usage %s %s\n" % (sys.executable,' '.join(sys.argv))
	sys.stderr.write(usage)
	sys.stderr.write("run with 'env exabgp_cli_pipe=<location>' if you are trying to mess with ExaBGP's intenals")
	sys.stderr.flush()
	sys.exit(1)
269
270
271
# start the relay only when executed as a script, not when imported
if __name__ == '__main__':
	main()
273