Completed
Push — master ( f8c35a...0dc7c6 ) by Thomas
created at 14:34

exabgp.reactor.loop.Reactor._active_peers() — rated A

Complexity: Conditions 4
Size: Total Lines 6, Code Lines 6
Duplication: Lines 0, Ratio 0 %
Importance: Changes 0
Metric Value
cc 4
eloc 6
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
# encoding: utf-8
"""
reactor/loop.py

Created by Thomas Mangin on 2012-06-10.
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
License: 3-clause BSD. (See the COPYRIGHT file)
"""

import time
import uuid
import select
import socket

from exabgp.util import character
from exabgp.util import concat_bytes_i

from exabgp.reactor.daemon import Daemon
from exabgp.reactor.listener import Listener
from exabgp.reactor.api.processes import Processes
from exabgp.reactor.api.processes import ProcessError
from exabgp.reactor.peer import Peer
from exabgp.reactor.peer import ACTION
from exabgp.reactor.asynchronous import ASYNC
from exabgp.reactor.interrupt import Signal
from exabgp.reactor.network.error import error

from exabgp.reactor.api import API
from exabgp.configuration.configuration import Configuration
from exabgp.configuration.environment import environment

from exabgp.version import version
from exabgp.logger import Logger

class Reactor (object):
	class Exit (object):
		normal = 0
		validate = 0
		listening = 1
		configuration = 1
		privileges = 1
		log = 1
		pid = 1
		socket = 1
		io_error = 1
		process = 1
		select = 1
		unknown = 1

	# [hex(ord(c)) for c in os.popen('clear').read()]
	clear = concat_bytes_i(character(int(c,16)) for c in ['0x1b', '0x5b', '0x48', '0x1b', '0x5b', '0x32', '0x4a'])

	READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
	READ_WRITE = READ_ONLY | select.POLLOUT

	def __init__ (self, configurations):
		self._ips = environment.settings().tcp.bind
		self._port = environment.settings().tcp.port
		self._stopping = environment.settings().tcp.once
		self.exit_code = self.Exit.unknown

		self.max_loop_time = environment.settings().reactor.speed
		self._sleep_time = self.max_loop_time / 100
		self._busyspin = {}
		self._ratelimit = {}
		self.early_drop = environment.settings().daemon.drop

		self.processes = None

		self.configuration = Configuration(configurations)
		self.logger = Logger()
		self.asynchronous = ASYNC()
		self.signal = Signal()
		self.daemon = Daemon(self)
		self.listener = Listener(self)
		self.api = API(self)

		self._peers = {}

		self._reload_processes = False
		self._saved_pid = False
		self._poller = select.poll()

	def _termination (self,reason, exit_code):
		self.exit_code = exit_code
		self.signal.received = Signal.SHUTDOWN
		self.logger.critical(reason,'reactor')
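
	# busy-spin protection: count loop iterations per wall-clock second and
	# sleep briefly once the count exceeds max_loop_time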
	def _prevent_spin(self):
		second = int(time.time())
		if not second in self._busyspin:
			self._busyspin = {second: 0}
		self._busyspin[second] += 1
		if self._busyspin[second] > self.max_loop_time:
			time.sleep(self._sleep_time)
			return True
		return False

	def _rate_limited(self,peer,rate):
		if rate <= 0:
			return False
		second = int(time.time())
		ratelimit = self._ratelimit.get(peer,{})
		if not second in ratelimit:
			self._ratelimit[peer] = {second: rate-1}
			return False
		if self._ratelimit[peer][second] > 0:
			self._ratelimit[peer][second] -= 1
			return False
		return True
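
	# poll the registered file descriptors and yield the ones ready for reading;
	# recoverable errors (closed or negative FDs) only trigger the busy-spin guard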
	def _wait_for_io (self,sleeptime):
		try:
			for fd,_ in self._poller.poll(sleeptime):
				yield fd
		except select.error as exc:
			err_no,message = exc.args  # pylint: disable=W0633
			if err_no not in error.block:
				raise exc
			self._prevent_spin()
		except socket.error as exc:
			# python 3 does not raise on closed FD, but python2 does
			# we have lost a peer and it is causing the select
			# to complain, the code will self-heal, ignore the issue
			# (EBADF from python2 must be ignored when checking error.fatal)
			# otherwise sending a notification causes TCP to drop and causes
			# this code to kill ExaBGP
			self._prevent_spin()
		except ValueError as exc:
			# the peer closing the TCP connection leads to a negative file descriptor
			self._prevent_spin()
		except KeyboardInterrupt:
			self._termination('^C received',self.Exit.normal)

	# peer related functions

	def active_peers (self):
		peers = set()
		for key,peer in self._peers.items():
			if not peer.neighbor.passive or peer.proto:
				peers.add(key)
		return peers

	def peers(self):
		return list(self._peers)

	def handle_connection (self, peer_name, connection):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer','reactor')
			return
		peer.handle_connection(connection)

	def neighbor (self,peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return
		return peer.neighbor

	def neighbor_name (self,peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return ""
		return peer.neighbor.name()

	def neighbor_ip (self, peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return ""
		return str(peer.neighbor.peer_address)

	def neighbor_cli_data (self, peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return ""
		return peer.cli_data()

	def neighor_rib (self, peer_name, rib_name, advertised=False):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return []
		families = None
		if advertised:
			families = peer.proto.negotiated.families if peer.proto else []
		rib = peer.neighbor.rib.outgoing if rib_name == 'out' else peer.neighbor.rib.incoming
		return list(rib.cached_changes(families))

	def neighbor_rib_resend (self, peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return
		peer.neighbor.rib.outgoing.resend(None, peer.neighbor.route_refresh)

	def neighbor_rib_out_withdraw (self, peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return
		peer.neighbor.rib.outgoing.withdraw(None, peer.neighbor.route_refresh)

	def neighbor_rib_in_clear (self,peer_name):
		peer = self._peers.get(peer_name, None)
		if not peer:
			self.logger.critical('could not find referenced peer', 'reactor')
			return
		peer.neighbor.rib.incoming.clear()

	# ...
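
	# a configuration reload is only considered complete once no listed peer
	# still has routes pending in its outgoing RIB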
	def _completed (self,peers):
		for peer in peers:
			if self._peers[peer].neighbor.rib.outgoing.pending():
				return False
		return True
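
	# entry point: daemonise, bind the listening sockets, load the configuration,
	# drop privileges and start the API processes, then enter the event loop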
	def run (self, validate, root):
		self.daemon.daemonise()

		# Make sure we create processes once we have closed the file descriptors
		# unfortunately, this must be done before reading the configuration file
		# so we can not do it with dropped privileges
		self.processes = Processes()

		# we have to read the configuration possibly with root privileges
		# as we need the MD5 information when we bind, and root is needed
		# to bind to a port < 1024

		# this is undesirable as:
		# - handling user generated data as root should be avoided
		# - we may not be able to reload the configuration once the privileges are dropped

		# but I can not see any way to avoid it
		for ip in self._ips:
			if not self.listener.listen_on(ip, None, self._port, None, False, None):
				return self.Exit.listening

		if not self.load():
			return self.Exit.configuration

		if validate:  # only validate configuration
			self.logger.warning('','configuration')
			self.logger.warning('parsed Neighbors, un-templated','configuration')
			self.logger.warning('------------------------------','configuration')
			self.logger.warning('','configuration')
			for key in self._peers:
				self.logger.warning(str(self._peers[key].neighbor),'configuration')
				self.logger.warning('','configuration')
			return self.Exit.validate

		for neighbor in self.configuration.neighbors.values():
			if neighbor.listen:
				if not self.listener.listen_on(neighbor.md5_ip, neighbor.peer_address, neighbor.listen, neighbor.md5_password, neighbor.md5_base64, neighbor.ttl_in):
					return self.Exit.listening

		if not self.early_drop:
			self.processes.start(self.configuration.processes)

		if not self.daemon.drop_privileges():
			self.logger.critical('could not drop privileges to \'%s\' refusing to run as root' % self.daemon.user,'reactor')
			self.logger.critical('set the environmemnt value exabgp.daemon.user to change the unprivileged user','reactor')
			return self.Exit.privileges

		if self.early_drop:
			self.processes.start(self.configuration.processes)

		# This is required to make sure we can write in the log location as we now have dropped root privileges
		if not self.logger.restart():
			self.logger.critical('could not setup the logger, aborting','reactor')
			return self.Exit.log

		if not self.daemon.savepid():
			return self.Exit.pid

		# did we complete the run of updates caused by the last SIGUSR1/SIGUSR2 ?
		reload_completed = False

		wait = environment.settings().tcp.delay
		if wait:
			sleeptime = (wait * 60) - int(time.time()) % (wait * 60)
			self.logger.debug('waiting for %d seconds before connecting' % sleeptime,'reactor')
			time.sleep(float(sleeptime))

		workers = {}
		peers = set()
		api_fds = []
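
		# main event loop: handle pending signals, accept incoming connections,
		# give each active peer a turn, feed the API processes and wait for I/O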
		while True:
			try:
				if self.signal.received:
					for key in self._peers:
						if self._peers[key].neighbor.api['signal']:
							self._peers[key].reactor.processes.signal(self._peers[key].neighbor,self.signal.number)

					signaled = self.signal.received
					self.signal.rearm()

					if signaled == Signal.SHUTDOWN:
						self.shutdown()
						break

					if signaled == Signal.RESTART:
						self.restart()
						continue

					if not reload_completed:
						continue

					if signaled == Signal.FULL_RELOAD:
						self._reload_processes = True

					if signaled in (Signal.RELOAD, Signal.FULL_RELOAD):
						self.load()
						self.processes.start(self.configuration.processes,self._reload_processes)
						self._reload_processes = False
						continue

				if self.listener.incoming():
					# check all incoming connections
					self.asynchronous.schedule(str(uuid.uuid1()),'checking for new connection(s)',self.listener.new_connections())

				peers = self.active_peers()
				if self._completed(peers):
					reload_completed = True

				sleep = self._sleep_time

				# do not attempt to listen on closed sockets even if the peer is still here
				for io in list(workers.keys()):
					if io == -1:
						self._poller.unregister(io)
						del workers[io]

				# give a turn to all the peers
				for key in list(peers):
					peer = self._peers[key]

					# limit the number of message handling per second
					if self._rate_limited(key,peer.neighbor.rate_limit):
						peers.discard(key)
						continue

					# handle the peer
					action = peer.run()

					# .run() returns an ACTION enum:
					# * immediate if it wants to be called again
					# * later if it should be called again but has no work atm
					# * close if it is finished and is closing down, or restarting
					if action == ACTION.CLOSE:
						if key in self._peers:
							del self._peers[key]
						peers.discard(key)
					# we are losing this peer, no point scheduling more process work
					elif action == ACTION.LATER:
						io = peer.socket()
						if io != -1:
							self._poller.register(io, self.READ_ONLY)
							workers[io] = key
						# no need to come back to it before a full cycle
						peers.discard(key)
					elif action == ACTION.NOW:
						sleep = 0

					if not peers:
						break

				# read at least one message per process if any are pending and parse it
				for service,command in self.processes.received():
					self.api.text(self,service,command)
					sleep = 0

				self.asynchronous.run()

				if api_fds != self.processes.fds:
					for fd in api_fds:
						if fd == -1:
							continue
						if fd not in self.processes.fds:
							self._poller.unregister(fd)
					for fd in self.processes.fds:
						if fd == -1:
							continue
						if fd not in api_fds:
							self._poller.register(fd, self.READ_ONLY)
					api_fds = self.processes.fds

				for io in self._wait_for_io(sleep):
					if io not in api_fds:
						peers.add(workers[io])

				if self._stopping and not self._peers.keys():
					self._termination('exiting on peer termination',self.Exit.normal)

			except KeyboardInterrupt:
				self._termination('^C received',self.Exit.normal)
			except SystemExit:
				self._termination('exiting', self.Exit.normal)
			# socket.error is a subclass of IOError (so catch it first)
			except socket.error:
				self._termination('socket error received',self.Exit.socket)
			except IOError:
				self._termination('I/O Error received, most likely ^C during IO',self.Exit.io_error)
			except ProcessError:
				self._termination('Problem when sending message(s) to helper program, stopping',self.Exit.process)
			except select.error:
				self._termination('problem using select, stopping',self.Exit.select)
		return self.exit_code

	def register_peer (self,name,peer):
		self._peers[name] = peer

	def teardown_peer (self,name,code):
		self._peers[name].teardown(code)

	def shutdown (self):
		"""Terminate all the current BGP connections"""
		self.logger.critical('performing shutdown','reactor')
		if self.listener:
			self.listener.stop()
			self.listener = None
		for key in self._peers.keys():
			self._peers[key].shutdown()
		self.asynchronous.clear()
		self.processes.terminate()
		self.daemon.removepid()
		self._stopping = True

	def load (self):
		"""Reload the configuration and send to the peer the route which changed"""
		self.logger.notice('performing reload of exabgp %s' % version,'configuration')

		reloaded = self.configuration.reload()

		if not reloaded:
			#
			# Careful: the string below is used by the QA code to check for success or failure
			self.logger.error('not reloaded, no change found in the configuration','configuration')
			# Careful: the string above is used by the QA code to check for success or failure
			#
			self.logger.error(str(self.configuration.error),'configuration')
			return False

		for key, peer in self._peers.items():
			if key not in self.configuration.neighbors:
				self.logger.debug('removing peer: %s' % peer.neighbor.name(),'reactor')
				peer.remove()

		for key, neighbor in self.configuration.neighbors.items():
			# new peer
			if key not in self._peers:
				self.logger.debug('new peer: %s' % neighbor.name(),'reactor')
				peer = Peer(neighbor,self)
				self._peers[key] = peer
			# modified peer
			elif self._peers[key].neighbor != neighbor:
				self.logger.debug('peer definition change, establishing a new connection for %s' % str(key),'reactor')
				self._peers[key].reestablish(neighbor)
			# same peer but perhaps not the routes
			else:
				# finding what route changed and sending the delta is not obvious
				self.logger.debug('peer definition identical, updating peer routes if required for %s' % str(key),'reactor')
				self._peers[key].reconfigure(neighbor)
			for ip in self._ips:
				if ip.afi == neighbor.peer_address.afi:
					self.listener.listen_on(ip, neighbor.peer_address, self._port, neighbor.md5_password, neighbor.md5_base64, None)
		self.logger.notice('loaded new configuration successfully','reactor')

		return True

	def restart (self):
		"""Kill the BGP session and restart it"""
		self.logger.notice('performing restart of exabgp %s' % version,'reactor')

		# XXX: FIXME: Could return False, in case there is interference with old config...
		reloaded = self.configuration.reload()

		for key in self._peers.keys():
			if key not in self.configuration.neighbors.keys():
				peer = peers[key]
Inspection annotation — Comprehensibility / Best Practice, introduced by this push: The variable peers does not seem to be defined.
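A plausible correction — a reviewer-side sketch, not part of the pushed code — would be to look the peer up in the reactor's own self._peers mapping, which the neighbouring lines already use:

				peer = self._peers[key]  # hypothetical fix for the undefined 'peers' name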
				self.logger.debug('removing peer %s' % peer.neighbor.name(),'reactor')
				self._peers[key].remove()
			else:
				self._peers[key].reestablish()
		self.processes.start(self.configuration.processes,True)

	# def nexthops (self, peers):
	# 	return dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)