Passed
Push — master ( 816300...01ef69 )
by Thomas
48s queued 13s
created

Processes._terminate_run()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
"""
2
process.py
3
4
Created by Thomas Mangin on 2011-05-02.
5
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
6
License: 3-clause BSD. (See the COPYRIGHT file)
7
"""
8
9
import os
10
import errno
11
import time
12
import subprocess
13
import select
14
import fcntl
15
16
from exabgp.util import str_ascii
17
from exabgp.util import bytes_ascii
18
from exabgp.util.errstr import errstr
19
from exabgp.reactor.network.error import error
20
21
from exabgp.configuration.core.format import formated
22
from exabgp.reactor.api.response import Response
23
from exabgp.reactor.api.response.answer import Answer
24
25
from exabgp.bgp.message import Message
26
from exabgp.logger import Logger
27
28
from exabgp.version import json as json_version
29
from exabgp.version import text as text_version
30
31
from exabgp.configuration.environment import environment
32
from threading import Thread
33
34
35
# pylint: disable=no-self-argument,not-callable,unused-argument,invalid-name
36
37
class ProcessError (Exception):
	"""Raised when a helper process fails fatally (pipe gone, respawning too fast)."""
40
41
def preexec_helper ():
	"""Run in the forked child, between fork() and exec().

	Puts the child in its own process group so signals delivered to the
	parent (e.g. SIGINT from the controlling terminal) are not also sent
	to the helper processes.
	"""
	# os.setsid() would also detach the controlling terminal; setpgrp() is enough here
	os.setpgrp()
	# signal.signal(signal.SIGINT, signal.SIG_IGN)
