Completed
Push — master ( f8c35a...0dc7c6 )
by Thomas
14:34
created

Processes._update_fds()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
"""
2
process.py
3
4
Created by Thomas Mangin on 2011-05-02.
5
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
6
License: 3-clause BSD. (See the COPYRIGHT file)
7
"""
8
9
import os
10
import errno
11
import time
12
import subprocess
13
import select
14
import fcntl
15
16
from exabgp.util import str_ascii
17
from exabgp.util import bytes_ascii
18
from exabgp.util.errstr import errstr
19
from exabgp.reactor.network.error import error
20
21
from exabgp.configuration.core.format import formated
22
from exabgp.reactor.api.response import Response
23
from exabgp.reactor.api.response.answer import Answer
24
25
from exabgp.bgp.message import Message
26
from exabgp.logger import Logger
27
28
from exabgp.version import json as json_version
29
from exabgp.version import text as text_version
30
31
from exabgp.configuration.environment import environment
32
from threading import Thread
33
34
35
# pylint: disable=no-self-argument,not-callable,unused-argument,invalid-name
36
37
class ProcessError (Exception):
	"""Raised when a helper process fails: its pipe died or it respawned too fast."""
	pass
40
41
def preexec_helper ():
	"""Run in the child between fork and exec: detach it into its own process group.

	This prevents signals sent to exabgp (e.g. SIGINT) from also reaching
	the helper children.
	"""
	# os.setsid()
	os.setpgrp()
	# signal.signal(signal.SIGINT, signal.SIG_IGN)
48
49
class Processes (object):
	"""Manage the external helper processes driven through the exabgp API."""

	# how many times a process can respawn in the time interval:
	# mask the timestamp down to ~one-minute (64 second) windows
	respawn_timemask = 0xFFFFFF - 0b111111
	# '0b111111111111111111000000' (around a minute, 63 seconds)

	# message-code -> handler function, filled by register_process below
	_dispatch = {}
56
	def __init__ (self):
		"""Prepare an empty process table and load the API settings."""
		self.logger = Logger()
		self.clean()
		self.silence = False
		# per-process partial line read from stdout, waiting for its '\n'
		self._buffer = {}
		self._configuration = {}

		api = environment.settings().api
		# allow 5 respawns per time window when enabled, none otherwise
		self.respawn_number = 5 if api.respawn else 0
		self.terminate_on_error = api.terminate
		self.ack = api.ack
67
	def number (self):
68
		return len(self._process)
69
70
	def clean (self):
71
		self.fds = []
72
		self._process = {}
73
		self._encoder = {}
74
		self._broken = []
75
		self._respawning = {}
76
77
	def _handle_problem (self, process):
78
		if process not in self._process:
79
			return
80
		if self.respawn_number:
81
			self.logger.debug('issue with the process, restarting it','process')
82
			self._terminate(process)
83
			self._start(process)
84
		else:
85
			self.logger.debug('issue with the process, terminating it','process')
86
			self._terminate(process)
87
88
	def _terminate (self, process_name):
89
		self.logger.debug('terminating process %s' % process_name, 'process')
90
		process = self._process[process_name]
91
		del self._process[process_name]
92
		self._update_fds()
93
		thread = Thread(target=self._terminate_run, args=(process,))
94
		thread.start()
95
		return thread
96
97
	def _terminate_run (self, process):
98
		try:
99
			process.terminate()
100
			process.wait()
101
		except (OSError, KeyError):
102
			# the process is most likely already dead
103
			pass
104
105
	def terminate (self):
106
		for process in list(self._process):
107
			if not self.silence:
108
				try:
109
					self.write(process,self._encoder[process].shutdown())
110
				except ProcessError:
111
					pass
112
		self.silence = True
113
		# waiting a little to make sure IO is flushed to the pipes
114
		# we are using unbuffered IO but still ..
115
		time.sleep(0.1)
116
		for process in list(self._process):
117
			try:
118
				t = self._terminate(process)
119
				t.join()
120
			except OSError:
121
				# we most likely received a SIGTERM signal and our child is already dead
122
				self.logger.debug('child process %s was already dead' % process,'process')
123
		self.clean()
