1
|
|
|
# encoding: utf-8 |
2
|
|
|
""" |
3
|
|
|
protocol.py |
4
|
|
|
|
5
|
|
|
Created by Thomas Mangin on 2009-08-25. |
6
|
|
|
Copyright (c) 2009-2017 Exa Networks. All rights reserved. |
7
|
|
|
License: 3-clause BSD. (See the COPYRIGHT file) |
8
|
|
|
""" |
9
|
|
|
|
10
|
|
|
import os |
11
|
|
|
|
12
|
|
|
from exabgp.vendoring import six |
13
|
|
|
import traceback |
14
|
|
|
|
15
|
|
|
# ================================================================ Registration |
16
|
|
|
# |
17
|
|
|
|
18
|
|
|
from exabgp.util import ordinal |
19
|
|
|
|
20
|
|
|
from exabgp.reactor.network.outgoing import Outgoing |
21
|
|
|
# from exabgp.reactor.network.error import NotifyError |
22
|
|
|
|
23
|
|
|
from exabgp.protocol.family import AFI |
24
|
|
|
from exabgp.protocol.family import SAFI |
25
|
|
|
from exabgp.bgp.message import Message |
26
|
|
|
from exabgp.bgp.message import NOP |
27
|
|
|
from exabgp.bgp.message import _NOP |
28
|
|
|
from exabgp.bgp.message import Open |
29
|
|
|
from exabgp.bgp.message.open import Version |
30
|
|
|
from exabgp.bgp.message.open.capability import Capabilities |
31
|
|
|
from exabgp.bgp.message.open.capability import Negotiated |
32
|
|
|
from exabgp.bgp.message import Update |
33
|
|
|
from exabgp.bgp.message import EOR |
34
|
|
|
from exabgp.bgp.message import KeepAlive |
35
|
|
|
from exabgp.bgp.message import Notification |
36
|
|
|
from exabgp.bgp.message import Notify |
37
|
|
|
from exabgp.bgp.message import Operational |
38
|
|
|
|
39
|
|
|
from exabgp.bgp.message.direction import IN |
40
|
|
|
from exabgp.bgp.message.update.attribute import Attribute |
41
|
|
|
|
42
|
|
|
from exabgp.protocol.ip import IP |
43
|
|
|
from exabgp.reactor.api.processes import ProcessError |
44
|
|
|
|
45
|
|
|
from exabgp.logger import Logger |
46
|
|
|
from exabgp.logger import FakeLogger |
47
|
|
|
|
48
|
|
|
# This is the number of chuncked message we are willing to buffer, not the number of routes |
49
|
|
|
MAX_BACKLOG = 15000 |
50
|
|
|
|
51
|
|
|
_UPDATE = Update([],b'') |
52
|
|
|
_OPERATIONAL = Operational(0x00) |
53
|
|
|
|
54
|
|
|
|
55
|
|
|
class Protocol (object): |
56
|
|
|
decode = True |
57
|
|
|
|
58
|
|
|
def __init__ (self, peer): |
59
|
|
|
try: |
60
|
|
|
self.logger = Logger() |
61
|
|
|
except RuntimeError: |
62
|
|
|
self.logger = FakeLogger() |
63
|
|
|
self.peer = peer |
64
|
|
|
self.neighbor = peer.neighbor |
65
|
|
|
self.negotiated = Negotiated(self.neighbor) |
66
|
|
|
self.connection = None |
67
|
|
|
|
68
|
|
|
if self.neighbor.connect: |
69
|
|
|
self.port = self.neighbor.connect |
70
|
|
|
elif os.environ.get('exabgp.tcp.port','').isdigit(): |
71
|
|
|
self.port = int(os.environ.get('exabgp.tcp.port')) |
72
|
|
|
elif os.environ.get('exabgp_tcp_port','').isdigit(): |
73
|
|
|
self.port = int(os.environ.get('exabgp_tcp_port')) |
74
|
|
|
else: |
75
|
|
|
self.port = 179 |
76
|
|
|
|
77
|
|
|
from exabgp.configuration.environment import environment |
78
|
|
|
self.log_routes = peer.neighbor.adj_rib_in or environment.settings().log.routes |
79
|
|
|
|
80
|
|
|
def fd (self): |
81
|
|
|
if self.connection is None: |
82
|
|
|
return -1 |
83
|
|
|
return self.connection.fd() |
84
|
|
|
|
85
|
|
|
# XXX: we use self.peer.neighbor.peer_address when we could use self.neighbor.peer_address |
86
|
|
|
|
87
|
|
|
def me (self, message): |
88
|
|
|
return "%s/%s %s" % (self.peer.neighbor.peer_address,self.peer.neighbor.peer_as,message) |
89
|
|
|
|
90
|
|
|
def accept (self, incoming): |
91
|
|
|
self.connection = incoming |
92
|
|
|
|
93
|
|
|
if self.peer.neighbor.api['neighbor-changes']: |
94
|
|
|
self.peer.reactor.processes.connected(self.peer.neighbor) |
95
|
|
|
|
96
|
|
|
# very important - as we use this function on __init__ |
97
|
|
|
return self |
98
|
|
|
|
99
|
|
|
def connect (self): |
100
|
|
|
# allows to test the protocol code using modified StringIO with a extra 'pending' function |
101
|
|
|
if self.connection: |
102
|
|
|
return |
103
|
|
|
|
104
|
|
|
local = self.neighbor.md5_ip.top() if not self.neighbor.auto_discovery else None |
105
|
|
|
peer = self.neighbor.peer_address.top() |
106
|
|
|
afi = self.neighbor.peer_address.afi |
107
|
|
|
md5 = self.neighbor.md5_password |
108
|
|
|
md5_base64 = self.neighbor.md5_base64 |
109
|
|
|
ttl_out = self.neighbor.ttl_out |
110
|
|
|
self.connection = Outgoing(afi,peer,local,self.port,md5,md5_base64,ttl_out) |
111
|
|
|
|
112
|
|
|
for connected in self.connection.establish(): |
113
|
|
|
yield False |
114
|
|
|
|
115
|
|
|
if self.peer.neighbor.api['neighbor-changes']: |
116
|
|
|
self.peer.reactor.processes.connected(self.peer.neighbor) |
117
|
|
|
|
118
|
|
|
if not local: |
119
|
|
|
self.neighbor.local_address = IP.create(self.connection.local) |
120
|
|
|
if self.neighbor.router_id is None and self.neighbor.local_address.afi == AFI.ipv4: |
121
|
|
|
self.neighbor.router_id = self.neighbor.local_address |
122
|
|
|
|
123
|
|
|
yield True |
124
|
|
|
|
125
|
|
|
def close (self, reason='protocol closed, reason unspecified'): |
126
|
|
|
if self.connection: |
127
|
|
|
self.logger.debug(reason,self.connection.session()) |
128
|
|
|
|
129
|
|
|
# must be first otherwise we could have a loop caused by the raise in the below |
130
|
|
|
self.connection.close() |
131
|
|
|
self.connection = None |
132
|
|
|
|
133
|
|
|
self.peer.stats['down'] = self.peer.stats.get('down',0) + 1 |
134
|
|
|
try: |
135
|
|
|
if self.peer.neighbor.api['neighbor-changes']: |
136
|
|
|
self.peer.reactor.processes.down(self.peer.neighbor,reason) |
137
|
|
|
except ProcessError: |
138
|
|
|
self.logger.debug('could not send notification of neighbor close to API',self.connection.session()) |
139
|
|
|
|
140
|
|
|
def _to_api (self,direction,message,raw): |
141
|
|
|
packets = self.neighbor.api['%s-packets' % direction] |
142
|
|
|
parsed = self.neighbor.api['%s-parsed' % direction] |
143
|
|
|
consolidate = self.neighbor.api['%s-consolidate' % direction] |
144
|
|
|
negotiated = self.negotiated if self.neighbor.api['negotiated'] else None |
145
|
|
|
|
146
|
|
|
if consolidate: |
147
|
|
|
if packets: |
148
|
|
|
self.peer.reactor.processes.message(self.peer.neighbor,direction,message,negotiated,raw[:19],raw[19:]) |
149
|
|
|
else: |
150
|
|
|
self.peer.reactor.processes.message(self.peer.neighbor,direction,message,negotiated,b'',b'') |
151
|
|
|
else: |
152
|
|
|
if packets: |
153
|
|
|
self.peer.reactor.processes.packets(self.peer.neighbor,direction,int(message.ID),negotiated,raw[:19],raw[19:]) |
154
|
|
|
if parsed: |
155
|
|
|
self.peer.reactor.processes.message(message.ID,self.peer.neighbor,direction,message,negotiated,b'',b'') |
156
|
|
|
|
157
|
|
|
def write (self, message, negotiated=None): |
158
|
|
|
raw = message.message(negotiated) |
159
|
|
|
|
160
|
|
|
code = 'send-%s' % Message.CODE.short(message.ID) |
161
|
|
|
self.peer.stats[code] = self.peer.stats.get(code,0) + 1 |
162
|
|
|
if self.neighbor.api.get(code,False): |
163
|
|
|
self._to_api('send',message,raw) |
164
|
|
|
|
165
|
|
|
for boolean in self.connection.writer(raw): |
166
|
|
|
yield boolean |
167
|
|
|
|
168
|
|
|
def send (self, raw): |
169
|
|
|
code = 'send-%s' % Message.CODE.short(ordinal(raw[18])) |
170
|
|
|
self.peer.stats[code] = self.peer.stats.get(code,0) + 1 |
171
|
|
|
if self.neighbor.api.get(code,False): |
172
|
|
|
message = Update.unpack_message(raw[19:],self.negotiated) |
173
|
|
|
self._to_api('send',message,raw) |
174
|
|
|
|
175
|
|
|
for boolean in self.connection.writer(raw): |
176
|
|
|
yield boolean |
177
|
|
|
|
178
|
|
|
# Read from network ....................................................... |
179
|
|
|
|
180
|
|
|
def read_message (self): |
181
|
|
|
# This will always be defined by the loop but scope leaking upset scrutinizer/pylint |
182
|
|
|
msg_id = None |
183
|
|
|
|
184
|
|
|
packets = self.neighbor.api['receive-packets'] |
185
|
|
|
consolidate = self.neighbor.api['receive-consolidate'] |
186
|
|
|
parsed = self.neighbor.api['receive-parsed'] |
187
|
|
|
|
188
|
|
|
body,header = b'',b'' # just because pylint/pylama are getting more clever |
189
|
|
|
|
190
|
|
|
for length,msg_id,header,body,notify in self.connection.reader(): |
191
|
|
|
# internal issue |
192
|
|
|
if notify: |
193
|
|
|
code = 'receive-%s' % Message.CODE.NOTIFICATION.SHORT |
194
|
|
|
if self.neighbor.api.get(code,False): |
195
|
|
|
if consolidate: |
196
|
|
|
self.peer.reactor.processes.notification(self.peer.neighbor,'receive',notify.code,notify.subcode,str(notify),None,header,body) |
197
|
|
|
elif parsed: |
198
|
|
|
self.peer.reactor.processes.notification(self.peer.neighbor,'receive',notify.code,notify.subcode,str(notify),None,b'',b'') |
199
|
|
|
elif packets: |
200
|
|
|
self.peer.reactor.processes.packets(self.peer.neighbor,'receive',msg_id,None,header,body) |
201
|
|
|
# XXX: is notify not already Notify class ? |
202
|
|
|
raise Notify(notify.code,notify.subcode,str(notify)) |
203
|
|
|
|
204
|
|
|
if not length: |
205
|
|
|
yield _NOP |
206
|
|
|
continue |
207
|
|
|
|
208
|
|
|
self.logger.debug('<< message of type %s' % Message.CODE.name(msg_id),self.connection.session()) |
209
|
|
|
|
210
|
|
|
code = 'receive-%s' % Message.CODE.short(msg_id) |
211
|
|
|
self.peer.stats[code] = self.peer.stats.get(code,0) + 1 |
212
|
|
|
for_api = self.neighbor.api.get(code,False) |
213
|
|
|
|
214
|
|
|
if for_api and packets and not consolidate: |
215
|
|
|
negotiated = self.negotiated if self.neighbor.api.get('negotiated',False) else None |
216
|
|
|
self.peer.reactor.processes.packets(self.peer.neighbor,'receive',msg_id,negotiated,header,body) |
217
|
|
|
|
218
|
|
|
if msg_id == Message.CODE.UPDATE: |
219
|
|
|
if not self.neighbor.adj_rib_in and not (for_api or self.log_routes) and not (parsed or consolidate): |
220
|
|
|
yield _UPDATE |
221
|
|
|
return |
222
|
|
|
|
223
|
|
|
try: |
224
|
|
|
message = Message.unpack(msg_id,body,self.negotiated) |
225
|
|
|
except (KeyboardInterrupt,SystemExit,Notify): |
226
|
|
|
raise |
227
|
|
|
except Exception as exc: |
228
|
|
|
self.logger.debug('could not decode message "%d"' % msg_id,self.connection.session()) |
229
|
|
|
self.logger.debug('%s' % str(exc),self.connection.session()) |
230
|
|
|
self.logger.debug(traceback.format_exc(),self.connection.session()) |
231
|
|
|
raise Notify(1,0,'can not decode update message of type "%d"' % msg_id) |
232
|
|
|
# raise Notify(5,0,'unknown message received') |
233
|
|
|
|
234
|
|
|
if message.TYPE == Update.TYPE: |
235
|
|
|
if Attribute.CODE.INTERNAL_TREAT_AS_WITHDRAW in message.attributes: |
236
|
|
|
for nlri in message.nlris: |
237
|
|
|
nlri.action = IN.WITHDRAWN |
238
|
|
|
|
239
|
|
|
if for_api: |
240
|
|
|
negotiated = self.negotiated if self.neighbor.api.get('negotiated',False) else None |
241
|
|
|
if consolidate: |
242
|
|
|
self.peer.reactor.processes.message(msg_id,self.neighbor,'receive',message,negotiated,header,body) |
243
|
|
|
elif parsed: |
244
|
|
|
self.peer.reactor.processes.message(msg_id,self.neighbor,'receive',message,negotiated,b'',b'') |
245
|
|
|
|
246
|
|
|
if message.TYPE == Notification.TYPE: |
247
|
|
|
raise message |
248
|
|
|
|
249
|
|
|
if message.TYPE == Update.TYPE and Attribute.CODE.INTERNAL_DISCARD in message.attributes: |
250
|
|
|
yield _NOP |
251
|
|
|
else: |
252
|
|
|
yield message |
253
|
|
|
|
254
|
|
|
def validate_open (self): |
255
|
|
|
error = self.negotiated.validate(self.neighbor) |
256
|
|
|
if error is not None: |
257
|
|
|
raise Notify(*error) |
258
|
|
|
|
259
|
|
|
if self.neighbor.api['negotiated']: |
260
|
|
|
self.peer.reactor.processes.negotiated(self.peer.neighbor,self.negotiated) |
261
|
|
|
|
262
|
|
|
if self.negotiated.mismatch: |
263
|
|
|
self.logger.warning('--------------------------------------------------------------------',self.connection.session()) |
264
|
|
|
self.logger.warning('the connection can not carry the following family/families',self.connection.session()) |
265
|
|
|
for reason,(afi,safi) in self.negotiated.mismatch: |
266
|
|
|
self.logger.warning(' - %s is not configured for %s/%s' % (reason,afi,safi),self.connection.session()) |
267
|
|
|
self.logger.warning('therefore no routes of this kind can be announced on the connection',self.connection.session()) |
268
|
|
|
self.logger.warning('--------------------------------------------------------------------',self.connection.session()) |
269
|
|
|
|
270
|
|
|
def read_open (self, ip): |
271
|
|
|
for received_open in self.read_message(): |
272
|
|
|
if received_open.TYPE == NOP.TYPE: |
273
|
|
|
yield received_open |
274
|
|
|
else: |
275
|
|
|
break |
276
|
|
|
|
277
|
|
|
if received_open.TYPE != Open.TYPE: |
|
|
|
|
278
|
|
|
raise Notify(5,1,'The first packet received is not an open message (%s)' % received_open) |
279
|
|
|
|
280
|
|
|
self.logger.debug('<< %s' % received_open,self.connection.session()) |
281
|
|
|
yield received_open |
282
|
|
|
|
283
|
|
|
def read_keepalive (self): |
284
|
|
|
for message in self.read_message(): |
285
|
|
|
if message.TYPE == NOP.TYPE: |
286
|
|
|
yield message |
287
|
|
|
else: |
288
|
|
|
break |
289
|
|
|
|
290
|
|
|
if message.TYPE != KeepAlive.TYPE: |
|
|
|
|
291
|
|
|
raise Notify(5,2) |
292
|
|
|
|
293
|
|
|
yield message |
294
|
|
|
|
295
|
|
|
# |
296
|
|
|
# Sending message to peer |
297
|
|
|
# |
298
|
|
|
|
299
|
|
|
def new_open (self): |
300
|
|
|
if self.neighbor.local_as: |
301
|
|
|
local_as = self.neighbor.local_as |
302
|
|
|
elif self.negotiated.received_open: |
303
|
|
|
local_as = self.negotiated.received_open.asn |
304
|
|
|
else: |
305
|
|
|
raise RuntimeError('no ASN available for the OPEN message') |
306
|
|
|
|
307
|
|
|
sent_open = Open( |
308
|
|
|
Version(4), |
309
|
|
|
local_as, |
310
|
|
|
self.neighbor.hold_time, |
311
|
|
|
self.neighbor.router_id, |
312
|
|
|
Capabilities().new(self.neighbor,self.peer._restarted) |
313
|
|
|
) |
314
|
|
|
|
315
|
|
|
# we do not buffer open message in purpose |
316
|
|
|
for _ in self.write(sent_open): |
317
|
|
|
yield _NOP |
318
|
|
|
|
319
|
|
|
self.logger.debug('>> %s' % sent_open,self.connection.session()) |
320
|
|
|
yield sent_open |
321
|
|
|
|
322
|
|
|
def new_keepalive (self, comment=''): |
323
|
|
|
keepalive = KeepAlive() |
324
|
|
|
|
325
|
|
|
for _ in self.write(keepalive): |
326
|
|
|
yield _NOP |
327
|
|
|
|
328
|
|
|
self.logger.debug('>> KEEPALIVE%s' % (' (%s)' % comment if comment else ''),self.connection.session()) |
329
|
|
|
|
330
|
|
|
yield keepalive |
331
|
|
|
|
332
|
|
|
def new_notification (self, notification): |
333
|
|
|
for _ in self.write(notification): |
334
|
|
|
yield _NOP |
335
|
|
|
self.logger.debug('>> NOTIFICATION (%d,%d,"%s")' % (notification.code,notification.subcode,notification.data.decode('utf-8')),self.connection.session()) |
336
|
|
|
yield notification |
337
|
|
|
|
338
|
|
|
def new_update (self, include_withdraw): |
339
|
|
|
updates = self.neighbor.rib.outgoing.updates(self.neighbor.group_updates) |
340
|
|
|
number = 0 |
341
|
|
|
for update in updates: |
342
|
|
|
for message in update.messages(self.negotiated,include_withdraw): |
343
|
|
|
number += 1 |
344
|
|
|
for boolean in self.send(message): |
345
|
|
|
# boolean is a transient network error we already announced |
346
|
|
|
yield _NOP |
347
|
|
|
if number: |
348
|
|
|
self.logger.debug('>> %d UPDATE(s)' % number,self.connection.session()) |
349
|
|
|
yield _UPDATE |
350
|
|
|
|
351
|
|
|
def new_eor (self, afi, safi): |
352
|
|
|
eor = EOR(afi,safi) |
353
|
|
|
for _ in self.write(eor): |
354
|
|
|
yield _NOP |
355
|
|
|
self.logger.debug('>> EOR %s %s' % (afi,safi),self.connection.session()) |
356
|
|
|
yield eor |
357
|
|
|
|
358
|
|
|
def new_eors (self, afi=AFI.undefined,safi=SAFI.undefined): |
359
|
|
|
# Send EOR to let our peer know he can perform a RIB update |
360
|
|
|
if self.negotiated.families: |
361
|
|
|
families = self.negotiated.families if (afi,safi) == (AFI.undefined,SAFI.undefined) else [(afi,safi),] |
362
|
|
|
for eor_afi,eor_safi in families: |
363
|
|
|
for _ in self.new_eor(eor_afi,eor_safi): |
364
|
|
|
yield _ |
365
|
|
|
else: |
366
|
|
|
# If we are not sending an EOR, send a keepalive as soon as when finished |
367
|
|
|
# So the other routers knows that we have no (more) routes to send ... |
368
|
|
|
# (is that behaviour documented somewhere ??) |
369
|
|
|
for eor in self.new_keepalive('EOR'): |
370
|
|
|
yield _NOP |
371
|
|
|
yield _UPDATE |
372
|
|
|
|
373
|
|
|
def new_operational (self, operational, negotiated): |
374
|
|
|
for _ in self.write(operational,negotiated): |
375
|
|
|
yield _NOP |
376
|
|
|
self.logger.debug('>> OPERATIONAL %s' % str(operational),self.connection.session()) |
377
|
|
|
yield operational |
378
|
|
|
|
379
|
|
|
def new_refresh (self, refresh): |
380
|
|
|
for _ in self.write(refresh,None): |
381
|
|
|
yield _NOP |
382
|
|
|
self.logger.debug('>> REFRESH %s' % str(refresh),self.connection.session()) |
383
|
|
|
yield refresh |
384
|
|
|
|