48
49
class Processes (object):
	"""Manage the external helper processes configured through the API.

	Forks the configured programs, rate-limits their respawning, reads the
	commands they print on stdout and forwards BGP events (encoded as text
	or JSON) to their stdin.
	"""

	# how many times a process may respawn within the time interval
	respawn_timemask = 0xFFFFFF - 0b111111
	# '0b111111111111111111000000' (around a minute, 63 seconds)

	# message code -> handler, populated by the register_process decorator below
	_dispatch = {}

	def __init__ (self):
		self.logger = Logger()
		self.clean()
		self.silence = False
		self._buffer = {}          # per-process partial line received so far
		self._configuration = {}   # per-process configuration ('run', 'encoder', ...)

		self.respawn_number = 5 if environment.settings().api.respawn else 0
		self.terminate_on_error = environment.settings().api.terminate
		self.ack = environment.settings().api.ack

	def number (self):
		"""Return how many helper processes are currently tracked."""
		return len(self._process)

	def clean (self):
		"""Reset all per-process bookkeeping."""
		self._process = {}
		self._encoder = {}
		self._broken = []
		self._respawning = {}

	def _handle_problem (self, process):
		"""React to a process which died or could not be read: restart or stop it."""
		if process not in self._process:
			return
		if self.respawn_number:
			self.logger.debug('issue with the process, restarting it','process')
			self._terminate(process)
			self._start(process)
		else:
			self.logger.debug('issue with the process, terminating it','process')
			self._terminate(process)

	def _terminate (self, process):
		"""Terminate one process in a thread so the blocking wait() does not stall the reactor."""
		self.logger.debug('terminating process %s' % process,'process')
		thread = Thread(target=self._terminate_run, args = (process,))
		thread.start()
		return thread

	def _terminate_run (self, process):
		try:
			self._process[process].terminate()
			self._process[process].wait()
			del self._process[process]
		except (OSError, KeyError):
			# the process is most likely already dead
			pass

	def terminate (self):
		"""Politely ask every process to shut down, then terminate them all."""
		for process in list(self._process):
			if not self.silence:
				try:
					self.write(process,self._encoder[process].shutdown())
				except ProcessError:
					pass
		self.silence = True
		# waiting a little to make sure IO is flushed to the pipes
		# we are using unbuffered IO but still ..
		time.sleep(0.1)
		for process in list(self._process):
			try:
				t = self._terminate(process)
				t.join()
			except OSError:
				# we most likely received a SIGTERM signal and our child is already dead
				self.logger.debug('child process %s was already dead' % process,'process')
		self.clean()

	def _start (self,process):
		"""Fork one configured helper process, enforcing the respawn rate limit.

		Raises ProcessError when the process died more than respawn_number
		times within the current time window.
		"""
		try:
			if process in self._process:
				self.logger.debug('process already running','process')
				return
			if process not in self._configuration:
				self.logger.debug('can not start process, no configuration for it','process')
				return
			# Prevent some weird termcap data to be created at the start of the PIPE
			# \x1b[?1034h (no-eol) (esc)
			os.environ['TERM'] = 'dumb'

			configuration = self._configuration[process]

			run = configuration.get('run','')
			if run:
				api = configuration.get('encoder','')
				self._encoder[process] = Response.Text(text_version) if api == 'text' else Response.JSON(json_version)

				self._process[process] = subprocess.Popen(
					run,
					stdin=subprocess.PIPE,
					stdout=subprocess.PIPE,
					preexec_fn=preexec_helper
					# This flags exists for python 2.7.3 in the documentation but on on my MAC
					# creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
				)
				# non-blocking stdout so received() can poll without stalling the reactor
				fcntl.fcntl(self._process[process].stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)

				self.logger.debug('forked process %s' % process,'process')

				# count respawns within the current ~one-minute window
				around_now = int(time.time()) & self.respawn_timemask
				if process in self._respawning:
					if around_now in self._respawning[process]:
						self._respawning[process][around_now] += 1
						# we are respawning too fast
						if self._respawning[process][around_now] > self.respawn_number:
							self.logger.critical(
								'Too many death for %s (%d) terminating program' % (process,self.respawn_number),
								'process'
							)
							raise ProcessError()
					else:
						# long time since last respawn: reset the counter
						self._respawning[process] = {around_now: 1}
				else:
					# record respawning
					self._respawning[process] = {around_now: 1}

		except (subprocess.CalledProcessError,OSError,ValueError) as exc:
			self._broken.append(process)
			self.logger.debug('could not start process %s' % process,'process')
			self.logger.debug('reason: %s' % str(exc),'process')

	def start (self, configuration, restart=False):
		"""(Re)start the processes listed in configuration, stopping removed ones."""
		for process in list(self._process):
			if process not in configuration:
				self._terminate(process)
		self._configuration = configuration
		for process in configuration:
			if restart and process in list(self._process):
				self._terminate(process)
			self._start(process)

	def broken (self, neighbor):
		"""Return True if any configured process is marked broken.

		NOTE(review): the neighbor argument is currently unused - kept for API
		compatibility with callers.
		"""
		if self._broken:
			for process in self._configuration:
				if process in self._broken:
					return True
		return False

	def fds (self):
		"""Return the stdout file objects of all processes, for select()."""
		return [self._process[process].stdout for process in self._process]

	def received (self):
		"""Yield (process, command) for every complete line received from the helpers."""
		consumed_data = False

		for process in list(self._process):
			try:
				proc = self._process[process]
				poll = proc.poll()
				# proc.poll returns None if the process is still fine
				# -[signal], like -15, if the process was terminated
				if poll is not None:
					self._handle_problem(process)
					return
				r,_,_ = select.select([proc.stdout,],[],[],0)
				if not r:
					continue
				try:
					# Calling next() on Linux and OSX works perfectly well
					# but not on OpenBSD where it always raise StopIteration
					# and only readline() works
					buf = str_ascii(proc.stdout.read(16384))
					# BUGFIX: re-poll here - the earlier poll value is always None
					# at this point, so the old test 'poll is not None' was dead code
					if buf == '' and proc.poll() is not None:
						# if proc.poll() is None then
						# process is fine, we received an empty line because
						# we're doing .readline() on a non-blocking pipe and
						# the process maybe has nothing to send yet
						self._handle_problem(process)
						continue

					raw = self._buffer.get(process,'') + buf

					while '\n' in raw:
						line,raw = raw.split('\n',1)
						line = line.rstrip()
						consumed_data = True
						self.logger.debug('command from process %s : %s ' % (process,line),'process')
						yield (process,formated(line))

					# keep any trailing partial line for the next iteration
					self._buffer[process] = raw

				except IOError as exc:
					if not exc.errno or exc.errno in error.fatal:
						# if the program exits we can get an IOError with errno code zero !
						self._handle_problem(process)
					elif exc.errno in error.block:
						# we often see errno.EINTR: call interrupted and
						# we most likely have data, we will try to read them a the next loop iteration
						pass
					else:
						self.logger.debug('unexpected errno received from forked process (%s)' % errstr(exc),'process')
				except StopIteration:
					if not consumed_data:
						self._handle_problem(process)
			except KeyError:
				pass
			except (subprocess.CalledProcessError,OSError,ValueError):
				self._handle_problem(process)

	def write (self, process, string, neighbor=None):
		"""Send one line to a helper process; return True on success.

		Raises ProcessError when the pipe to the process is broken (EPIPE).
		"""
		if string is None:
			return True

		# XXX: FIXME: This is potentially blocking
		while True:
			try:
				self._process[process].stdin.write(bytes_ascii('%s\n' % string))
			except IOError as exc:
				if exc.errno == errno.EPIPE:
					# BUGFIX: mark the process broken once, and only when the
					# failure is fatal - the old code appended to _broken on
					# every IOError (twice on EPIPE), wrongly flagging a
					# process we were about to retry successfully
					self._broken.append(process)
					self.logger.debug('issue while sending data to our helper program','process')
					raise ProcessError()
				else:
					# Could it have been caused by a signal ? What to do.
					self.logger.debug('error received while sending data to helper program, retrying (%s)' % errstr(exc),'process')
					continue
			break

		try:
			self._process[process].stdin.flush()
		except IOError as exc:
			# AFAIK, the buffer should be flushed at the next attempt.
			self.logger.debug('error received while FLUSHING data to helper program, retrying (%s)' % errstr(exc),'process')

		return True

	def _answer (self, service, string, force=False):
		# only acknowledge commands when the api asked for it (or when forced)
		if force or self.ack:
			self.logger.debug('responding to %s : %s' % (service,string.replace('\n', '\\n')), 'process')
			self.write(service,string)

	def answer_done (self, service):
		self._answer(service,Answer.done)

	def answer_error (self, service):
		self._answer(service, Answer.error)

	def _notify (self, neighbor, event):
		# yield every process subscribed to this event for this neighbor
		for process in neighbor.api[event]:
			yield process

	# do not do anything if silenced
	# no-self-argument

	def silenced (function):
		def closure (self, *args):
			if self.silence:
				return
			return function(self,*args)
		return closure

	# invalid-name
	@silenced
	def up (self, neighbor):
		for process in self._notify(neighbor,'neighbor-changes'):
			self.write(process,self._encoder[process].up(neighbor),neighbor)

	@silenced
	def connected (self, neighbor):
		for process in self._notify(neighbor,'neighbor-changes'):
			self.write(process,self._encoder[process].connected(neighbor),neighbor)

	@silenced
	def down (self, neighbor, reason):
		for process in self._notify(neighbor,'neighbor-changes'):
			self.write(process,self._encoder[process].down(neighbor,reason),neighbor)

	@silenced
	def negotiated (self, neighbor, negotiated):
		for process in self._notify(neighbor,'negotiated'):
			self.write(process,self._encoder[process].negotiated(neighbor,negotiated),neighbor)

	@silenced
	def fsm (self, neighbor, fsm):
		for process in self._notify(neighbor,'fsm'):
			self.write(process,self._encoder[process].fsm(neighbor,fsm),neighbor)

	@silenced
	def signal (self, neighbor, signal):
		for process in self._notify(neighbor,'signal'):
			self.write(process,self._encoder[process].signal(neighbor,signal),neighbor)

	@silenced
	def packets (self, neighbor, direction, category, header, body):
		for process in self._notify(neighbor,'%s-packets' % direction):
			self.write(process,self._encoder[process].packets(neighbor,direction,category,header,body),neighbor)

	@silenced
	def notification (self, neighbor, direction, code, subcode, data, header, body):
		for process in self._notify(neighbor,'neighbor-changes'):
			self.write(process,self._encoder[process].notification(neighbor,direction,code,subcode,data,header,body),neighbor)

	@silenced
	def message (self, message_id, neighbor, direction, message, negotiated, header, *body):
		# dispatch on the BGP message code to the handler registered below
		self._dispatch[message_id](self,neighbor,direction,message,negotiated,header,*body)

	# registering message functions
	# no-self-argument

	def register_process (message_id, storage=_dispatch):
		def closure (function):
			def wrap (*args):
				function(*args)
			storage[message_id] = wrap
			return wrap
		return closure

	# notifications are handled in the loop as they use different arguments

	@register_process(Message.CODE.OPEN)
	def _open (self, peer, direction, message, negotiated, header, body):
		for process in self._notify(peer,'%s-%s' % (direction,Message.CODE.OPEN.SHORT)):
			self.write(process,self._encoder[process].open(peer,direction,message,negotiated,header,body),peer)

	@register_process(Message.CODE.UPDATE)
	def _update (self, peer, direction, update, negotiated, header, body):
		for process in self._notify(peer,'%s-%s' % (direction,Message.CODE.UPDATE.SHORT)):
			self.write(process,self._encoder[process].update(peer,direction,update,negotiated,header,body),peer)

	@register_process(Message.CODE.NOTIFICATION)
	def _notification (self, peer, direction, message, negotiated, header, body):
		for process in self._notify(peer,'%s-%s' % (direction,Message.CODE.NOTIFICATION.SHORT)):
			self.write(process,self._encoder[process].notification(peer,direction,message,negotiated,header,body),peer)

	# unused-argument, must keep the API
	@register_process(Message.CODE.KEEPALIVE)
	def _keepalive (self, peer, direction, keepalive, negotiated, header, body):
		for process in self._notify(peer,'%s-%s' % (direction,Message.CODE.KEEPALIVE.SHORT)):
			self.write(process,self._encoder[process].keepalive(peer,direction,negotiated,header,body),peer)

	@register_process(Message.CODE.ROUTE_REFRESH)
	def _refresh (self, peer, direction, refresh, negotiated, header, body):
		for process in self._notify(peer,'%s-%s' % (direction,Message.CODE.ROUTE_REFRESH.SHORT)):
			self.write(process,self._encoder[process].refresh(peer,direction,refresh,negotiated,header,body),peer)

	@register_process(Message.CODE.OPERATIONAL)
	def _operational (self, peer, direction, operational, negotiated, header, body):
		for process in self._notify(peer,'%s-%s' % (direction,Message.CODE.OPERATIONAL.SHORT)):
			self.write(process,self._encoder[process].operational(peer,direction,operational.category,operational,negotiated,header,body),peer)
394