Completed
Push — master ( d75605...cff8bf )
by Thomas
12:23
created

Connection.reading()   C

Complexity

Conditions 9

Size

Total Lines 15
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 14
nop 1
dl 0
loc 15
rs 6.6666
c 0
b 0
f 0
1
# encoding: utf-8
2
"""
3
network.py
4
5
Created by Thomas Mangin on 2009-09-06.
6
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
import random
11
import socket
12
import select
13
from struct import unpack
14
15
from exabgp.util import ordinal
16
17
from exabgp.configuration.environment import environment
18
19
from exabgp.util.errstr import errstr
20
21
from exabgp.logger import Logger
22
from exabgp.logger import FakeLogger
23
from exabgp.logger import LazyFormat
24
25
from exabgp.bgp.message import Message
26
27
from exabgp.reactor.network.error import error
28
from exabgp.reactor.network.error import errno
29
from exabgp.reactor.network.error import NetworkError
30
from exabgp.reactor.network.error import TooSlowError
31
from exabgp.reactor.network.error import NotConnected
32
from exabgp.reactor.network.error import LostConnection
33
from exabgp.reactor.network.error import NotifyError
34
35
from exabgp.bgp.message.open.capability.extended import ExtendedMessage
36
37
from .error import *
38
39
40
class Connection (object):
41
	direction = 'undefined'
42
	identifier = {}
43
44
	def __init__ (self, afi, peer, local):
45
		self.msg_size = ExtendedMessage.INITIAL_SIZE
46
47
		# peer and local are strings of the IP
48
		try:
49
			self.defensive = environment.settings().debug.defensive
50
			self.logger = Logger()
51
		except RuntimeError:
52
			self.defensive = True
53
			self.logger = FakeLogger()
54
55
		self.afi = afi
56
		self.peer = peer
57
		self.local = local
58
59
		self.io = None
60
		self.established = False
61
		self._rpoller = {}
62
		self._wpoller = {}
63
64
		self.id = self.identifier.get(self.direction,1)
65
66
	def success (self):
67
		identifier = self.identifier.get(self.direction,1) + 1
68
		self.identifier[self.direction] = identifier
69
		return identifier
70
71
	# Just in case ..
72
	def __del__ (self):
73
		if self.io:
74
			self.close()
75
			self.logger.warning('connection to %s closed' % self.peer, self.session())
76
77
	def name (self):
78
		return "%s-%d %s-%s" % (self.direction,self.id,self.local,self.peer)
79
80
	def session (self):
81
		return "%s-%d" % (self.direction,self.id)
82
83
	def fd (self):
84
		if self.io:
85
			return self.io.fileno()
86
		# the socket is closed (fileno() == -1) or not open yet (io is None)
87
		return -1
88
89
	def close (self):
90
		try:
91
			self.logger.warning('%s, closing connection' % self.name(),source=self.session())
92
			if self.io:
93
				self.io.close()
94
				self.io = None
95
		except KeyboardInterrupt as exc:
96
			raise exc
97
		except Exception:
98
			self.io = None
99
100
	def reading (self):
101
		poller = self._rpoller.get(self.io,None)
102
		if poller is None:
103
			poller = select.poll()
104
			poller.register(self.io, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR)
105
			self._rpoller = {self.io: poller}
106
107
		ready = False
108
		for _, event in poller.poll(0):
109
			if event & select.POLLIN or event & select.POLLPRI:
110
				ready = True
111
			elif event & select.POLLHUP or event & select.POLLRDHUP or event & select.POLLERR or event & select.POLLNVAL:
112
				self._rpoller = {}
113
				ready = True
114
		return ready
115
116
	def writing (self):
117
		poller = self._wpoller.get(self.io, None)
118
		if poller is None:
119
			poller = select.poll()
120
			poller.register(self.io, select.POLLOUT| select.POLLHUP | select.POLLNVAL | select.POLLERR)
121
			self._wpoller = {self.io: poller}
122
123
		ready = False
124
		for _, event in poller.poll(0):
125
			if event & select.POLLOUT:
126
				ready = True
127
			elif event & select.POLLHUP or event & select.POLLRDHUP or event & select.POLLERR or event & select.POLLNVAL:
128
				self._wpoller = {}
129
				ready = True
130
		return ready
131
132
	def _reader (self, number):
133
		# The function must not be called if it does not return with no data with a smaller size as parameter
134
		if not self.io:
135
			self.close()
136
			raise NotConnected('Trying to read on a closed TCP connection')
137
		if number == 0:
138
			yield b''
139
			return
140
141
		while not self.reading():
142
			yield b''
143
		data = b''
144
		reported = ''
145
		while True:
146
			try:
147
				while True:
148
					if self.defensive and random.randint(0,2):
149
						raise socket.error(errno.EAGAIN,'raising network error on purpose')
150
151
					read = self.io.recv(number)
152
					if not read:
153
						self.close()
154
						self.logger.warning('%s %s lost TCP session with peer' % (self.name(),self.peer),self.session())
155
						raise LostConnection('the TCP connection was closed by the remote end')
156
					data += read
157
158
					number -= len(read)
159
					if not number:
160
						self.logger.debug(LazyFormat('received TCP payload',data),self.session())
161
						yield data
162
						return
163
164
					yield b''
165
			except socket.timeout as exc:
166
				self.close()
167
				self.logger.warning('%s %s peer is too slow' % (self.name(),self.peer),self.session())
168
				raise TooSlowError('Timeout while reading data from the network (%s)' % errstr(exc))
169
			except socket.error as exc:
170
				if exc.args[0] in error.block:
171
					message = '%s %s blocking io problem mid-way through reading a message %s, trying to complete' % (self.name(),self.peer,errstr(exc))
172
					if message != reported:
173
						reported = message
174
						self.logger.debug(message,self.session())
175
					yield b''
176
				elif exc.args[0] in error.fatal:
177
					self.close()
178
					raise LostConnection('issue reading on the socket: %s' % errstr(exc))
179
				# what error could it be !
180
				else:
181
					self.logger.critical('%s %s undefined error reading on socket' % (self.name(),self.peer),self.session())
182
					raise NetworkError('Problem while reading data from the network (%s)' % errstr(exc))
183
184
	def writer (self, data):
185
		if not self.io:
186
			# XXX: FIXME: Make sure it does not hold the cleanup during the closing of the peering session
187
			yield True
188
			return
189
		while not self.writing():
190
			yield False
191
		self.logger.debug(LazyFormat('sending TCP payload',data),self.session())
192
		# The first while is here to setup the try/catch block once as it is very expensive
193
		while True:
194
			try:
195
				while True:
196
					if self.defensive and random.randint(0,2):
197
						raise socket.error(errno.EAGAIN,'raising network error on purpose')
198
199
					# we can not use sendall as in case of network buffer filling
200
					# it does raise and does not let you know how much was sent
201
					number = self.io.send(data)
202
					if not number:
203
						self.close()
204
						self.logger.warning('%s %s lost TCP connection with peer' % (self.name(),self.peer),self.session())
205
						raise LostConnection('lost the TCP connection')
206
207
					data = data[number:]
208
					if not data:
209
						yield True
210
						return
211
					yield False
212
			except socket.error as exc:
213
				if exc.args[0] in error.block:
214
					self.logger.debug(
215
						'%s %s blocking io problem mid-way through writing a message %s, trying to complete' % (
216
							self.name(),
217
							self.peer,
218
							errstr(exc)
219
						),
220
						self.session()
221
					)
222
					yield False
223
				elif exc.errno == errno.EPIPE:
224
					# The TCP connection is gone.
225
					self.close()
226
					raise NetworkError('Broken TCP connection')
227
				elif exc.args[0] in error.fatal:
228
					self.close()
229
					self.logger.critical('%s %s problem sending message (%s)' % (self.name(),self.peer,errstr(exc)),self.session())
230
					raise NetworkError('Problem while writing data to the network (%s)' % errstr(exc))
231
				# what error could it be !
232
				else:
233
					self.logger.critical('%s %s undefined error writing on socket' % (self.name(),self.peer),self.session())
234
					yield False
235
236
	def reader (self):
237
		# _reader returns the whole number requested or nothing and then stops
238
		for header in self._reader(Message.HEADER_LEN):
239
			if not header:
240
				yield 0,0,b'',b'',None
241
242
		if not header.startswith(Message.MARKER):
0 ignored issues
show
introduced by
The variable header does not seem to be defined in case the for loop on line 238 is not entered. Are you sure this can never be the case?
Loading history...
243
			report = 'The packet received does not contain a BGP marker'
244
			yield 0,0,header,b'',NotifyError(1,1,report)
245
			return
246
247
		msg = ordinal(header[18])
248
		length = unpack('!H',header[16:18])[0]
249
250
		if length < Message.HEADER_LEN or length > self.msg_size:
251
			report = '%s has an invalid message length of %d' % (Message.CODE.name(msg),length)
252
			yield length,0,header,b'',NotifyError(1,2,report)
253
			return
254
255
		validator = Message.Length.get(msg,lambda _: _ >= 19)
256
		if not validator(length):
257
			# MUST send the faulty length back
258
			report = '%s has an invalid message length of %d' % (Message.CODE.name(msg),length)
259
			yield length,0,header,b'',NotifyError(1,2,report)
260
			return
261
262
		number = length - Message.HEADER_LEN
263
264
		if not number:
265
			yield length,msg,header,b'',None
266
			return
267
268
		for body in self._reader(number):
269
			if not body:
270
				yield 0,0,b'',b'',None
271
272
		yield length,msg,header,body,None
0 ignored issues
show
introduced by
The variable body does not seem to be defined in case the for loop on line 268 is not entered. Are you sure this can never be the case?
Loading history...
273