Completed
Push — master ( 0dc7c6...640170 )
by Thomas
15:16
created

Connection.reading()   B

Complexity

Conditions 8

Size

Total Lines 17
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 16
nop 1
dl 0
loc 17
rs 7.3333
c 0
b 0
f 0
1
# encoding: utf-8
2
"""
3
network.py
4
5
Created by Thomas Mangin on 2009-09-06.
6
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
import random
11
import socket
12
import select
13
from struct import unpack
14
15
from exabgp.util import ordinal
16
17
from exabgp.configuration.environment import environment
18
19
from exabgp.util.errstr import errstr
20
21
from exabgp.logger import Logger
22
from exabgp.logger import FakeLogger
23
from exabgp.logger import LazyFormat
24
25
from exabgp.bgp.message import Message
26
27
from exabgp.reactor.network.error import error
28
from exabgp.reactor.network.error import errno
29
from exabgp.reactor.network.error import NetworkError
30
from exabgp.reactor.network.error import TooSlowError
31
from exabgp.reactor.network.error import NotConnected
32
from exabgp.reactor.network.error import LostConnection
33
from exabgp.reactor.network.error import NotifyError
34
35
from exabgp.bgp.message.open.capability.extended import ExtendedMessage
36
37
from .error import *
38
39
40
class Connection (object):
	"""
	Non-blocking TCP transport used to exchange BGP messages with one peer.

	The class wraps a socket stored in ``self.io`` (``None`` until a socket is
	attached by code outside this class, and again after ``close()``) and offers
	generator based ``reader()`` / ``writer()`` helpers which never block: they
	yield "not done yet" values until the whole message was transferred.
	"""

	# label used in name(), session() and as key of `identifier`
	# NOTE(review): presumably overridden by subclasses - confirm
	direction = 'undefined'
	# per-direction counter; it lives on the class so every instance shares it
	identifier = {}

	def __init__ (self, afi, peer, local):
		"""
		Record the addressing information, no socket is opened here.

		afi:   address family of the session, stored as is
		peer:  string of the remote IP
		local: string of the local IP
		"""
		self.msg_size = ExtendedMessage.INITIAL_SIZE

		# peer and local are strings of the IP
		try:
			self.defensive = environment.settings().debug.defensive
			self.logger = Logger()
		except RuntimeError:
			# no usable environment/logger: be defensive and use the fake logger
			self.defensive = True
			self.logger = FakeLogger()

		self.afi = afi
		self.peer = peer
		self.local = local

		self.io = None
		self.established = False
		# single entry caches {socket: poll object} used by reading() / writing()
		self._rpoller = {}
		self._wpoller = {}

		# current value of the shared counter for our direction (1 if never set)
		self.id = self.identifier.get(self.direction,1)

	def success (self):
		"""
		Increment the shared counter of our direction and return the new value.

		self.id is not modified by this call.
		"""
		identifier = self.identifier.get(self.direction,1) + 1
		self.identifier[self.direction] = identifier
		return identifier

	# Just in case ..
	def __del__ (self):
		"""Close the socket if it is still open when the object is collected."""
		# __init__ may have failed before self.io was assigned, so do not assume
		# the attribute exists (it would raise AttributeError during collection)
		if getattr(self,'io',None):
			self.close()
			self.logger.warning('connection to %s closed' % self.peer, self.session())

	def name (self):
		"""Return '<direction>-<id> <local>-<peer>', used in log messages."""
		return "%s-%d %s-%s" % (self.direction,self.id,self.local,self.peer)

	def session (self):
		"""Return '<direction>-<id>', used as the source of log messages."""
		return "%s-%d" % (self.direction,self.id)

	def fd (self):
		"""Return the file descriptor of the socket, or -1 when there is none."""
		if self.io:
			return self.io.fileno()
		# the socket is closed (fileno() == -1) or not open yet (io is None)
		return -1

	def close (self):
		"""
		Log and close the socket (if any), leaving self.io set to None.

		Any Exception raised while doing so is swallowed: close() is a best
		effort cleanup. KeyboardInterrupt is left to propagate.
		"""
		try:
			self.logger.warning('%s, closing connection' % self.name(),source=self.session())
			if self.io:
				self.io.close()
				self.io = None
		except KeyboardInterrupt:
			raise
		except Exception:
			self.io = None

	def reading (self):
		"""
		Return True when a read on the socket should be attempted now.

		The poll object is cached per socket and polled with a zero timeout.
		A hang-up is reported as ready: the following recv() then observes the
		end of the stream and the caller gets the usual connection error, a
		peer going away must never look like a KeyboardInterrupt.
		"""
		poller = self._rpoller.get(self.io,None)
		if poller is None:
			poller = select.poll()
			poller.register(self.io, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLNVAL | select.POLLERR)
			self._rpoller = {self.io: poller}

		ready = False
		for _, event in poller.poll(0):
			if event & (select.POLLIN | select.POLLPRI):
				ready = True
			elif event & select.POLLHUP:
				# the peer is gone: forget the poller and let recv() report it
				self._rpoller = {}
				ready = True
			elif event & (select.POLLERR | select.POLLNVAL):
				# drop the cached poller, a new one is registered on the next call
				self._rpoller = {}
		return ready

	def writing (self):
		"""
		Return True when a write on the socket should be attempted now.

		Same logic as reading(): a hang-up is reported as ready so the
		following send() fails and is handled by the error path of writer().
		"""
		poller = self._wpoller.get(self.io, None)
		if poller is None:
			poller = select.poll()
			poller.register(self.io, select.POLLOUT| select.POLLHUP | select.POLLNVAL | select.POLLERR)
			self._wpoller = {self.io: poller}

		ready = False
		for _, event in poller.poll(0):
			if event & select.POLLOUT:
				ready = True
			elif event & select.POLLHUP:
				# the peer is gone: forget the poller and let send() report it
				self._wpoller = {}
				ready = True
			elif event & (select.POLLERR | select.POLLNVAL):
				# drop the cached poller, a new one is registered on the next call
				self._wpoller = {}
		return ready

	def _reader (self, number):
		"""
		Generator reading exactly `number` bytes from the socket.

		Yields b'' every time the data is not complete yet, then yields the
		`number` bytes once and stops (b'' is the only value when number is 0).

		Raises NotConnected when there is no socket, LostConnection when the
		peer closed the connection or a fatal socket error occurred,
		TooSlowError on socket timeout and NetworkError on any other error.
		"""
		# The function must not be called if it does not return with no data with a smaller size as parameter
		if not self.io:
			self.close()
			raise NotConnected('Trying to read on a closed TCP connection')
		if number == 0:
			yield b''
			return

		while not self.reading():
			yield b''
		data = b''
		# last "blocking io" message logged, used to not log it again and again
		reported = ''
		# The first while is here so the try block is re-entered after a blocking error
		while True:
			try:
				while True:
					# in defensive mode fake a blocking error two times out of three
					if self.defensive and random.randint(0,2):
						raise socket.error(errno.EAGAIN,'raising network error on purpose')

					read = self.io.recv(number)
					if not read:
						# recv() returning nothing means the remote end closed
						self.close()
						self.logger.warning('%s %s lost TCP session with peer' % (self.name(),self.peer),self.session())
						raise LostConnection('the TCP connection was closed by the remote end')
					data += read

					number -= len(read)
					if not number:
						self.logger.debug(LazyFormat('received TCP payload',data),self.session())
						yield data
						return

					yield b''
			except socket.timeout as exc:
				self.close()
				self.logger.warning('%s %s peer is too slow' % (self.name(),self.peer),self.session())
				raise TooSlowError('Timeout while reading data from the network (%s)' % errstr(exc))
			except socket.error as exc:
				code = exc.args[0]
				if code in error.block:
					message = '%s %s blocking io problem mid-way through reading a message %s, trying to complete' % (self.name(),self.peer,errstr(exc))
					if message != reported:
						reported = message
						self.logger.debug(message,self.session())
					yield b''
				elif code in error.fatal:
					self.close()
					raise LostConnection('issue reading on the socket: %s' % errstr(exc))
				# what error could it be !
				else:
					self.logger.critical('%s %s undefined error reading on socket' % (self.name(),self.peer),self.session())
					raise NetworkError('Problem while reading data from the network (%s)' % errstr(exc))

	def writer (self, data):
		"""
		Generator sending `data` on the socket.

		Yields False every time the data is not fully sent yet, then yields
		True once and stops. True is yielded at once when there is no socket.

		Raises LostConnection when send() reports that nothing was sent and
		NetworkError on a broken pipe or a fatal socket error. An error which
		is neither blocking nor fatal is logged and the send is retried.
		"""
		if not self.io:
			# XXX: FIXME: Make sure it does not hold the cleanup during the closing of the peering session
			yield True
			return
		while not self.writing():
			yield False
		self.logger.debug(LazyFormat('sending TCP payload',data),self.session())
		# The first while is here to setup the try/catch block once as it is very expensive
		while True:
			try:
				while True:
					# in defensive mode fake a blocking error two times out of three
					if self.defensive and random.randint(0,2):
						raise socket.error(errno.EAGAIN,'raising network error on purpose')

					# we can not use sendall as in case of network buffer filling
					# it does raise and does not let you know how much was sent
					number = self.io.send(data)
					if not number:
						self.close()
						self.logger.warning('%s %s lost TCP connection with peer' % (self.name(),self.peer),self.session())
						raise LostConnection('lost the TCP connection')

					# keep what is left to send
					data = data[number:]
					if not data:
						yield True
						return
					yield False
			except socket.error as exc:
				# use the same error code for every test below
				code = exc.args[0]
				if code in error.block:
					self.logger.debug(
						'%s %s blocking io problem mid-way through writing a message %s, trying to complete' % (
							self.name(),
							self.peer,
							errstr(exc)
						),
						self.session()
					)
					yield False
				elif code == errno.EPIPE:
					# The TCP connection is gone.
					self.close()
					raise NetworkError('Broken TCP connection')
				elif code in error.fatal:
					self.close()
					self.logger.critical('%s %s problem sending message (%s)' % (self.name(),self.peer,errstr(exc)),self.session())
					raise NetworkError('Problem while writing data to the network (%s)' % errstr(exc))
				# what error could it be !
				else:
					self.logger.critical('%s %s undefined error writing on socket' % (self.name(),self.peer),self.session())
					yield False

	def reader (self):
		"""
		Generator reading one message: its header, then its body.

		Yields (length, msg, header, body, error) tuples:
		 - (0,0,b'',b'',None) every time more data has to be waited for
		 - (0,0,header,b'',NotifyError(1,1,..)) if the header does not start
		   with Message.MARKER
		 - (length,0,header,b'',NotifyError(1,2,..)) if the length is invalid
		 - (length,msg,header,body,None) once the message is complete
		and stops after any of the last three.
		"""
		# _reader returns the whole number requested or nothing and then stops
		# header is set before the loop so it is always bound afterwards
		header = b''
		for header in self._reader(Message.HEADER_LEN):
			if not header:
				yield 0,0,b'',b'',None

		if not header.startswith(Message.MARKER):
			report = 'The packet received does not contain a BGP marker'
			yield 0,0,header,b'',NotifyError(1,1,report)
			return

		# byte 18 is the message type, bytes 16-17 the length in network order
		msg = ordinal(header[18])
		length = unpack('!H',header[16:18])[0]

		if length < Message.HEADER_LEN or length > self.msg_size:
			report = '%s has an invalid message length of %d' % (Message.CODE.name(msg),length)
			yield length,0,header,b'',NotifyError(1,2,report)
			return

		# per message type length check, by default at least 19 bytes are required
		validator = Message.Length.get(msg,lambda _: _ >= 19)
		if not validator(length):
			# MUST send the faulty length back
			report = '%s has an invalid message length of %d' % (Message.CODE.name(msg),length)
			yield length,0,header,b'',NotifyError(1,2,report)
			return

		number = length - Message.HEADER_LEN

		if not number:
			# the message has no body
			yield length,msg,header,b'',None
			return

		# body is set before the loop so it is always bound afterwards
		body = b''
		for body in self._reader(number):
			if not body:
				yield 0,0,b'',b'',None

		yield length,msg,header,body,None
0 ignored issues
show
introduced by
The variable body does not seem to be defined in case the for loop on line 271 is not entered. Are you sure this can never be the case?
Loading history...
276