124
125
	def _start (self,process):
		"""Fork the configured helper program for *process* and register its pipes.

		On failure the name is appended to self._broken so broken() can report
		it; a helper respawning too fast raises ProcessError (not swallowed by
		the except clause below).
		"""
		try:
			if process in self._process:
				self.logger.debug('process already running','process')
				return
			if process not in self._configuration:
				self.logger.debug('can not start process, no configuration for it','process')
				return
			# Prevent some weird termcap data to be created at the start of the PIPE
			# \x1b[?1034h (no-eol) (esc)
			os.environ['TERM'] = 'dumb'

			configuration = self._configuration[process]

			run = configuration.get('run','')
			if not run:
				return

			api = configuration.get('encoder','')
			self._encoder[process] = Response.Text(text_version) if api == 'text' else Response.JSON(json_version)

			self._process[process] = subprocess.Popen(
				run,
				stdin=subprocess.PIPE,
				stdout=subprocess.PIPE,
				preexec_fn=preexec_helper
				# This flags exists for python 2.7.3 in the documentation but not on my MAC
				# creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
			)
			self._update_fds()
			# received() polls the pipe with select(), so it must not block
			fcntl.fcntl(self._process[process].stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)

			self.logger.debug('forked process %s' % process,'process')

			self._record_respawn(process)

		except (subprocess.CalledProcessError,OSError,ValueError) as exc:
			self._broken.append(process)
			self.logger.debug('could not start process %s' % process,'process')
			self.logger.debug('reason: %s' % str(exc),'process')

	def _record_respawn (self, process):
		"""Count starts of *process* within the current ~minute window and
		raise ProcessError when it respawns more than self.respawn_number times."""
		around_now = int(time.time()) & self.respawn_timemask
		counters = self._respawning.get(process)
		if counters is None or around_now not in counters:
			# first start seen in this time window: drop any stale counters
			self._respawning[process] = {around_now: 1}
			return
		counters[around_now] += 1
		# we are respawning too fast
		if counters[around_now] > self.respawn_number:
			self.logger.critical(
				'Too many death for %s (%d) terminating program' % (process,self.respawn_number),
				'process'
			)
			raise ProcessError()
180
	def start (self, configuration, restart=False):
181
		for process in list(self._process):
182
			if process not in configuration:
183
				self._terminate(process)
184
		self._configuration = configuration
185
		for process in configuration:
186
			if restart and process in list(self._process):
187
				self._terminate(process)
188
			self._start(process)
189
190
	def broken (self, neighbor):
191
		if self._broken:
192
			for process in self._configuration:
193
				if process in self._broken:
194
					return True
195
		return False
196
197
	def _update_fds (self):
198
		self.fds = [self._process[process].stdout.fileno() for process in self._process]
199
200
	def received (self):
		"""Generator yielding (process_name, formated_line) for every complete
		line read from the helpers' non-blocking stdout pipes.

		A helper found dead (poll() not None) or erroring is handed to
		_handle_problem(); partial lines are kept in self._buffer until
		their terminating newline arrives.
		"""
		consumed_data = False

		for process in list(self._process):
			try:
				proc = self._process[process]
				poll = proc.poll()
				# proc.poll returns None if the process is still fine
				# -[signal], like -15, if the process was terminated
				if poll is not None:
					self._handle_problem(process)
					return
				# zero timeout: only read pipes which already have data
				r,_,_ = select.select([proc.stdout,],[],[],0)
				if not r:
					continue
				try:
					# Calling next() on Linux and OSX works perfectly well
					# but not on OpenBSD where it always raise StopIteration
					# and only readline() works
					buf = str_ascii(proc.stdout.read(16384))
					# NOTE(review): poll is always None at this point (a non-None
					# poll returned above), so this condition can never be true as
					# written; an EOF ('' from a readable pipe) is therefore not
					# handled here - confirm intent before changing.
					if buf == '' and poll is not None:
						# if proc.poll() is None then
						# process is fine, we received an empty line because
						# we're doing .readline() on a non-blocking pipe and
						# the process maybe has nothing to send yet
						self._handle_problem(process)
						continue

					# prepend whatever partial line was left from the last read
					raw = self._buffer.get(process,'') + buf

					while '\n' in raw:
						line,raw = raw.split('\n',1)
						line = line.rstrip()
						consumed_data = True
						self.logger.debug('command from process %s : %s ' % (process,line),'process')
						yield (process,formated(line))

					# keep the trailing partial line (may be '') for next time
					self._buffer[process] = raw

				except IOError as exc:
					if not exc.errno or exc.errno in error.fatal:
						# if the program exits we can get an IOError with errno code zero !
						self._handle_problem(process)
					elif exc.errno in error.block:
						# we often see errno.EINTR: call interrupted and
						# we most likely have data, we will try to read them at the next loop iteration
						pass
					else:
						self.logger.debug('unexpected errno received from forked process (%s)' % errstr(exc),'process')
				except StopIteration:
					if not consumed_data:
						self._handle_problem(process)
			except KeyError:
				# the process was removed (terminated) while we were iterating
				pass
			except (subprocess.CalledProcessError,OSError,ValueError):
				self._handle_problem(process)
