Completed
Push — master (c90b7d...a42871) by Thomas
31:59 queued 17:03 created

exabgp.reactor.loop.Reactor.__init__()   A

Complexity: Conditions 1
Size: Total Lines 27, Code Lines 22
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0

Metric  Value
cc      1
eloc    22
nop     2
dl      0
loc     27
rs      9.352
c       0
b       0
f       0
# encoding: utf-8
"""
reactor/loop.py

Created by Thomas Mangin on 2012-06-10.
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
License: 3-clause BSD. (See the COPYRIGHT file)
"""

import time
import uuid
import select
import socket

from exabgp.util import character
from exabgp.util import concat_bytes_i

from exabgp.reactor.daemon import Daemon
from exabgp.reactor.listener import Listener
from exabgp.reactor.api.processes import Processes
from exabgp.reactor.api.processes import ProcessError
from exabgp.reactor.peer import Peer
from exabgp.reactor.peer import ACTION
from exabgp.reactor.asynchronous import ASYNC
from exabgp.reactor.interrupt import Signal
from exabgp.reactor.network.error import error

from exabgp.reactor.api import API
from exabgp.configuration.configuration import Configuration
from exabgp.configuration.environment import environment

from exabgp.bgp.fsm import FSM

from exabgp.version import version
from exabgp.logger import Logger

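# The Reactor drives exabgp's single event loop: it owns the peers, the TCP
# listener, the API helper processes and signal handling, and multiplexes
# them with poll().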
class Reactor (object):
	class Exit (object):
		normal = 0
		validate = 0
		listening = 1
		configuration = 1
		privileges = 1
		log = 1
		pid = 1
		socket = 1
		io_error = 1
		process = 1
		select = 1
		unknown = 1

	# [hex(ord(c)) for c in os.popen('clear').read()]
	clear = concat_bytes_i(character(int(c,16)) for c in ['0x1b', '0x5b', '0x48', '0x1b', '0x5b', '0x32', '0x4a'])

	def __init__ (self, configurations):
		self._ips = environment.settings().tcp.bind
		self._port = environment.settings().tcp.port
		self._stopping = environment.settings().tcp.once
		self.exit_code = self.Exit.unknown

		self.max_loop_time = environment.settings().reactor.speed
		self._sleep_time = self.max_loop_time / 100
		self._busyspin = {}
		self._ratelimit = {}
		self.early_drop = environment.settings().daemon.drop

		self.processes = None

		self.configuration = Configuration(configurations)
		self.logger = Logger()
		self.asynchronous = ASYNC()
		self.signal = Signal()
		self.daemon = Daemon(self)
		self.listener = Listener(self)
		self.api = API(self)

		self._peers = {}

		self._reload_processes = False
		self._saved_pid = False
		self._poller = select.poll()

	def _termination (self,reason, exit_code):
		self.exit_code = exit_code
		self.signal.received = Signal.SHUTDOWN
		self.logger.critical(reason,'reactor')

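	# busy-spin protection: count loop iterations within the current wall-clock
	# second and back off with a short sleep once the count exceeds max_loop_time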
	def _prevent_spin(self):
		second = int(time.time())
		if second not in self._busyspin:
			self._busyspin = {second: 0}
		self._busyspin[second] += 1
		if self._busyspin[second] > self.max_loop_time:
			time.sleep(self._sleep_time)
			return True
		return False

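	# per-peer rate limiting: allow at most `rate` handled messages per peer and
	# per wall-clock second; returns True when the peer should be skipped this turn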
	def _rate_limited(self,peer,rate):
		if rate <= 0:
			return False
		second = int(time.time())
		ratelimit = self._ratelimit.get(peer,{})
		if second not in ratelimit:
			self._ratelimit[peer] = {second: rate-1}
			return False
		if self._ratelimit[peer][second] > 0:
			self._ratelimit[peer][second] -= 1
			return False
		return True

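	# wait up to `sleeptime` for one of the registered descriptors to become
	# readable and yield it; a hang-up terminates the reactor, error events only
	# trigger the busy-spin protection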
	def _wait_for_io (self,sleeptime):
		try:
			for fd, event in self._poller.poll(sleeptime):
				if event & select.POLLIN or event & select.POLLPRI:
					yield fd
					continue
				elif event & select.POLLHUP:
					self._termination('^C received', self.Exit.normal)
					return
				elif event & select.POLLERR or event & select.POLLNVAL:
					self._prevent_spin()
					continue
		except KeyboardInterrupt:
			self._termination('^C received',self.Exit.normal)
			return
		except Exception:
			self._prevent_spin()
			return

	# peer related functions

	def active_peers (self):
		peers = set()
		for key,peer in self._peers.items():
			if not peer.neighbor.passive or peer.proto:
				peers.add(key)
		return peers

	def connected_peers (self):
		peers = set()
		for key, peer in self._peers.items():
			if peer.fsm == FSM.ESTABLISHED:
				peers.add(key)
		return peers

	def peers(self):
		return list(self._peers)

	def handle_connection (self, peer_name, connection):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer','reactor')
			return
		peer.handle_connection(connection)

	def neighbor (self,peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return
		return peer.neighbor

	def neighbor_name (self,peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return ""
		return peer.neighbor.name()

	def neighbor_ip (self, peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return ""
		return str(peer.neighbor.peer_address)

	def neighbor_cli_data (self, peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return ""
		return peer.cli_data()

	def neighor_rib (self, peer_name, rib_name, advertised=False):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return []
		families = None
		if advertised:
			families = peer.proto.negotiated.families if peer.proto else []
		rib = peer.neighbor.rib.outgoing if rib_name == 'out' else peer.neighbor.rib.incoming
		return list(rib.cached_changes(families))

	def neighbor_rib_resend (self, peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return
		peer.neighbor.rib.outgoing.resend(None, peer.neighbor.route_refresh)

	def neighbor_rib_out_withdraw (self, peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return
		peer.neighbor.rib.outgoing.withdraw(None, peer.neighbor.route_refresh)

	def neighbor_rib_in_clear (self,peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return
		peer.neighbor.rib.incoming.clear()

	# ...

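	# True once every listed peer has flushed its outgoing RIB (no pending updates)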
	def _completed (self,peers):
		for peer in peers:
			if self._peers[peer].neighbor.rib.outgoing.pending():
				return False
		return True

	def run (self, validate, root):
		self.daemon.daemonise()

		# Make sure we create processes once we have closed the file descriptors.
		# Unfortunately, this must be done before reading the configuration file
		# so we can not do it with dropped privileges
		self.processes = Processes()

		# we have to read the configuration possibly with root privileges
		# as we need the MD5 information when we bind, and root is needed
		# to bind to a port < 1024

		# this is undesirable as:
		# - handling user generated data as root should be avoided
		# - we may not be able to reload the configuration once the privileges are dropped

		# but I can not see any way to avoid it
		for ip in self._ips:
			if not self.listener.listen_on(ip, None, self._port, None, False, None):
				return self.Exit.listening

		if not self.load():
			return self.Exit.configuration

		if validate:  # only validate configuration
			self.logger.warning('','configuration')
			self.logger.warning('parsed Neighbors, un-templated','configuration')
			self.logger.warning('------------------------------','configuration')
			self.logger.warning('','configuration')
			for key in self._peers:
				self.logger.warning(str(self._peers[key].neighbor),'configuration')
				self.logger.warning('','configuration')
			return self.Exit.validate

		for neighbor in self.configuration.neighbors.values():
			if neighbor.listen:
				if not self.listener.listen_on(neighbor.md5_ip, neighbor.peer_address, neighbor.listen, neighbor.md5_password, neighbor.md5_base64, neighbor.ttl_in):
					return self.Exit.listening

		if not self.early_drop:
			self.processes.start(self.configuration.processes)

		if not self.daemon.drop_privileges():
			self.logger.critical('could not drop privileges to \'%s\', refusing to run as root' % self.daemon.user,'reactor')
			self.logger.critical('set the environment value exabgp.daemon.user to change the unprivileged user','reactor')
			return self.Exit.privileges

		if self.early_drop:
			self.processes.start(self.configuration.processes)

		# This is required to make sure we can write in the log location as we now have dropped root privileges
		if not self.logger.restart():
			self.logger.critical('could not setup the logger, aborting','reactor')
			return self.Exit.log

		if not self.daemon.savepid():
			return self.Exit.pid

		# did we complete the run of updates caused by the last SIGUSR1/SIGUSR2 ?
		reload_completed = False

		wait = environment.settings().tcp.delay
		if wait:
			sleeptime = (wait * 60) - int(time.time()) % (wait * 60)
			self.logger.debug('waiting for %d seconds before connecting' % sleeptime,'reactor')
			time.sleep(float(sleeptime))

		workers = {}
		peers = set()
		api_fds = []

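		# main event loop: handle pending signals, accept incoming connections,
		# give every active peer a turn, dispatch commands received from the API
		# helper processes, then poll for I/O until something needs attention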
		while True:
			try:
				if self.signal.received:
					for key in self._peers:
						if self._peers[key].neighbor.api['signal']:
							self._peers[key].reactor.processes.signal(self._peers[key].neighbor,self.signal.number)

					signaled = self.signal.received
					self.signal.rearm()

					if signaled == Signal.SHUTDOWN:
						self.shutdown()
						break

					if signaled == Signal.RESTART:
						self.restart()
						continue

					if not reload_completed:
						continue

					if signaled == Signal.FULL_RELOAD:
						self._reload_processes = True

					if signaled in (Signal.RELOAD, Signal.FULL_RELOAD):
						self.load()
						self.processes.start(self.configuration.processes,self._reload_processes)
						self._reload_processes = False
						continue

				if self.listener.incoming():
					# check all incoming connections
					self.asynchronous.schedule(str(uuid.uuid1()),'checking for new connection(s)',self.listener.new_connections())

				peers = self.active_peers()
				if self._completed(peers):
					reload_completed = True

				sleep = self._sleep_time

				# do not attempt to listen on closed sockets even if the peer is still here
				for io in list(workers.keys()):
					if io == -1:
						self._poller.unregister(io)
						del workers[io]

				# give a turn to all the peers
				for key in list(peers):
					peer = self._peers[key]

					# limit the number of messages handled per second
					if self._rate_limited(key,peer.neighbor.rate_limit):
						peers.discard(key)
						continue

					# handle the peer
					action = peer.run()

					# .run() returns an ACTION enum:
					# * immediate if it wants to be called again
					# * later if it should be called again but has no work atm
					# * close if it is finished and is closing down, or restarting
					if action == ACTION.CLOSE:
						if key in self._peers:
							del self._peers[key]
						peers.discard(key)
					# we are losing this peer, no point scheduling more process work
					elif action == ACTION.LATER:
						io = peer.socket()
						if io != -1:
							self._poller.register(io, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR)
							workers[io] = key
						# no need to come back to it before a full cycle
						peers.discard(key)
					elif action == ACTION.NOW:
						sleep = 0

					if not peers:
						break

				# read at least one message per process if there are any, and parse it
				for service,command in self.processes.received():
					self.api.text(self,service,command)
					sleep = 0

				self.asynchronous.run()

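				# keep the poller in sync with the API helper processes: unregister
				# descriptors that have gone away and register newly created ones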
				if api_fds != self.processes.fds:
					for fd in api_fds:
						if fd == -1:
							continue
						if fd not in self.processes.fds:
							self._poller.unregister(fd)
					for fd in self.processes.fds:
						if fd == -1:
							continue
						if fd not in api_fds:
							self._poller.register(fd, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR)
					api_fds = self.processes.fds

				for io in self._wait_for_io(sleep):
					if io not in api_fds:
						peers.add(workers[io])

				if self._stopping and not self._peers.keys():
					self._termination('exiting on peer termination',self.Exit.normal)

			except KeyboardInterrupt:
				self._termination('^C received',self.Exit.normal)
			except SystemExit:
				self._termination('exiting', self.Exit.normal)
			# socket.error is a subclass of IOError (so catch it first)
			except socket.error:
				self._termination('socket error received',self.Exit.socket)
			except IOError:
				self._termination('I/O Error received, most likely ^C during IO',self.Exit.io_error)
			except ProcessError:
				self._termination('Problem when sending message(s) to helper program, stopping',self.Exit.process)
			except select.error:
				self._termination('problem using select, stopping',self.Exit.select)

		return self.exit_code

	def register_peer (self,name,peer):
		self._peers[name] = peer

	def teardown_peer (self,name,code):
		self._peers[name].teardown(code)

	def shutdown (self):
		"""Terminate all the current BGP connections"""
		self.logger.critical('performing shutdown','reactor')
		if self.listener:
			self.listener.stop()
			self.listener = None
		for key in self._peers.keys():
			self._peers[key].shutdown()
		self.asynchronous.clear()
		self.processes.terminate()
		self.daemon.removepid()
		self._stopping = True

	def load (self):
		"""Reload the configuration and send the peers the routes which changed"""
		self.logger.notice('performing reload of exabgp %s' % version,'configuration')

		reloaded = self.configuration.reload()

		if not reloaded:
			#
			# Careful: the string below is used by the QA code to check for success or failure
			self.logger.error('not reloaded, no change found in the configuration','configuration')
			# Careful: the string above is used by the QA code to check for success or failure
			#
			self.logger.error(str(self.configuration.error),'configuration')
			return False

		for key, peer in self._peers.items():
			if key not in self.configuration.neighbors:
				self.logger.debug('removing peer: %s' % peer.neighbor.name(),'reactor')
				peer.remove()

		for key, neighbor in self.configuration.neighbors.items():
			# new peer
			if key not in self._peers:
				self.logger.debug('new peer: %s' % neighbor.name(),'reactor')
				peer = Peer(neighbor,self)
				self._peers[key] = peer
			# modified peer
			elif self._peers[key].neighbor != neighbor:
				self.logger.debug('peer definition change, establishing a new connection for %s' % str(key),'reactor')
				self._peers[key].reestablish(neighbor)
			# same peer but perhaps not the routes
			else:
				# finding what route changed and sending the delta is not obvious
				self.logger.debug('peer definition identical, updating peer routes if required for %s' % str(key),'reactor')
				self._peers[key].reconfigure(neighbor)
			for ip in self._ips:
				if ip.afi == neighbor.peer_address.afi:
					self.listener.listen_on(ip, neighbor.peer_address, self._port, neighbor.md5_password, neighbor.md5_base64, None)
		self.logger.notice('loaded new configuration successfully','reactor')

		return True

	def restart (self):
		"""Kill the BGP sessions and restart them"""
		self.logger.notice('performing restart of exabgp %s' % version,'reactor')

		# XXX: FIXME: Could return False, in case there is interference with old config...
		reloaded = self.configuration.reload()

		for key in self._peers.keys():
			if key not in self.configuration.neighbors.keys():
				peer = self._peers[key]
				self.logger.debug('removing peer %s' % peer.neighbor.name(),'reactor')
				self._peers[key].remove()
			else:
				self._peers[key].reestablish()
		self.processes.start(self.configuration.processes,True)

	# def nexthops (self, peers):
	# 	return dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
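
For context, a minimal sketch of how this Reactor is typically driven from an exabgp entry point. The module paths come from the listing above, but the configuration path and the exact wiring are assumptions for illustration, not the actual exabgp main module:

# hedged sketch, not the actual exabgp entry point
import sys

from exabgp.reactor.loop import Reactor

# Reactor.__init__() reads its tcp/reactor/daemon settings from
# environment.settings(), so the environment is assumed to have been
# initialised by the caller before this point
configurations = ['/etc/exabgp/exabgp.conf']  # hypothetical configuration path
exit_code = Reactor(configurations).run(validate=False, root=False)
sys.exit(exit_code)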