exabgp.reactor.protocol   F
last analyzed

Complexity

Total Complexity 102

Size/Duplication

Total Lines 404
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 276
dl 0
loc 404
rs 2
c 0
b 0
f 0
wmc 102

21 Methods

Rating   Name   Duplication   Size   Complexity  
A Protocol.send() 0 9 3
A Protocol.write() 0 10 3
A Protocol.me() 0 2 1
A Protocol.close() 0 14 4
A Protocol.accept() 0 8 2
A Protocol.__init__() 0 18 4
B Protocol.connect() 0 25 8
B Protocol._to_api() 0 23 6
A Protocol.fd() 0 4 2
F Protocol.read_message() 0 79 30
A Protocol.read_open() 0 12 4
A Protocol.new_refresh() 0 5 2
A Protocol.new_open() 0 22 4
A Protocol.new_notification() 0 9 2
A Protocol.new_keepalive() 0 9 3
A Protocol.read_keepalive() 0 11 4
A Protocol.validate_open() 0 20 5
A Protocol.new_update() 0 12 5
B Protocol.new_eors() 0 16 6
A Protocol.new_operational() 0 5 2
A Protocol.new_eor() 0 6 2

How to fix   Complexity   

Complexity

Complex classes like exabgp.reactor.protocol often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# encoding: utf-8
2
"""
3
protocol.py
4
5
Created by Thomas Mangin on 2009-08-25.
6
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
import os
11
12
import traceback
13
14
# ================================================================ Registration
15
#
16
17
from exabgp.reactor.network.outgoing import Outgoing
18
19
# from exabgp.reactor.network.error import NotifyError
20
21
from exabgp.protocol.family import AFI
22
from exabgp.protocol.family import SAFI
23
from exabgp.bgp.message import Message
24
from exabgp.bgp.message import NOP
25
from exabgp.bgp.message import _NOP
26
from exabgp.bgp.message import Open
27
from exabgp.bgp.message.open import Version
28
from exabgp.bgp.message.open.capability import Capabilities
29
from exabgp.bgp.message.open.capability import Negotiated
30
from exabgp.bgp.message import Update
31
from exabgp.bgp.message import EOR
32
from exabgp.bgp.message import KeepAlive
33
from exabgp.bgp.message import Notification
34
from exabgp.bgp.message import Notify
35
from exabgp.bgp.message import Operational
36
37
from exabgp.bgp.message.direction import IN
38
from exabgp.bgp.message.update.attribute import Attribute
39
40
from exabgp.protocol.ip import IP
41
from exabgp.reactor.api.processes import ProcessError
42
43
from exabgp.logger import log
44
45
# This is the number of chuncked message we are willing to buffer, not the number of routes
46
MAX_BACKLOG = 15000
47
48
_UPDATE = Update([], b'')
49
_OPERATIONAL = Operational(0x00)
50
51
52
class Protocol(object):
53
    decode = True
54
55
    def __init__(self, peer):
56
        self.peer = peer
57
        self.neighbor = peer.neighbor
58
        self.negotiated = Negotiated(self.neighbor)
59
        self.connection = None
60
61
        if self.neighbor.connect:
62
            self.port = self.neighbor.connect
63
        elif os.environ.get('exabgp.tcp.port', '').isdigit():
64
            self.port = int(os.environ.get('exabgp.tcp.port'))
65
        elif os.environ.get('exabgp_tcp_port', '').isdigit():
66
            self.port = int(os.environ.get('exabgp_tcp_port'))
67
        else:
68
            self.port = 179
69
70
        from exabgp.environment import getenv
71
72
        self.log_routes = peer.neighbor.adj_rib_in or getenv().log.routes
73
74
    def fd(self):
75
        if self.connection is None:
76
            return -1
77
        return self.connection.fd()
78
79
    # XXX: we use self.peer.neighbor.peer_address when we could use self.neighbor.peer_address
80
81
    def me(self, message):
82
        return "%s/%s %s" % (self.peer.neighbor.peer_address, self.peer.neighbor.peer_as, message)
83
84
    def accept(self, incoming):
85
        self.connection = incoming
86
87
        if self.peer.neighbor.api['neighbor-changes']:
88
            self.peer.reactor.processes.connected(self.peer.neighbor)
89
90
        # very important - as we use this function on __init__
91
        return self
92
93
    def connect(self):
94
        # allows to test the protocol code using modified StringIO with a extra 'pending' function
95
        if self.connection:
96
            return
97
98
        local = self.neighbor.md5_ip.top() if not self.neighbor.auto_discovery else None
99
        peer = self.neighbor.peer_address.top()
100
        afi = self.neighbor.peer_address.afi
101
        md5 = self.neighbor.md5_password
102
        md5_base64 = self.neighbor.md5_base64
103
        ttl_out = self.neighbor.ttl_out
104
        self.connection = Outgoing(afi, peer, local, self.port, md5, md5_base64, ttl_out)
105
106
        for connected in self.connection.establish():
107
            yield False
108
109
        if self.peer.neighbor.api['neighbor-changes']:
110
            self.peer.reactor.processes.connected(self.peer.neighbor)
111
112
        if not local:
113
            self.neighbor.local_address = IP.create(self.connection.local)
114
            if self.neighbor.router_id is None and self.neighbor.local_address.afi == AFI.ipv4:
115
                self.neighbor.router_id = self.neighbor.local_address
116
117
        yield True
118
119
    def close(self, reason='protocol closed, reason unspecified'):
120
        if self.connection:
121
            log.debug(reason, self.connection.session())
122
123
            # must be first otherwise we could have a loop caused by the raise in the below
124
            self.connection.close()
125
            self.connection = None
126
127
            self.peer.stats['down'] = self.peer.stats.get('down', 0) + 1
128
            try:
129
                if self.peer.neighbor.api['neighbor-changes']:
130
                    self.peer.reactor.processes.down(self.peer.neighbor, reason)
131
            except ProcessError:
132
                log.debug('could not send notification of neighbor close to API', self.connection.session())
133
134
    def _to_api(self, direction, message, raw):
135
        packets = self.neighbor.api['%s-packets' % direction]
136
        parsed = self.neighbor.api['%s-parsed' % direction]
137
        consolidate = self.neighbor.api['%s-consolidate' % direction]
138
        negotiated = self.negotiated if self.neighbor.api['negotiated'] else None
139
140
        if consolidate:
141
            if packets:
142
                self.peer.reactor.processes.message(
143
                    message.ID, self.peer.neighbor, direction, message, negotiated, raw[:19], raw[19:]
144
                )
145
            else:
146
                self.peer.reactor.processes.message(
147
                    message.ID, self.peer.neighbor, direction, message, negotiated, b'', b''
148
                )
149
        else:
150
            if packets:
151
                self.peer.reactor.processes.packets(
152
                    self.peer.neighbor, direction, int(message.ID), negotiated, raw[:19], raw[19:]
153
                )
154
            if parsed:
155
                self.peer.reactor.processes.message(
156
                    message.ID, self.peer.neighbor, direction, message, negotiated, b'', b''
157
                )
158
159
    def write(self, message, negotiated=None):
160
        raw = message.message(negotiated)
161
162
        code = 'send-%s' % Message.CODE.short(message.ID)
163
        self.peer.stats[code] = self.peer.stats.get(code, 0) + 1
164
        if self.neighbor.api.get(code, False):
165
            self._to_api('send', message, raw)
166
167
        for boolean in self.connection.writer(raw):
168
            yield boolean
169
170
    def send(self, raw):
171
        code = 'send-%s' % Message.CODE.short(raw[18])
172
        self.peer.stats[code] = self.peer.stats.get(code, 0) + 1
173
        if self.neighbor.api.get(code, False):
174
            message = Update.unpack_message(raw[19:], self.negotiated)
175
            self._to_api('send', message, raw)
176
177
        for boolean in self.connection.writer(raw):
178
            yield boolean
179
180
    # Read from network .......................................................
181
182
    def read_message(self):
183
        # This will always be defined by the loop but scope leaking upset scrutinizer/pylint
184
        msg_id = None
185
186
        packets = self.neighbor.api['receive-packets']
187
        consolidate = self.neighbor.api['receive-consolidate']
188
        parsed = self.neighbor.api['receive-parsed']
189
190
        body, header = b'', b''  # just because pylint/pylama are getting more clever
191
192
        for length, msg_id, header, body, notify in self.connection.reader():
193
            # internal issue
194
            if notify:
195
                code = 'receive-%s' % Message.CODE.NOTIFICATION.SHORT
196
                if self.neighbor.api.get(code, False):
197
                    if consolidate:
198
                        self.peer.reactor.processes.notification(
199
                            self.peer.neighbor, 'receive', notify.code, notify.subcode, str(notify), None, header, body
200
                        )
201
                    elif parsed:
202
                        self.peer.reactor.processes.notification(
203
                            self.peer.neighbor, 'receive', notify.code, notify.subcode, str(notify), None, b'', b''
204
                        )
205
                    elif packets:
206
                        self.peer.reactor.processes.packets(self.peer.neighbor, 'receive', msg_id, None, header, body)
207
                # XXX: is notify not already Notify class ?
208
                raise Notify(notify.code, notify.subcode, str(notify))
209
210
            if not length:
211
                yield _NOP
212
                continue
213
214
            log.debug('<< message of type %s' % Message.CODE.name(msg_id), self.connection.session())
215
216
            code = 'receive-%s' % Message.CODE.short(msg_id)
217
            self.peer.stats[code] = self.peer.stats.get(code, 0) + 1
218
            for_api = self.neighbor.api.get(code, False)
219
220
            if for_api and packets and not consolidate:
221
                negotiated = self.negotiated if self.neighbor.api.get('negotiated', False) else None
222
                self.peer.reactor.processes.packets(self.peer.neighbor, 'receive', msg_id, negotiated, header, body)
223
224
            if msg_id == Message.CODE.UPDATE:
225
                if not self.neighbor.adj_rib_in and not (for_api or self.log_routes) and not (parsed or consolidate):
226
                    yield _UPDATE
227
                    return
228
229
            try:
230
                message = Message.unpack(msg_id, body, self.negotiated)
231
            except (KeyboardInterrupt, SystemExit, Notify):
232
                raise
233
            except Exception as exc:
234
                log.debug('could not decode message "%d"' % msg_id, self.connection.session())
235
                log.debug('%s' % str(exc), self.connection.session())
236
                log.debug(traceback.format_exc(), self.connection.session())
237
                raise Notify(1, 0, 'can not decode update message of type "%d"' % msg_id)
238
                # raise Notify(5,0,'unknown message received')
239
240
            if message.TYPE == Update.TYPE:
241
                if Attribute.CODE.INTERNAL_TREAT_AS_WITHDRAW in message.attributes:
242
                    for nlri in message.nlris:
243
                        nlri.action = IN.WITHDRAWN
244
245
            if for_api:
246
                negotiated = self.negotiated if self.neighbor.api.get('negotiated', False) else None
247
                if consolidate:
248
                    self.peer.reactor.processes.message(
249
                        msg_id, self.neighbor, 'receive', message, negotiated, header, body
250
                    )
251
                elif parsed:
252
                    self.peer.reactor.processes.message(msg_id, self.neighbor, 'receive', message, negotiated, b'', b'')
253
254
            if message.TYPE == Notification.TYPE:
255
                raise message
256
257
            if message.TYPE == Update.TYPE and Attribute.CODE.INTERNAL_DISCARD in message.attributes:
258
                yield _NOP
259
            else:
260
                yield message
261
262
    def validate_open(self):
263
        error = self.negotiated.validate(self.neighbor)
264
        if error is not None:
265
            raise Notify(*error)
266
267
        if self.neighbor.api['negotiated']:
268
            self.peer.reactor.processes.negotiated(self.peer.neighbor, self.negotiated)
269
270
        if self.negotiated.mismatch:
271
            log.warning(
272
                '--------------------------------------------------------------------', self.connection.session()
273
            )
274
            log.warning('the connection can not carry the following family/families', self.connection.session())
275
            for reason, (afi, safi) in self.negotiated.mismatch:
276
                log.warning(' - %s is not configured for %s/%s' % (reason, afi, safi), self.connection.session())
277
            log.warning(
278
                'therefore no routes of this kind can be announced on the connection', self.connection.session()
279
            )
280
            log.warning(
281
                '--------------------------------------------------------------------', self.connection.session()
282
            )
283
284
    def read_open(self, ip):
285
        for received_open in self.read_message():
286
            if received_open.TYPE == NOP.TYPE:
287
                yield received_open
288
            else:
289
                break
290
291
        if received_open.TYPE != Open.TYPE:
0 ignored issues
show
introduced by
The variable received_open does not seem to be defined in case the for loop on line 285 is not entered. Are you sure this can never be the case?
Loading history...
292
            raise Notify(5, 1, 'The first packet received is not an open message (%s)' % received_open)
293
294
        log.debug('<< %s' % received_open, self.connection.session())
295
        yield received_open
296
297
    def read_keepalive(self):
298
        for message in self.read_message():
299
            if message.TYPE == NOP.TYPE:
300
                yield message
301
            else:
302
                break
303
304
        if message.TYPE != KeepAlive.TYPE:
0 ignored issues
show
introduced by
The variable message does not seem to be defined in case the for loop on line 298 is not entered. Are you sure this can never be the case?
Loading history...
305
            raise Notify(5, 2)
306
307
        yield message
308
309
    #
310
    # Sending message to peer
311
    #
312
313
    def new_open(self):
314
        if self.neighbor.local_as:
315
            local_as = self.neighbor.local_as
316
        elif self.negotiated.received_open:
317
            local_as = self.negotiated.received_open.asn
318
        else:
319
            raise RuntimeError('no ASN available for the OPEN message')
320
321
        sent_open = Open(
322
            Version(4),
323
            local_as,
324
            self.neighbor.hold_time,
325
            self.neighbor.router_id,
326
            Capabilities().new(self.neighbor, self.peer._restarted),
327
        )
328
329
        # we do not buffer open message in purpose
330
        for _ in self.write(sent_open):
331
            yield _NOP
332
333
        log.debug('>> %s' % sent_open, self.connection.session())
334
        yield sent_open
335
336
    def new_keepalive(self, comment=''):
337
        keepalive = KeepAlive()
338
339
        for _ in self.write(keepalive):
340
            yield _NOP
341
342
        log.debug('>> KEEPALIVE%s' % (' (%s)' % comment if comment else ''), self.connection.session())
343
344
        yield keepalive
345
346
    def new_notification(self, notification):
347
        for _ in self.write(notification):
348
            yield _NOP
349
        log.debug(
350
            '>> NOTIFICATION (%d,%d,"%s")'
351
            % (notification.code, notification.subcode, notification.data.decode('utf-8')),
352
            self.connection.session(),
353
        )
354
        yield notification
355
356
    def new_update(self, include_withdraw):
357
        updates = self.neighbor.rib.outgoing.updates(self.neighbor.group_updates)
358
        number = 0
359
        for update in updates:
360
            for message in update.messages(self.negotiated, include_withdraw):
361
                number += 1
362
                for boolean in self.send(message):
363
                    # boolean is a transient network error we already announced
364
                    yield _NOP
365
        if number:
366
            log.debug('>> %d UPDATE(s)' % number, self.connection.session())
367
        yield _UPDATE
368
369
    def new_eor(self, afi, safi):
370
        eor = EOR(afi, safi)
371
        for _ in self.write(eor):
372
            yield _NOP
373
        log.debug('>> EOR %s %s' % (afi, safi), self.connection.session())
374
        yield eor
375
376
    def new_eors(self, afi=AFI.undefined, safi=SAFI.undefined):
377
        # Send EOR to let our peer know he can perform a RIB update
378
        if self.negotiated.families:
379
            families = (
380
                self.negotiated.families if (afi, safi) == (AFI.undefined, SAFI.undefined) else [(afi, safi),]
381
            )
382
            for eor_afi, eor_safi in families:
383
                for _ in self.new_eor(eor_afi, eor_safi):
384
                    yield _
385
        else:
386
            # If we are not sending an EOR, send a keepalive as soon as when finished
387
            # So the other routers knows that we have no (more) routes to send ...
388
            # (is that behaviour documented somewhere ??)
389
            for eor in self.new_keepalive('EOR'):
390
                yield _NOP
391
            yield _UPDATE
392
393
    def new_operational(self, operational, negotiated):
394
        for _ in self.write(operational, negotiated):
395
            yield _NOP
396
        log.debug('>> OPERATIONAL %s' % str(operational), self.connection.session())
397
        yield operational
398
399
    def new_refresh(self, refresh):
400
        for _ in self.write(refresh, None):
401
            yield _NOP
402
        log.debug('>> REFRESH %s' % str(refresh), self.connection.session())
403
        yield refresh
404