257
	def write (self, process, string, neighbor=None):
		"""Send *string* (newline terminated) to the helper's stdin.

		Retries on transient IOError; a dead pipe (EPIPE) marks the process
		broken and raises ProcessError. Returns True (None input is a no-op).

		Fix: the original appended *process* to self._broken on EVERY IOError
		before inspecting errno, so a transient error that was successfully
		retried still flagged the helper as broken (and EPIPE flagged it twice).
		The process is now only marked broken on the fatal EPIPE path.
		"""
		if string is None:
			return True

		# XXX: FIXME: This is potentially blocking
		while True:
			try:
				self._process[process].stdin.write(bytes_ascii('%s\n' % string))
			except IOError as exc:
				if exc.errno == errno.EPIPE:
					# the pipe is gone: the helper died, mark it broken
					self._broken.append(process)
					self.logger.debug('issue while sending data to our helper program','process')
					raise ProcessError()
				else:
					# Could it have been caused by a signal ? What to do.
					self.logger.debug('error received while sending data to helper program, retrying (%s)' % errstr(exc),'process')
					continue
			break

		try:
			self._process[process].stdin.flush()
		except IOError as exc:
			# AFAIK, the buffer should be flushed at the next attempt.
			self.logger.debug('error received while FLUSHING data to helper program, retrying (%s)' % errstr(exc),'process')

		return True
285
	def _answer (self, service, string, force=False):
286
		if force or self.ack:
287
			self.logger.debug('responding to %s : %s' % (service,string.replace('\n', '\\n')), 'process')
288
			self.write(service,string)
289
290
	def answer_done (self, service):
		"""Acknowledge successful completion of a command to *service*."""
		self._answer(service, Answer.done)
293
	def answer_error (self, service):
		"""Report a failed command back to *service*."""
		self._answer(service, Answer.error)
296
	def _notify (self, neighbor, event):
297
		for process in neighbor.api[event]:
298
			yield process
299
300
	# do not do anything if silenced
301
	# no-self-argument
302
303
	def silenced (function):
304
		def closure (self, *args):
305
			if self.silence:
306
				return
307
			return function(self,*args)
308
		return closure
309
310
	# invalid-name
311
	@silenced
312
	def up (self, neighbor):
313
		for process in self._notify(neighbor,'neighbor-changes'):
314
			self.write(process,self._encoder[process].up(neighbor),neighbor)
315
316
	@silenced
317
	def connected (self, neighbor):
318
		for process in self._notify(neighbor,'neighbor-changes'):
319
			self.write(process,self._encoder[process].connected(neighbor),neighbor)
320
321
	@silenced
322
	def down (self, neighbor, reason):
323
		for process in self._notify(neighbor,'neighbor-changes'):
324
			self.write(process,self._encoder[process].down(neighbor,reason),neighbor)
325
326
	@silenced
327
	def negotiated (self, neighbor, negotiated):
328
		for process in self._notify(neighbor,'negotiated'):
329
			self.write(process,self._encoder[process].negotiated(neighbor,negotiated),neighbor)
330
331
	@silenced
332
	def fsm (self, neighbor, fsm):
