Completed
Push — master ( d75605...cff8bf )
by Thomas
12:23
created

exabgp.reactor.protocol.Protocol.write()   A

Complexity

Conditions 3

Size

Total Lines 10
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 3
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
# encoding: utf-8
2
"""
3
protocol.py
4
5
Created by Thomas Mangin on 2009-08-25.
6
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
import os
11
12
from exabgp.vendoring import six
13
import traceback
14
15
# ================================================================ Registration
16
#
17
18
from exabgp.util import ordinal
19
20
from exabgp.reactor.network.outgoing import Outgoing
21
# from exabgp.reactor.network.error import NotifyError
22
23
from exabgp.protocol.family import AFI
24
from exabgp.protocol.family import SAFI
25
from exabgp.bgp.message import Message
26
from exabgp.bgp.message import NOP
27
from exabgp.bgp.message import _NOP
28
from exabgp.bgp.message import Open
29
from exabgp.bgp.message.open import Version
30
from exabgp.bgp.message.open.capability import Capabilities
31
from exabgp.bgp.message.open.capability import Negotiated
32
from exabgp.bgp.message import Update
33
from exabgp.bgp.message import EOR
34
from exabgp.bgp.message import KeepAlive
35
from exabgp.bgp.message import Notification
36
from exabgp.bgp.message import Notify
37
from exabgp.bgp.message import Operational
38
39
from exabgp.bgp.message.direction import IN
40
from exabgp.bgp.message.update.attribute import Attribute
41
42
from exabgp.protocol.ip import IP
43
from exabgp.reactor.api.processes import ProcessError
44
45
from exabgp.logger import Logger
46
from exabgp.logger import FakeLogger
47
48
# This is the number of chuncked message we are willing to buffer, not the number of routes
49
MAX_BACKLOG = 15000
50
51
_UPDATE = Update([],b'')
52
_OPERATIONAL = Operational(0x00)
53
54
55
class Protocol (object):
56
	decode = True
57
58
	def __init__ (self, peer):
59
		try:
60
			self.logger = Logger()
61
		except RuntimeError:
62
			self.logger = FakeLogger()
63
		self.peer = peer
64
		self.neighbor = peer.neighbor
65
		self.negotiated = Negotiated(self.neighbor)
66
		self.connection = None
67
68
		if self.neighbor.connect:
69
			self.port = self.neighbor.connect
70
		elif os.environ.get('exabgp.tcp.port','').isdigit():
71
			self.port = int(os.environ.get('exabgp.tcp.port'))
72
		elif os.environ.get('exabgp_tcp_port','').isdigit():
73
			self.port = int(os.environ.get('exabgp_tcp_port'))
74
		else:
75
			self.port = 179
76
77
		from exabgp.configuration.environment import environment
78
		self.log_routes = peer.neighbor.adj_rib_in or environment.settings().log.routes
79
80
	def fd (self):
81
		if self.connection is None:
82
			return -1
83
		return self.connection.fd()
84
85
	# XXX: we use self.peer.neighbor.peer_address when we could use self.neighbor.peer_address
86
87
	def me (self, message):
88
		return "%s/%s %s" % (self.peer.neighbor.peer_address,self.peer.neighbor.peer_as,message)
89
90
	def accept (self, incoming):
91
		self.connection = incoming
92
93
		if self.peer.neighbor.api['neighbor-changes']:
94
			self.peer.reactor.processes.connected(self.peer.neighbor)
95
96
		# very important - as we use this function on __init__
97
		return self
98
99
	def connect (self):
100
		# allows to test the protocol code using modified StringIO with a extra 'pending' function
101
		if self.connection:
102
			return
103
104
		local = self.neighbor.md5_ip.top() if not self.neighbor.auto_discovery else None
105
		peer = self.neighbor.peer_address.top()
106
		afi = self.neighbor.peer_address.afi
107
		md5 = self.neighbor.md5_password
108
		md5_base64 = self.neighbor.md5_base64
109
		ttl_out = self.neighbor.ttl_out
110
		self.connection = Outgoing(afi,peer,local,self.port,md5,md5_base64,ttl_out)
111
112
		for connected in self.connection.establish():
113
			yield False
114
115
		if self.peer.neighbor.api['neighbor-changes']:
116
			self.peer.reactor.processes.connected(self.peer.neighbor)
117
118
		if not local:
119
			self.neighbor.local_address = IP.create(self.connection.local)
120
			if self.neighbor.router_id is None and self.neighbor.local_address.afi == AFI.ipv4:
121
				self.neighbor.router_id = self.neighbor.local_address
122
123
		yield True
124
125
	def close (self, reason='protocol closed, reason unspecified'):
126
		if self.connection:
127
			self.logger.debug(reason,self.connection.session())
128
129
			# must be first otherwise we could have a loop caused by the raise in the below
130
			self.connection.close()
131
			self.connection = None
132
133
			self.peer.stats['down'] = self.peer.stats.get('down',0) + 1
134
			try:
135
				if self.peer.neighbor.api['neighbor-changes']:
136
					self.peer.reactor.processes.down(self.peer.neighbor,reason)
137
			except ProcessError:
138
				self.logger.debug('could not send notification of neighbor close to API',self.connection.session())
139
140
	def _to_api (self,direction,message,raw):
141
		packets = self.neighbor.api['%s-packets' % direction]
142
		parsed = self.neighbor.api['%s-parsed' % direction]
143
		consolidate = self.neighbor.api['%s-consolidate' % direction]
144
		negotiated = self.negotiated if self.neighbor.api['negotiated'] else None
145
146
		if consolidate:
147
			if packets:
148
				self.peer.reactor.processes.message(self.peer.neighbor,direction,message,negotiated,raw[:19],raw[19:])
149
			else:
150
				self.peer.reactor.processes.message(self.peer.neighbor,direction,message,negotiated,b'',b'')
151
		else:
152
			if packets:
153
				self.peer.reactor.processes.packets(self.peer.neighbor,direction,int(message.ID),negotiated,raw[:19],raw[19:])
154
			if parsed:
155
				self.peer.reactor.processes.message(message.ID,self.peer.neighbor,direction,message,negotiated,b'',b'')
156
157
	def write (self, message, negotiated=None):
158
		raw = message.message(negotiated)
159
160
		code = 'send-%s' % Message.CODE.short(message.ID)
161
		self.peer.stats[code] = self.peer.stats.get(code,0) + 1
162
		if self.neighbor.api.get(code,False):
163
			self._to_api('send',message,raw)
164
165
		for boolean in self.connection.writer(raw):
166
			yield boolean
167
168
	def send (self, raw):
169
		code = 'send-%s' % Message.CODE.short(ordinal(raw[18]))
170
		self.peer.stats[code] = self.peer.stats.get(code,0) + 1
171
		if self.neighbor.api.get(code,False):
172
			message = Update.unpack_message(raw[19:],self.negotiated)
173
			self._to_api('send',message,raw)
174
175
		for boolean in self.connection.writer(raw):
176
			yield boolean
177
178
	# Read from network .......................................................
179
180
	def read_message (self):
181
		# This will always be defined by the loop but scope leaking upset scrutinizer/pylint
182
		msg_id = None
183
184
		packets = self.neighbor.api['receive-packets']
185
		consolidate = self.neighbor.api['receive-consolidate']
186
		parsed = self.neighbor.api['receive-parsed']
187
188
		body,header = b'',b''  # just because pylint/pylama are getting more clever
189
190
		for length,msg_id,header,body,notify in self.connection.reader():
191
			# internal issue
192
			if notify:
193
				code = 'receive-%s' % Message.CODE.NOTIFICATION.SHORT
194
				if self.neighbor.api.get(code,False):
195
					if consolidate:
196
						self.peer.reactor.processes.notification(self.peer.neighbor,'receive',notify.code,notify.subcode,str(notify),None,header,body)
197
					elif parsed:
198
						self.peer.reactor.processes.notification(self.peer.neighbor,'receive',notify.code,notify.subcode,str(notify),None,b'',b'')
199
					elif packets:
200
						self.peer.reactor.processes.packets(self.peer.neighbor,'receive',msg_id,None,header,body)
201
				# XXX: is notify not already Notify class ?
202
				raise Notify(notify.code,notify.subcode,str(notify))
203
204
			if not length:
205
				yield _NOP
206
				continue
207
208
			self.logger.debug('<< message of type %s' % Message.CODE.name(msg_id),self.connection.session())
209
210
			code = 'receive-%s' % Message.CODE.short(msg_id)
211
			self.peer.stats[code] = self.peer.stats.get(code,0) + 1
212
			for_api = self.neighbor.api.get(code,False)
213
214
			if for_api and packets and not consolidate:
215
				negotiated = self.negotiated if self.neighbor.api.get('negotiated',False) else None
216
				self.peer.reactor.processes.packets(self.peer.neighbor,'receive',msg_id,negotiated,header,body)
217
218
			if msg_id == Message.CODE.UPDATE:
219
				if not self.neighbor.adj_rib_in and not (for_api or self.log_routes) and not (parsed or consolidate):
220
					yield _UPDATE
221
					return
222
223
			try:
224
				message = Message.unpack(msg_id,body,self.negotiated)
225
			except (KeyboardInterrupt,SystemExit,Notify):
226
				raise
227
			except Exception as exc:
228
				self.logger.debug('could not decode message "%d"' % msg_id,self.connection.session())
229
				self.logger.debug('%s' % str(exc),self.connection.session())
230
				self.logger.debug(traceback.format_exc(),self.connection.session())
231
				raise Notify(1,0,'can not decode update message of type "%d"' % msg_id)
232
				# raise Notify(5,0,'unknown message received')
233
234
			if message.TYPE == Update.TYPE:
235
				if Attribute.CODE.INTERNAL_TREAT_AS_WITHDRAW in message.attributes:
236
					for nlri in message.nlris:
237
						nlri.action = IN.WITHDRAWN
238
239
			if for_api:
240
				negotiated = self.negotiated if self.neighbor.api.get('negotiated',False) else None
241
				if consolidate:
242
					self.peer.reactor.processes.message(msg_id,self.neighbor,'receive',message,negotiated,header,body)
243
				elif parsed:
244
					self.peer.reactor.processes.message(msg_id,self.neighbor,'receive',message,negotiated,b'',b'')
245
246
			if message.TYPE == Notification.TYPE:
247
				raise message
248
249
			if message.TYPE == Update.TYPE and Attribute.CODE.INTERNAL_DISCARD in message.attributes:
250
				yield _NOP
251
			else:
252
				yield message
253
254
	def validate_open (self):
255
		error = self.negotiated.validate(self.neighbor)
256
		if error is not None:
257
			raise Notify(*error)
258
259
		if self.neighbor.api['negotiated']:
260
			self.peer.reactor.processes.negotiated(self.peer.neighbor,self.negotiated)
261
262
		if self.negotiated.mismatch:
263
			self.logger.warning('--------------------------------------------------------------------',self.connection.session())
264
			self.logger.warning('the connection can not carry the following family/families',self.connection.session())
265
			for reason,(afi,safi) in self.negotiated.mismatch:
266
				self.logger.warning(' - %s is not configured for %s/%s' % (reason,afi,safi),self.connection.session())
267
			self.logger.warning('therefore no routes of this kind can be announced on the connection',self.connection.session())
268
			self.logger.warning('--------------------------------------------------------------------',self.connection.session())
269
270
	def read_open (self, ip):
271
		for received_open in self.read_message():
272
			if received_open.TYPE == NOP.TYPE:
273
				yield received_open
274
			else:
275
				break
276
277
		if received_open.TYPE != Open.TYPE:
0 ignored issues
show
introduced by
The variable received_open does not seem to be defined in case the for loop on line 271 is not entered. Are you sure this can never be the case?
Loading history...
278
			raise Notify(5,1,'The first packet received is not an open message (%s)' % received_open)
279
280
		self.logger.debug('<< %s' % received_open,self.connection.session())
281
		yield received_open
282
283
	def read_keepalive (self):
284
		for message in self.read_message():
285
			if message.TYPE == NOP.TYPE:
286
				yield message
287
			else:
288
				break
289
290
		if message.TYPE != KeepAlive.TYPE:
0 ignored issues
show
introduced by
The variable message does not seem to be defined in case the for loop on line 284 is not entered. Are you sure this can never be the case?
Loading history...
291
			raise Notify(5,2)
292
293
		yield message
294
295
	#
296
	# Sending message to peer
297
	#
298
299
	def new_open (self):
300
		if self.neighbor.local_as:
301
			local_as = self.neighbor.local_as
302
		elif self.negotiated.received_open:
303
			local_as = self.negotiated.received_open.asn
304
		else:
305
			raise RuntimeError('no ASN available for the OPEN message')
306
307
		sent_open = Open(
308
			Version(4),
309
			local_as,
310
			self.neighbor.hold_time,
311
			self.neighbor.router_id,
312
			Capabilities().new(self.neighbor,self.peer._restarted)
313
		)
314
315
		# we do not buffer open message in purpose
316
		for _ in self.write(sent_open):
317
			yield _NOP
318
319
		self.logger.debug('>> %s' % sent_open,self.connection.session())
320
		yield sent_open
321
322
	def new_keepalive (self, comment=''):
323
		keepalive = KeepAlive()
324
325
		for _ in self.write(keepalive):
326
			yield _NOP
327
328
		self.logger.debug('>> KEEPALIVE%s' % (' (%s)' % comment if comment else ''),self.connection.session())
329
330
		yield keepalive
331
332
	def new_notification (self, notification):
333
		for _ in self.write(notification):
334
			yield _NOP
335
		self.logger.debug('>> NOTIFICATION (%d,%d,"%s")' % (notification.code,notification.subcode,notification.data.decode('utf-8')),self.connection.session())
336
		yield notification
337
338
	def new_update (self, include_withdraw):
339
		updates = self.neighbor.rib.outgoing.updates(self.neighbor.group_updates)
340
		number = 0
341
		for update in updates:
342
			for message in update.messages(self.negotiated,include_withdraw):
343
				number += 1
344
				for boolean in self.send(message):
345
					# boolean is a transient network error we already announced
346
					yield _NOP
347
		if number:
348
			self.logger.debug('>> %d UPDATE(s)' % number,self.connection.session())
349
		yield _UPDATE
350
351
	def new_eor (self, afi, safi):
352
		eor = EOR(afi,safi)
353
		for _ in self.write(eor):
354
			yield _NOP
355
		self.logger.debug('>> EOR %s %s' % (afi,safi),self.connection.session())
356
		yield eor
357
358
	def new_eors (self, afi=AFI.undefined,safi=SAFI.undefined):
359
		# Send EOR to let our peer know he can perform a RIB update
360
		if self.negotiated.families:
361
			families = self.negotiated.families if (afi,safi) == (AFI.undefined,SAFI.undefined) else [(afi,safi),]
362
			for eor_afi,eor_safi in families:
363
				for _ in self.new_eor(eor_afi,eor_safi):
364
					yield _
365
		else:
366
			# If we are not sending an EOR, send a keepalive as soon as when finished
367
			# So the other routers knows that we have no (more) routes to send ...
368
			# (is that behaviour documented somewhere ??)
369
			for eor in self.new_keepalive('EOR'):
370
				yield _NOP
371
			yield _UPDATE
372
373
	def new_operational (self, operational, negotiated):
374
		for _ in self.write(operational,negotiated):
375
			yield _NOP
376
		self.logger.debug('>> OPERATIONAL %s' % str(operational),self.connection.session())
377
		yield operational
378
379
	def new_refresh (self, refresh):
380
		for _ in self.write(refresh,None):
381
			yield _NOP
382
		self.logger.debug('>> REFRESH %s' % str(refresh),self.connection.session())
383
		yield refresh
384