1
|
|
|
# encoding: utf-8 |
2
|
|
|
""" |
3
|
|
|
protocol.py |
4
|
|
|
|
5
|
|
|
Created by Thomas Mangin on 2009-08-25. |
6
|
|
|
Copyright (c) 2009-2017 Exa Networks. All rights reserved. |
7
|
|
|
License: 3-clause BSD. (See the COPYRIGHT file) |
8
|
|
|
""" |
9
|
|
|
|
10
|
|
|
import os |
11
|
|
|
|
12
|
|
|
import traceback |
13
|
|
|
|
14
|
|
|
# ================================================================ Registration |
15
|
|
|
# |
16
|
|
|
|
17
|
|
|
from exabgp.reactor.network.outgoing import Outgoing |
18
|
|
|
|
19
|
|
|
# from exabgp.reactor.network.error import NotifyError |
20
|
|
|
|
21
|
|
|
from exabgp.protocol.family import AFI |
22
|
|
|
from exabgp.protocol.family import SAFI |
23
|
|
|
from exabgp.bgp.message import Message |
24
|
|
|
from exabgp.bgp.message import NOP |
25
|
|
|
from exabgp.bgp.message import _NOP |
26
|
|
|
from exabgp.bgp.message import Open |
27
|
|
|
from exabgp.bgp.message.open import Version |
28
|
|
|
from exabgp.bgp.message.open.capability import Capabilities |
29
|
|
|
from exabgp.bgp.message.open.capability import Negotiated |
30
|
|
|
from exabgp.bgp.message import Update |
31
|
|
|
from exabgp.bgp.message import EOR |
32
|
|
|
from exabgp.bgp.message import KeepAlive |
33
|
|
|
from exabgp.bgp.message import Notification |
34
|
|
|
from exabgp.bgp.message import Notify |
35
|
|
|
from exabgp.bgp.message import Operational |
36
|
|
|
|
37
|
|
|
from exabgp.bgp.message.direction import IN |
38
|
|
|
from exabgp.bgp.message.update.attribute import Attribute |
39
|
|
|
|
40
|
|
|
from exabgp.protocol.ip import IP |
41
|
|
|
from exabgp.reactor.api.processes import ProcessError |
42
|
|
|
|
43
|
|
|
from exabgp.logger import log |
44
|
|
|
|
45
|
|
|
# This is the number of chunked messages we are willing to buffer, not the number of routes
MAX_BACKLOG = 15000