333
		for process in self._notify(neighbor,'fsm'):
334
			self.write(process,self._encoder[process].fsm(neighbor,fsm),neighbor)
335
336
	@silenced
337
	def signal (self, neighbor, signal):
338
		for process in self._notify(neighbor,'signal'):
339
			self.write(process,self._encoder[process].signal(neighbor,signal),neighbor)
340
341
	@silenced
342
	def packets (self, neighbor, direction, category, header, body):
343
		for process in self._notify(neighbor,'%s-packets' % direction):
344
			self.write(process,self._encoder[process].packets(neighbor,direction,category,header,body),neighbor)
345
346
	@silenced
347
	def notification (self, neighbor, direction, code, subcode, data, header, body):
348
		for process in self._notify(neighbor,'neighbor-changes'):
349
			self.write(process,self._encoder[process].notification(neighbor,direction,code,subcode,data,header,body),neighbor)
350
351
	@silenced
352
	def message (self, message_id, neighbor, direction, message, negotiated, header, *body):
353
		self._dispatch[message_id](self,neighbor,direction,message,negotiated,header,*body)
354
355
	# registering message functions
356
	# no-self-argument
357
358
	def register_process (message_id, storage=_dispatch):
359
		def closure (function):
360
			def wrap (*args):
361
				function(*args)
362
			storage[message_id] = wrap
363
			return wrap
364
		return closure
365
366
	# notifications are handled in the loop as they use different arguments
367
368
	@register_process(Message.CODE.OPEN)
	def _open (self, peer, direction, message, negotiated, header, body):
		"""Forward an OPEN message to subscribed helpers."""
		event = '%s-%s' % (direction,Message.CODE.OPEN.SHORT)
		for proc in self._notify(peer,event):
			self.write(proc,self._encoder[proc].open(peer,direction,message,negotiated,header,body),peer)
373
	@register_process(Message.CODE.UPDATE)
	def _update (self, peer, direction, update, negotiated, header, body):
		"""Forward an UPDATE message to subscribed helpers."""
		event = '%s-%s' % (direction,Message.CODE.UPDATE.SHORT)
		for proc in self._notify(peer,event):
			self.write(proc,self._encoder[proc].update(peer,direction,update,negotiated,header,body),peer)
378
	@register_process(Message.CODE.NOTIFICATION)
	def _notification (self, peer, direction, message, negotiated, header, body):
		"""Forward a NOTIFICATION message to subscribed helpers."""
		event = '%s-%s' % (direction,Message.CODE.NOTIFICATION.SHORT)
		for proc in self._notify(peer,event):
			self.write(proc,self._encoder[proc].notification(peer,direction,message,negotiated,header,body),peer)
383
	# unused-argument, must keep the API
384
	@register_process(Message.CODE.KEEPALIVE)
	def _keepalive (self, peer, direction, keepalive, negotiated, header, body):
		"""Forward a KEEPALIVE to subscribed helpers (*keepalive* is unused but kept for the dispatch API)."""
		event = '%s-%s' % (direction,Message.CODE.KEEPALIVE.SHORT)
		for proc in self._notify(peer,event):
			self.write(proc,self._encoder[proc].keepalive(peer,direction,negotiated,header,body),peer)
389
	@register_process(Message.CODE.ROUTE_REFRESH)
	def _refresh (self, peer, direction, refresh, negotiated, header, body):
		"""Forward a ROUTE-REFRESH message to subscribed helpers."""
		event = '%s-%s' % (direction,Message.CODE.ROUTE_REFRESH.SHORT)
		for proc in self._notify(peer,event):
			self.write(proc,self._encoder[proc].refresh(peer,direction,refresh,negotiated,header,body),peer)
394
	@register_process(Message.CODE.OPERATIONAL)
	def _operational (self, peer, direction, operational, negotiated, header, body):
		"""Forward an OPERATIONAL message (with its category) to subscribed helpers."""
		event = '%s-%s' % (direction,Message.CODE.OPERATIONAL.SHORT)
		for proc in self._notify(peer,event):
			self.write(proc,self._encoder[proc].operational(peer,direction,operational.category,operational,negotiated,header,body),peer)
398