Completed
Push — master ( ea37f3...9aacac )
by Thomas
11:37
created

exabgp.reactor.loop.Reactor._pending_adjribout()   A

Complexity

Conditions 3

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 5
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# encoding: utf-8
2
"""
3
reactor/loop.py
4
5
Created by Thomas Mangin on 2012-06-10.
6
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
import time
11
import uuid
12
import select
13
import socket
14
15
from exabgp.reactor.daemon import Daemon
16
from exabgp.reactor.listener import Listener
17
from exabgp.reactor.api.processes import Processes
18
from exabgp.reactor.api.processes import ProcessError
19
from exabgp.reactor.peer import Peer
20
from exabgp.reactor.peer import ACTION
21
from exabgp.reactor.asynchronous import ASYNC
22
from exabgp.reactor.interrupt import Signal
23
24
from exabgp.reactor.api import API
25
from exabgp.configuration.configuration import Configuration
26
from exabgp.environment import getenv
27
28
from exabgp.bgp.fsm import FSM
29
30
from exabgp.version import version
31
from exabgp.logger import log
32
33
34
class Reactor(object):
    # Process exit codes returned by run(), grouped by failure category.
    class Exit(object):
        normal = 0
        validate = 0
        listening = 1
        configuration = 1
        privileges = 1
        log = 1
        pid = 1
        socket = 1
        io_error = 1
        process = 1
        select = 1
        unknown = 1

    # ANSI sequence emitted by the `clear` command:
    # ESC [ H (cursor home) followed by ESC [ 2 J (erase whole display).
    # (was: [hex(ord(c)) for c in os.popen('clear').read()])
    clear = b'\x1b[H\x1b[2J'
51
52
    def __init__(self, configurations):
        """Prepare the reactor state; no sockets are opened until run()."""
        env = getenv()  # hoisted: avoid re-reading the environment four times

        self._ips = env.tcp.bind
        self._port = env.tcp.port
        self._stopping = env.tcp.once
        self.exit_code = self.Exit.unknown

        self.max_loop_time = env.reactor.speed
        self._sleep_time = self.max_loop_time / 100
        self._busyspin = {}
        self._ratelimit = {}
        self.early_drop = env.daemon.drop

        # created later in run(), once inherited file descriptors are closed
        self.processes = None

        self.configuration = Configuration(configurations)
        self.asynchronous = ASYNC()
        self.signal = Signal()
        self.daemon = Daemon(self)
        self.listener = Listener(self)
        self.api = API(self)

        # peer-key -> Peer, populated by register_peer() / reload()
        self._peers = {}

        self._saved_pid = False
        self._poller = select.poll()
77
78
    def _termination(self, reason, exit_code):
        """Record *exit_code* and request an orderly shutdown of the event loop."""
        self.exit_code = exit_code
        self.signal.received = Signal.SHUTDOWN
        log.critical(reason, 'reactor')
82
83
    def _prevent_spin(self):
84
        second = int(time.time())
85
        if not second in self._busyspin:
86
            self._busyspin = {second: 0}
87
        self._busyspin[second] += 1
88
        if self._busyspin[second] > self.max_loop_time:
89
            time.sleep(self._sleep_time)
90
            return True
91
        return False
92
93
    def _rate_limited(self, peer, rate):
94
        if rate <= 0:
95
            return False
96
        second = int(time.time())
97
        ratelimit = self._ratelimit.get(peer, {})
98
        if not second in ratelimit:
99
            self._ratelimit[peer] = {second: rate - 1}
100
            return False
101
        if self._ratelimit[peer][second] > 0:
102
            self._ratelimit[peer][second] -= 1
103
            return False
104
        return True
105
106
    def _wait_for_io(self, sleeptime):
        """Poll the registered descriptors, yielding those ready for reading.

        Descriptors reporting HUP/ERR/NVAL are not yielded; they only trigger
        the busy-spin protection so a dead socket cannot make the loop burn CPU.
        """
        spin_prevention = False
        try:
            for fd, event in self._poller.poll(sleeptime):
                if event & (select.POLLIN | select.POLLPRI):
                    yield fd
                elif event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
                    spin_prevention = True
            if spin_prevention:
                self._prevent_spin()
        except KeyboardInterrupt:
            self._termination('^C received', self.Exit.normal)
            return
        except Exception:
            # poll() itself can raise (interrupted call, bad descriptor);
            # treat it like a busy loop rather than crashing the reactor
            self._prevent_spin()
            return
124
125
    # peer related functions
126
127
    def active_peers(self):
128
        peers = set()
129
        for key, peer in self._peers.items():
130
            if not peer.neighbor.passive or peer.proto:
131
                peers.add(key)
132
        return peers
133
134
    def established_peers(self):
        """Return the keys of peers whose BGP session is fully established."""
        return {key for key, peer in self._peers.items() if peer.fsm == FSM.ESTABLISHED}
140
141
    def peers(self):
142
        return list(self._peers)
143
144
    def handle_connection(self, peer_name, connection):
        """Hand an incoming *connection* to the peer registered as *peer_name*."""
        peer = self._peers.get(peer_name, None)
        if not peer:
            log.critical('could not find referenced peer', 'reactor')
            return
        peer.handle_connection(connection)
150
151
    def neighbor(self, peer_name):
        """Return the neighbor of *peer_name*, or None when unknown."""
        peer = self._peers.get(peer_name, None)
        if not peer:
            log.critical('could not find referenced peer', 'reactor')
            return
        return peer.neighbor
157
158
    def neighbor_name(self, peer_name):
        """Return the neighbor name for *peer_name*, or "" when unknown."""
        peer = self._peers.get(peer_name, None)
        if not peer:
            log.critical('could not find referenced peer', 'reactor')
            return ""
        return peer.neighbor.name()
164
165
    def neighbor_ip(self, peer_name):
        """Return the peer address of *peer_name* as a string, or "" when unknown."""
        peer = self._peers.get(peer_name, None)
        if not peer:
            log.critical('could not find referenced peer', 'reactor')
            return ""
        return str(peer.neighbor.peer_address)
171
172
    def neighbor_cli_data(self, peer_name):
        """Return the CLI summary for *peer_name*, or "" when unknown."""
        peer = self._peers.get(peer_name, None)
        if not peer:
            log.critical('could not find referenced peer', 'reactor')
            return ""
        return peer.cli_data()
178
179
    def neighor_rib(self, peer_name, rib_name, advertised=False):
        """Return the cached RIB changes for *peer_name* ([] when unknown).

        *rib_name* selects 'out' (adj-rib-out) vs the incoming RIB; when
        *advertised* is set, only the families negotiated with the peer
        are returned (none when the session is down).

        NOTE(review): the method name is missing a 'b' ('neighor') but is
        kept as-is since callers may depend on it.
        """
        peer = self._peers.get(peer_name, None)
        if not peer:
            log.critical('could not find referenced peer', 'reactor')
            return []
        families = None
        if advertised:
            families = peer.proto.negotiated.families if peer.proto else []
        rib = peer.neighbor.rib.outgoing if rib_name == 'out' else peer.neighbor.rib.incoming
        return list(rib.cached_changes(families))
189
190
    def neighbor_rib_resend(self, peer_name):
        """Schedule a re-announcement of all outgoing routes for *peer_name*."""
        peer = self._peers.get(peer_name, None)
        if not peer:
            log.critical('could not find referenced peer', 'reactor')
            return
        peer.neighbor.rib.outgoing.resend(None, peer.neighbor.route_refresh)
196
197
    def neighbor_rib_out_withdraw(self, peer_name):
        """Schedule the withdrawal of all outgoing routes for *peer_name*."""
        peer = self._peers.get(peer_name, None)
        if not peer:
            log.critical('could not find referenced peer', 'reactor')
            return
        peer.neighbor.rib.outgoing.withdraw(None, peer.neighbor.route_refresh)
203
204
    def neighbor_rib_in_clear(self, peer_name):
        """Drop all routes learned from *peer_name* (clears adj-rib-in)."""
        peer = self._peers.get(peer_name, None)
        if not peer:
            log.critical('could not find referenced peer', 'reactor')
            return
        peer.neighbor.rib.incoming.clear()
210
211
    # ...
212
213
    def _pending_adjribout(self):
214
        for peer in self.active_peers():
215
            if self._peers[peer].neighbor.rib.outgoing.pending():
216
                return True
217
        return False
218
219
    def run(self, validate, root):
        """Main event loop: daemonise, load the configuration, schedule peers.

        When *validate* is set, only parse and dump the configuration.
        Returns one of the Reactor.Exit codes.
        """
        self.daemon.daemonise()

        # Make sure we create processes once we have closed file descriptor
        # unfortunately, this must be done before reading the configuration file
        # so we can not do it with dropped privileges
        self.processes = Processes()

        # we have to read the configuration possibly with root privileges
        # as we need the MD5 information when we bind, and root is needed
        # to bind to a port < 1024

        # this is undesirable as :
        # - handling user generated data as root should be avoided
        # - we may not be able to reload the configuration once the privileges are dropped

        # but I can not see any way to avoid it
        for ip in self._ips:
            if not self.listener.listen_on(ip, None, self._port, None, False, None):
                return self.Exit.listening

        if not self.reload():
            return self.Exit.configuration

        if validate:  # only validate configuration
            log.warning('', 'configuration')
            log.warning('parsed Neighbors, un-templated', 'configuration')
            log.warning('------------------------------', 'configuration')
            log.warning('', 'configuration')
            for key in self._peers:
                log.warning(str(self._peers[key].neighbor), 'configuration')
                log.warning('', 'configuration')
            return self.Exit.validate

        for neighbor in self.configuration.neighbors.values():
            if neighbor.listen:
                if not self.listener.listen_on(
                    neighbor.md5_ip,
                    neighbor.peer_address,
                    neighbor.listen,
                    neighbor.md5_password,
                    neighbor.md5_base64,
                    neighbor.ttl_in,
                ):
                    return self.Exit.listening

        if not self.early_drop:
            self.processes.start(self.configuration.processes)

        if not self.daemon.drop_privileges():
            log.critical(
                'could not drop privileges to \'%s\' refusing to run as root' % self.daemon.user, 'reactor'
            )
            log.critical(
                'set the environmemnt value exabgp.daemon.user to change the unprivileged user', 'reactor'
            )
            return self.Exit.privileges

        if self.early_drop:
            self.processes.start(self.configuration.processes)

        # This is required to make sure we can write in the log location as we now have dropped root privileges
        if not log.restart():
            log.critical('could not setup the logger, aborting', 'reactor')
            return self.Exit.log

        if not self.daemon.savepid():
            return self.Exit.pid

        wait = getenv().tcp.delay
        if wait:
            sleeptime = (wait * 60) - int(time.time()) % (wait * 60)
            log.debug('waiting for %d seconds before connecting' % sleeptime, 'reactor')
            time.sleep(float(sleeptime))

        workers = {}  # socket fd -> peer key, for peers waiting on I/O
        peers = set()
        api_fds = []
        ms_sleep = int(self._sleep_time * 1000)

        while True:
            try:
                if self.signal.received:
                    signaled = self.signal.received

                    # report that we received a signal
                    for key in self._peers:
                        if self._peers[key].neighbor.api['signal']:
                            self._peers[key].reactor.processes.signal(self._peers[key].neighbor, self.signal.number)

                    self.signal.rearm()

                    # we always want to exit
                    if signaled == Signal.SHUTDOWN:
                        self.exit_code = self.Exit.normal
                        self.shutdown()
                        break

                    # it does not matter what we did if we are restarting
                    # as the peers and network stack are replaced by new ones
                    if signaled == Signal.RESTART:
                        self.restart()
                        continue

                    # did we complete the run of updates caused by the last SIGUSR1/SIGUSR2 ?
                    if self._pending_adjribout():
                        continue

                    if signaled == Signal.RELOAD:
                        self.reload()
                        self.processes.start(self.configuration.processes, False)
                        continue

                    if signaled == Signal.FULL_RELOAD:
                        self.reload()
                        self.processes.start(self.configuration.processes, True)
                        continue

                if self.listener.incoming():
                    # check all incoming connection
                    self.asynchronous.schedule(
                        str(uuid.uuid1()), 'checking for new connection(s)', self.listener.new_connections()
                    )

                sleep = ms_sleep

                # do not attempt to listen on closed sockets even if the peer is still here
                for io in list(workers.keys()):
                    if io == -1:
                        self._poller.unregister(io)
                        del workers[io]

                peers = self.active_peers()
                # give a turn to all the peers
                for key in list(peers):
                    peer = self._peers[key]

                    # limit the number of message handling per second
                    if self._rate_limited(key, peer.neighbor.rate_limit):
                        peers.discard(key)
                        continue

                    # handle the peer
                    action = peer.run()

                    # .run() returns an ACTION enum:
                    # * immediate if it wants to be called again
                    # * later if it should be called again but has no work atm
                    # * close if it is finished and is closing down, or restarting
                    if action == ACTION.CLOSE:
                        if key in self._peers:
                            del self._peers[key]
                        peers.discard(key)
                    # we are losing this peer, no point to schedule more process work
                    elif action == ACTION.LATER:
                        io = peer.socket()
                        if io != -1:
                            self._poller.register(
                                io, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR
                            )
                            workers[io] = key
                        # no need to come back to it before a full cycle
                        peers.discard(key)
                    elif action == ACTION.NOW:
                        sleep = 0

                    if not peers:
                        break

                # read at least one message per process if there is some and parse it
                for service, command in self.processes.received():
                    self.api.text(self, service, command)
                    sleep = 0

                self.asynchronous.run()

                # keep the poller registrations in sync with the API process fds
                if api_fds != self.processes.fds:
                    for fd in api_fds:
                        if fd == -1:
                            continue
                        if fd not in self.processes.fds:
                            self._poller.unregister(fd)
                    for fd in self.processes.fds:
                        if fd == -1:
                            continue
                        if fd not in api_fds:
                            self._poller.register(
                                fd, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR
                            )
                    api_fds = self.processes.fds

                for io in self._wait_for_io(sleep):
                    if io not in api_fds:
                        peers.add(workers[io])

                if self._stopping and not self._peers.keys():
                    self._termination('exiting on peer termination', self.Exit.normal)

            except KeyboardInterrupt:
                self._termination('^C received', self.Exit.normal)
            except SystemExit:
                self._termination('exiting', self.Exit.normal)
            # socket.error is a subclass of IOError (so catch it first)
            except socket.error:
                self._termination('socket error received', self.Exit.socket)
            except IOError:
                self._termination('I/O Error received, most likely ^C during IO', self.Exit.io_error)
            except ProcessError:
                self._termination('Problem when sending message(s) to helper program, stopping', self.Exit.process)
            except select.error:
                self._termination('problem using select, stopping', self.Exit.select)

        return self.exit_code
432
433
    def register_peer(self, name, peer):
434
        self._peers[name] = peer
435
436
    def teardown_peer(self, name, code):
        """Ask the peer registered as *name* to close its session with *code*."""
        self._peers[name].teardown(code)
438
439
    def shutdown(self):
        """Terminate all the current BGP connections"""
        log.critical('performing shutdown', 'reactor')
        if self.listener:
            self.listener.stop()
            self.listener = None
        # iterate over a snapshot: a peer shutdown may alter the dictionary
        for key in list(self._peers.keys()):
            self._peers[key].shutdown()
        self.asynchronous.clear()
        self.processes.terminate()
        self.daemon.removepid()
        self._stopping = True
451
452
    def reload(self):
        """Reload the configuration and send to the peer the route which changed"""
        log.notice('performing reload of exabgp %s' % version, 'configuration')

        reloaded = self.configuration.reload()

        if not reloaded:
            #
            # Careful the string below is used by the QA code to check for success or failure
            log.error('not reloaded, no change found in the configuration', 'configuration')
            # Careful the string above is used by the QA code to check for success or failure
            #
            log.error(str(self.configuration.error), 'configuration')
            return False

        # drop peers no longer present in the new configuration
        for key, peer in self._peers.items():
            if key not in self.configuration.neighbors:
                log.debug('removing peer: %s' % peer.neighbor.name(), 'reactor')
                peer.remove()

        for key, neighbor in self.configuration.neighbors.items():
            # new peer
            if key not in self._peers:
                log.debug('new peer: %s' % neighbor.name(), 'reactor')
                peer = Peer(neighbor, self)
                self._peers[key] = peer
            # modified peer
            elif self._peers[key].neighbor != neighbor:
                log.debug('peer definition change, establishing a new connection for %s' % str(key), 'reactor')
                self._peers[key].reestablish(neighbor)
            # same peer but perhaps not the routes
            else:
                # finding what route changed and sending the delta is not obvious
                log.debug(
                    'peer definition identical, updating peer routes if required for %s' % str(key), 'reactor'
                )
                self._peers[key].reconfigure(neighbor)
            # (re)listen with the per-neighbor MD5 options on matching address families
            for ip in self._ips:
                if ip.afi == neighbor.peer_address.afi:
                    self.listener.listen_on(
                        ip, neighbor.peer_address, self._port, neighbor.md5_password, neighbor.md5_base64, None
                    )
        log.notice('loaded new configuration successfully', 'reactor')

        return True
497
498
    def restart(self):
        """Kill the BGP session and restart it"""
        log.notice('performing restart of exabgp %s' % version, 'reactor')

        # XXX: FIXME: Could return False, in case there is interference with old config...
        # (the return value was never used, so it is not kept in a local)
        self.configuration.reload()

        # iterate over a snapshot: remove() may alter the peer dictionary
        for key in list(self._peers.keys()):
            if key not in self.configuration.neighbors.keys():
                peer = self._peers[key]
                log.debug('removing peer %s' % peer.neighbor.name(), 'reactor')
                self._peers[key].remove()
            else:
                self._peers[key].reestablish()
        self.processes.start(self.configuration.processes, True)
513
514
    # def nexthops (self, peers):
515
    # 	return dict((peer,self._peers[peer].neighbor.local_address) for peer in peers)
516