# Module-level placeholder instances built once at import time.
# _UPDATE is yielded by Protocol.read_message() in place of a decoded UPDATE when nobody
# needs the parsed content, and by Protocol.new_update() / Protocol.new_eors() once they
# are done sending.
_UPDATE = Update([], b'')
# NOTE(review): _OPERATIONAL is not referenced in the visible part of this file;
# presumably it plays the same placeholder role for OPERATIONAL messages - confirm.
_OPERATIONAL = Operational(0x00)
50
|
|
|
|
51
|
|
|
|
52
|
|
|
class Protocol(object):
    """Exchange BGP messages with one peer over a single connection.

    The object owns the network connection (``self.connection``), encodes and
    writes outgoing messages, reads and decodes incoming ones, keeps the
    per-peer message counters in ``peer.stats`` and forwards traffic to the
    API processes when the neighbor configuration asks for it.

    Most methods are generators: they yield ``_NOP`` (or ``False``) while the
    underlying network operation is still in progress and yield the final
    object once it is complete, so the caller can interleave several peers.
    """

    # NOTE(review): not referenced inside this class; presumably read by other modules - confirm.
    decode = True

    def __init__(self, peer):
        """Attach the protocol to ``peer`` and select the TCP port to connect to.

        The port is, in order of preference: ``neighbor.connect`` when set, the
        ``exabgp.tcp.port`` environment variable, the ``exabgp_tcp_port``
        environment variable (both only when purely numeric), and finally 179.
        """
        self.peer = peer
        self.neighbor = peer.neighbor
        self.negotiated = Negotiated(self.neighbor)
        self.connection = None

        if self.neighbor.connect:
            self.port = self.neighbor.connect
        elif os.environ.get('exabgp.tcp.port', '').isdigit():
            self.port = int(os.environ.get('exabgp.tcp.port'))
        elif os.environ.get('exabgp_tcp_port', '').isdigit():
            self.port = int(os.environ.get('exabgp_tcp_port'))
        else:
            self.port = 179

        # imported here rather than at module level, as in the original code
        from exabgp.environment import getenv

        # received UPDATEs must be decoded when they are stored or logged
        self.log_routes = peer.neighbor.adj_rib_in or getenv().log.routes

    def fd(self):
        """Return the connection's file descriptor, or -1 when not connected."""
        if self.connection is None:
            return -1
        return self.connection.fd()

    # XXX: we use self.peer.neighbor.peer_address when we could use self.neighbor.peer_address

    def me(self, message):
        """Return ``message`` prefixed with the peer address and peer AS."""
        return "%s/%s %s" % (self.peer.neighbor.peer_address, self.peer.neighbor.peer_as, message)

    def accept(self, incoming):
        """Adopt the already established ``incoming`` connection and return ``self``.

        The API processes are told about the connection when the neighbor has
        'neighbor-changes' enabled.
        """
        self.connection = incoming

        if self.peer.neighbor.api['neighbor-changes']:
            self.peer.reactor.processes.connected(self.peer.neighbor)

        # very important - as we use this function on __init__
        return self

    def connect(self):
        """Open the outgoing connection to the neighbor (generator).

        Yields ``False`` for every step reported by ``connection.establish()``
        and ``True`` once the connection is set up. When a connection already
        exists the generator ends immediately without yielding anything.

        When no local address was provided, the neighbor's ``local_address`` is
        set from the connection, and its ``router_id`` too when it is still
        unset and that address is IPv4.
        """
        # allows to test the protocol code using modified StringIO with a extra 'pending' function
        if self.connection:
            return

        local = self.neighbor.md5_ip.top() if not self.neighbor.auto_discovery else None
        peer = self.neighbor.peer_address.top()
        afi = self.neighbor.peer_address.afi
        md5 = self.neighbor.md5_password
        md5_base64 = self.neighbor.md5_base64
        ttl_out = self.neighbor.ttl_out
        self.connection = Outgoing(afi, peer, local, self.port, md5, md5_base64, ttl_out)

        for _ in self.connection.establish():
            yield False

        if self.peer.neighbor.api['neighbor-changes']:
            self.peer.reactor.processes.connected(self.peer.neighbor)

        if not local:
            self.neighbor.local_address = IP.create(self.connection.local)
            if self.neighbor.router_id is None and self.neighbor.local_address.afi == AFI.ipv4:
                self.neighbor.router_id = self.neighbor.local_address

        yield True

    def close(self, reason='protocol closed, reason unspecified'):
        """Close the connection, if any, and report the session as down.

        Increments the 'down' counter in ``peer.stats`` and, when the neighbor
        has 'neighbor-changes' enabled, tells the API processes. A failure to
        reach the API (``ProcessError``) is logged and otherwise ignored.
        """
        if self.connection:
            # remember the session label now: the connection is dropped below but the
            # label is still needed if notifying the API fails
            session = self.connection.session()
            log.debug(reason, session)

            # must be first otherwise we could have a loop caused by the raise in the below
            self.connection.close()
            self.connection = None

            self.peer.stats['down'] = self.peer.stats.get('down', 0) + 1
            try:
                if self.peer.neighbor.api['neighbor-changes']:
                    self.peer.reactor.processes.down(self.peer.neighbor, reason)
            except ProcessError:
                log.debug('could not send notification of neighbor close to API', session)

    def _to_api(self, direction, message, raw):
        """Forward one message to the API processes.

        ``direction`` selects the '<direction>-packets', '<direction>-parsed'
        and '<direction>-consolidate' neighbor options. ``raw`` is the encoded
        message; it is split after its first 19 bytes into header and body.

        With consolidation a single ``message`` call is made, carrying the raw
        bytes only when packets are requested. Without it, the raw packet and
        the parsed message are reported separately, each only when requested.
        """
        packets = self.neighbor.api['%s-packets' % direction]
        parsed = self.neighbor.api['%s-parsed' % direction]
        consolidate = self.neighbor.api['%s-consolidate' % direction]
        negotiated = self.negotiated if self.neighbor.api['negotiated'] else None

        if consolidate:
            if packets:
                self.peer.reactor.processes.message(
                    message.ID, self.peer.neighbor, direction, message, negotiated, raw[:19], raw[19:]
                )
            else:
                self.peer.reactor.processes.message(
                    message.ID, self.peer.neighbor, direction, message, negotiated, b'', b''
                )
        else:
            if packets:
                self.peer.reactor.processes.packets(
                    self.peer.neighbor, direction, int(message.ID), negotiated, raw[:19], raw[19:]
                )
            if parsed:
                self.peer.reactor.processes.message(
                    message.ID, self.peer.neighbor, direction, message, negotiated, b'', b''
                )

    def write(self, message, negotiated=None):
        """Encode ``message`` and write it to the connection (generator).

        The 'send-<type>' counter in ``peer.stats`` is incremented and the
        message is forwarded to the API when that option is enabled for the
        neighbor. Yields whatever ``connection.writer()`` yields.
        """
        raw = message.message(negotiated)

        code = 'send-%s' % Message.CODE.short(message.ID)
        self.peer.stats[code] = self.peer.stats.get(code, 0) + 1
        if self.neighbor.api.get(code, False):
            self._to_api('send', message, raw)

        for boolean in self.connection.writer(raw):
            yield boolean

    def send(self, raw):
        """Write the already encoded message ``raw`` to the connection (generator).

        The message type is read from byte 18 of ``raw`` to pick the
        'send-<type>' counter and API option. When the API wants the message,
        the bytes after the 19-byte header are decoded with
        ``Update.unpack_message`` - so this assumes ``raw`` is an UPDATE.
        Yields whatever ``connection.writer()`` yields.
        """
        code = 'send-%s' % Message.CODE.short(raw[18])
        self.peer.stats[code] = self.peer.stats.get(code, 0) + 1
        if self.neighbor.api.get(code, False):
            message = Update.unpack_message(raw[19:], self.negotiated)
            self._to_api('send', message, raw)

        for boolean in self.connection.writer(raw):
            yield boolean

    # Read from network .......................................................

    def read_message(self):
        """Read and decode messages from the connection (generator).

        Yields:
            ``_NOP`` while the reader has no complete message (zero length);
            ``_UPDATE`` - after which the generator ends - for an UPDATE that
            neither the RIB, the API nor the logs need decoded;
            ``_NOP`` for a decoded UPDATE carrying INTERNAL_DISCARD;
            the decoded message otherwise.

        Raises:
            Notify: when the reader reports a problem, or when the message
                can not be decoded.
            Notification: the received NOTIFICATION message itself is raised.
        """
        # This will always be defined by the loop but scope leaking upset scrutinizer/pylint
        msg_id = None

        packets = self.neighbor.api['receive-packets']
        consolidate = self.neighbor.api['receive-consolidate']
        parsed = self.neighbor.api['receive-parsed']

        body, header = b'', b''  # just because pylint/pylama are getting more clever

        for length, msg_id, header, body, notify in self.connection.reader():
            # internal issue
            if notify:
                code = 'receive-%s' % Message.CODE.NOTIFICATION.SHORT
                if self.neighbor.api.get(code, False):
                    if consolidate:
                        self.peer.reactor.processes.notification(
                            self.peer.neighbor, 'receive', notify.code, notify.subcode, str(notify), None, header, body
                        )
                    elif parsed:
                        self.peer.reactor.processes.notification(
                            self.peer.neighbor, 'receive', notify.code, notify.subcode, str(notify), None, b'', b''
                        )
                    elif packets:
                        self.peer.reactor.processes.packets(self.peer.neighbor, 'receive', msg_id, None, header, body)
                # XXX: is notify not already Notify class ?
                raise Notify(notify.code, notify.subcode, str(notify))

            # nothing complete to process yet, give the control back to the caller
            if not length:
                yield _NOP
                continue

            log.debug('<< message of type %s' % Message.CODE.name(msg_id), self.connection.session())

            code = 'receive-%s' % Message.CODE.short(msg_id)
            self.peer.stats[code] = self.peer.stats.get(code, 0) + 1
            for_api = self.neighbor.api.get(code, False)

            # raw packets are reported on their own unless they are consolidated with the parsed form
            if for_api and packets and not consolidate:
                negotiated = self.negotiated if self.neighbor.api.get('negotiated', False) else None
                self.peer.reactor.processes.packets(self.peer.neighbor, 'receive', msg_id, negotiated, header, body)

            if msg_id == Message.CODE.UPDATE:
                # skip the decoding altogether when nobody is going to look at the result
                if not self.neighbor.adj_rib_in and not (for_api or self.log_routes) and not (parsed or consolidate):
                    yield _UPDATE
                    return

            try:
                message = Message.unpack(msg_id, body, self.negotiated)
            except (KeyboardInterrupt, SystemExit, Notify):
                raise
            except Exception as exc:
                log.debug('could not decode message "%d"' % msg_id, self.connection.session())
                log.debug('%s' % str(exc), self.connection.session())
                log.debug(traceback.format_exc(), self.connection.session())
                raise Notify(1, 0, 'can not decode update message of type "%d"' % msg_id)
                # raise Notify(5,0,'unknown message received')

            if message.TYPE == Update.TYPE:
                if Attribute.CODE.INTERNAL_TREAT_AS_WITHDRAW in message.attributes:
                    for nlri in message.nlris:
                        nlri.action = IN.WITHDRAWN

            if for_api:
                negotiated = self.negotiated if self.neighbor.api.get('negotiated', False) else None
                if consolidate:
                    self.peer.reactor.processes.message(
                        msg_id, self.neighbor, 'receive', message, negotiated, header, body
                    )
                elif parsed:
                    self.peer.reactor.processes.message(msg_id, self.neighbor, 'receive', message, negotiated, b'', b'')

            if message.TYPE == Notification.TYPE:
                raise message

            if message.TYPE == Update.TYPE and Attribute.CODE.INTERNAL_DISCARD in message.attributes:
                yield _NOP
            else:
                yield message

    def validate_open(self):
        """Check the negotiated session against the neighbor configuration.

        Raises:
            Notify: built from the error returned by ``negotiated.validate()``.

        When valid, the negotiated state is sent to the API if the neighbor
        asks for it, and every family mismatch is logged as a warning.
        """
        error = self.negotiated.validate(self.neighbor)
        if error is not None:
            raise Notify(*error)

        if self.neighbor.api['negotiated']:
            self.peer.reactor.processes.negotiated(self.peer.neighbor, self.negotiated)

        if self.negotiated.mismatch:
            log.warning(
                '--------------------------------------------------------------------', self.connection.session()
            )
            log.warning('the connection can not carry the following family/families', self.connection.session())
            for reason, (afi, safi) in self.negotiated.mismatch:
                log.warning(' - %s is not configured for %s/%s' % (reason, afi, safi), self.connection.session())
            log.warning(
                'therefore no routes of this kind can be announced on the connection', self.connection.session()
            )
            log.warning(
                '--------------------------------------------------------------------', self.connection.session()
            )

    def read_open(self, ip):
        """Wait for the peer's OPEN message (generator).

        Yields the NOP messages produced while waiting, then the OPEN message.
        ``ip`` is accepted for the caller's convenience and not used here.

        Raises:
            Notify: (5, 1) when the first real message is not an OPEN, or when
                no message was received at all.
        """
        # stays None if read_message() produces nothing at all
        received_open = None

        for received_open in self.read_message():
            if received_open.TYPE == NOP.TYPE:
                yield received_open
            else:
                break

        if received_open is None or received_open.TYPE != Open.TYPE:
            raise Notify(5, 1, 'The first packet received is not an open message (%s)' % received_open)

        log.debug('<< %s' % received_open, self.connection.session())
        yield received_open

    def read_keepalive(self):
        """Wait for a KEEPALIVE message from the peer (generator).

        Yields the NOP messages produced while waiting, then the KEEPALIVE.

        Raises:
            Notify: (5, 2) when the first real message is not a KEEPALIVE, or
                when no message was received at all.
        """
        # stays None if read_message() produces nothing at all
        message = None

        for message in self.read_message():
            if message.TYPE == NOP.TYPE:
                yield message
            else:
                break

        if message is None or message.TYPE != KeepAlive.TYPE:
            raise Notify(5, 2)

        yield message

    #
    # Sending message to peer
    #

    def new_open(self):
        """Build and send our OPEN message (generator).

        The AS number is ``neighbor.local_as`` when set, otherwise the AS of
        the OPEN already received from the peer. Yields ``_NOP`` while
        writing, then the OPEN that was sent.

        Raises:
            RuntimeError: when neither source provides an AS number.
        """
        if self.neighbor.local_as:
            local_as = self.neighbor.local_as
        elif self.negotiated.received_open:
            local_as = self.negotiated.received_open.asn
        else:
            raise RuntimeError('no ASN available for the OPEN message')

        sent_open = Open(
            Version(4),
            local_as,
            self.neighbor.hold_time,
            self.neighbor.router_id,
            Capabilities().new(self.neighbor, self.peer._restarted),
        )

        # we do not buffer open message in purpose
        for _ in self.write(sent_open):
            yield _NOP

        log.debug('>> %s' % sent_open, self.connection.session())
        yield sent_open

    def new_keepalive(self, comment=''):
        """Send a KEEPALIVE (generator): yields ``_NOP`` while writing, then the message.

        ``comment``, when not empty, is appended to the debug log line.
        """
        keepalive = KeepAlive()

        for _ in self.write(keepalive):
            yield _NOP

        log.debug('>> KEEPALIVE%s' % (' (%s)' % comment if comment else ''), self.connection.session())

        yield keepalive

    def new_notification(self, notification):
        """Send ``notification`` (generator): yields ``_NOP`` while writing, then the message."""
        for _ in self.write(notification):
            yield _NOP
        log.debug(
            '>> NOTIFICATION (%d,%d,"%s")'
            % (notification.code, notification.subcode, notification.data.decode('utf-8')),
            self.connection.session(),
        )
        yield notification

    def new_update(self, include_withdraw):
        """Send every pending UPDATE of the outgoing RIB (generator).

        Yields ``_NOP`` for each step of each write and ``_UPDATE`` once all
        the messages have been sent. ``include_withdraw`` is passed through
        to ``update.messages()``.
        """
        updates = self.neighbor.rib.outgoing.updates(self.neighbor.group_updates)
        number = 0
        for update in updates:
            for message in update.messages(self.negotiated, include_withdraw):
                number += 1
                for _ in self.send(message):
                    # the value is a transient network error we already announced
                    yield _NOP
        if number:
            log.debug('>> %d UPDATE(s)' % number, self.connection.session())
        yield _UPDATE

    def new_eor(self, afi, safi):
        """Send an End-of-RIB marker for ``afi``/``safi`` (generator).

        Yields ``_NOP`` while writing, then the EOR message.
        """
        eor = EOR(afi, safi)
        for _ in self.write(eor):
            yield _NOP
        log.debug('>> EOR %s %s' % (afi, safi), self.connection.session())
        yield eor

    def new_eors(self, afi=AFI.undefined, safi=SAFI.undefined):
        """Send End-of-RIB markers, or a KEEPALIVE when no family was negotiated (generator).

        With the default arguments an EOR is sent for every negotiated family,
        otherwise only for the given ``afi``/``safi``. Everything yielded by
        ``new_eor()`` is passed on; ``_UPDATE`` is yielded last.
        """
        # Send EOR to let our peer know he can perform a RIB update
        if self.negotiated.families:
            families = (
                self.negotiated.families if (afi, safi) == (AFI.undefined, SAFI.undefined) else [(afi, safi)]
            )
            for eor_afi, eor_safi in families:
                for _ in self.new_eor(eor_afi, eor_safi):
                    yield _
        else:
            # If we are not sending an EOR, send a keepalive as soon as when finished
            # So the other routers knows that we have no (more) routes to send ...
            # (is that behaviour documented somewhere ??)
            for _ in self.new_keepalive('EOR'):
                yield _NOP
        yield _UPDATE

    def new_operational(self, operational, negotiated):
        """Send ``operational`` encoded with ``negotiated`` (generator).

        Yields ``_NOP`` while writing, then the message.
        """
        for _ in self.write(operational, negotiated):
            yield _NOP
        log.debug('>> OPERATIONAL %s' % str(operational), self.connection.session())
        yield operational

    def new_refresh(self, refresh):
        """Send the route refresh message ``refresh`` (generator).

        Yields ``_NOP`` while writing, then the message.
        """
        for _ in self.write(refresh, None):
            yield _NOP
        log.debug('>> REFRESH %s' % str(refresh), self.connection.session())
        yield refresh