Completed
Push — master ( 0dc7c6...640170 )
by Thomas
15:16
created

exabgp.reactor.network.tcp   F

Complexity

Total Complexity 65

Size/Duplication

Total Lines 281
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 188
dl 0
loc 281
rs 3.2
c 0
b 0
f 0
wmc 65

10 Functions

Rating   Name   Duplication   Size   Complexity  
A TTL() 0 7 3
A nagle() 0 6 2
A asynchronous() 0 5 2
F MD5() 0 77 19
A MIN_TTL() 0 14 5
A bind() 0 8 4
B create() 0 17 6
A TTLv6() 0 6 3
B connect() 0 12 6
F ready() 0 47 15

How to fix   Complexity   

Complexity

Complex classes like exabgp.reactor.network.tcp often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# encoding: utf-8
2
"""
3
tcp.py
4
5
Created by Thomas Mangin on 2013-07-13.
6
Copyright (c) 2013-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
import re
11
import base64
12
import time
13
import socket
14
import select
15
import platform
16
17
from struct import pack,calcsize
18
19
from exabgp.util import bytes_ascii
20
from exabgp.util.errstr import errstr
21
22
from exabgp.protocol.family import AFI
23
from exabgp.protocol.ip import IP
24
from exabgp.reactor.network.error import errno
25
from exabgp.reactor.network.error import error
26
27
from exabgp.reactor.network.error import NotConnected
28
from exabgp.reactor.network.error import BindingError
29
from exabgp.reactor.network.error import MD5Error
30
from exabgp.reactor.network.error import NagleError
31
from exabgp.reactor.network.error import TTLError
32
from exabgp.reactor.network.error import AsyncError
33
34
from exabgp.logger import Logger
35
36
37
def create (afi):
	"""Create a TCP stream socket for the requested address family.

	afi is compared with AFI.ipv4 and AFI.ipv6 to select AF_INET or AF_INET6.
	SO_REUSEADDR and SO_REUSEPORT are then set on a best-effort basis: an
	option which is missing from the socket module, or refused by the OS,
	is silently ignored.

	Returns the newly created socket.
	Raises NotConnected when afi is neither IPv4 nor IPv6, or when the
	socket can not be created.
	"""
	if afi == AFI.ipv4:
		family = socket.AF_INET
	elif afi == AFI.ipv6:
		family = socket.AF_INET6
	else:
		# without this guard no socket would exist and the code below
		# would fail with an UnboundLocalError instead of NotConnected
		raise NotConnected('Could not create socket, unsupported address family %s' % str(afi))

	try:
		io = socket.socket(family, socket.SOCK_STREAM, socket.IPPROTO_TCP)
	except socket.error:
		raise NotConnected('Could not create socket')

	for option in ('SO_REUSEADDR', 'SO_REUSEPORT'):
		try:
			# getattr: the constant is not defined on every platform
			io.setsockopt(socket.SOL_SOCKET, getattr(socket, option), 1)
		except (socket.error,AttributeError):
			pass

	return io
54
55
56
def bind (io, ip, afi):
	"""Bind io to the local address ip, letting the OS pick the port.

	An IPv4 afi binds to (ip, 0), an IPv6 afi binds to (ip, 0, 0, 0); any
	other afi leaves the socket untouched.

	Raises BindingError when the bind call fails with a socket error.
	"""
	if afi == AFI.ipv4:
		local = (ip,0)
	elif afi == AFI.ipv6:
		local = (ip,0,0,0)
	else:
		return

	try:
		io.bind(local)
	except socket.error as exc:
		raise BindingError('Could not bind to local ip %s - %s' % (ip,str(exc)))
64
65
66
def connect (io, ip, port, afi, md5):
	"""Start connecting io to the peer at ip:port.

	An IPv4 afi connects to (ip, port), an IPv6 afi to (ip, port, 0, 0); any
	other afi does nothing. A socket error whose errno is EINPROGRESS is not
	a failure and the function simply returns.

	Raises NotConnected on any other socket error; when md5 is set the
	message also suggests checking the MD5 password.
	"""
	if afi == AFI.ipv4:
		remote = (ip,port)
	elif afi == AFI.ipv6:
		remote = (ip,port,0,0)
	else:
		return

	try:
		io.connect(remote)
	except socket.error as exc:
		if exc.errno == errno.EINPROGRESS:
			return
		if md5:
			template = 'Could not connect to peer %s:%d, check your MD5 password (%s)'
		else:
			template = 'Could not connect to peer %s:%d (%s)'
		raise NotConnected(template % (ip,port,errstr(exc)))
78
79
80
# http://lxr.free-electrons.com/source/include/uapi/linux/tcp.h#L197
81
#
82
# #define TCP_MD5SIG_MAXKEYLEN    80
83
#
84
# struct tcp_md5sig {
85
# 	struct __kernel_sockaddr_storage tcpm_addr;     /* address associated */  128
86
# 	__u16   __tcpm_pad1;                            /* zero */                  2
87
# 	__u16   tcpm_keylen;                            /* key length */            2
88
# 	__u32   __tcpm_pad2;                            /* zero */                  4
89
# 	__u8    tcpm_key[TCP_MD5SIG_MAXKEYLEN];         /* key (binary) */         80
90
# }
91
#
92
# #define _K_SS_MAXSIZE   128
93
#
94
# #define _K_SS_ALIGNSIZE (__alignof__ (struct sockaddr *))
95
# /* Implementation specific desired alignment */
96
#
97
# typedef unsigned short __kernel_sa_family_t;
98
#
99
# struct __kernel_sockaddr_storage {
100
# 	__kernel_sa_family_t    ss_family;              /* address family */
101
# 	/* Following field(s) are implementation specific */
102
# 	char    __data[_K_SS_MAXSIZE - sizeof(unsigned short)];
103
# 	/* space to achieve desired size, */
104
# 	/* _SS_MAXSIZE value minus size of ss_family */
105
# } __attribute__ ((aligned(_K_SS_ALIGNSIZE)));   /* force desired alignment */
106
107
def _md5_freebsd (io, md5):
	# FreeBSD: the key lives in ipsec.conf, the socket is only flagged as signed
	if not md5:
		return

	if md5 != 'kernel':
		raise MD5Error(
			'FreeBSD requires that you set your MD5 key via ipsec.conf.\n'
			'Something like:\n'
			'flush;\n'
			'add <local ip> <peer ip> tcp 0x1000 -A tcp-md5 "password";'
			)

	TCP_MD5SIG = 0x10
	try:
		io.setsockopt(socket.IPPROTO_TCP, TCP_MD5SIG, 1)
	except socket.error:
		raise MD5Error(
			'FreeBSD requires that you rebuild your kernel to enable TCP MD5 Signatures:\n'
			'options         IPSEC\n'
			'options         TCP_SIGNATURE\n'
			'device          crypto\n'
		)


def _md5_key (md5, md5_base64):
	# Return the binary key for the password md5, base64-decoded when asked for (True) or detected (None)
	decoded = None

	# base64.b64decode reports bad input with TypeError on python 2 and
	# with binascii.Error (a ValueError) on python 3, so catch both
	if md5_base64 is True:
		try:
			decoded = base64.b64decode(md5)
		except (TypeError,ValueError):
			raise MD5Error("Failed to decode base 64 encoded PSK")
	elif md5_base64 is None and not re.match('.*[^a-f0-9].*', md5):  # auto
		# try the password with the padding it may have been stripped of
		for candidate in (md5+'==', md5+'=', md5):
			try:
				decoded = base64.b64decode(candidate)
				break
			except (TypeError,ValueError):
				pass

	# nothing decoded (or decoded to nothing): use the password verbatim
	return decoded or bytes_ascii(md5)


def _md5_sockaddr (ip, port):
	# Build the 128 bytes __kernel_sockaddr_storage identifying the peer
	# Do not use '!' for the pack, the network (big) endian switch in
	# struct.pack is fighting against inet_pton and htons (note the n)
	n_af   = IP.toaf(ip)
	n_addr = IP.pton(ip)
	n_port = socket.htons(port)

	if IP.toafi(ip) == AFI.ipv4:
		layout = 'HH4s'
		fields = (socket.AF_INET, n_port, n_addr)
	else:
		SIN6_FLOWINFO = 0
		SIN6_SCOPE_ID = 0
		layout = 'HHI16sI'
		fields = (n_af, n_port, SIN6_FLOWINFO, n_addr, SIN6_SCOPE_ID)

	# SS_MAXSIZE is 128, pack 'x' pads what the address does not use
	SS_MAXSIZE_PADDING = 128 - calcsize(layout)
	return pack('%s%dx' % (layout,SS_MAXSIZE_PADDING), *fields)


def _md5_linux (io, ip, port, md5, md5_base64):
	# Linux: install the key for the peer using the TCP_MD5SIG socket option
	if not md5:
		# no key configured: nothing to install, so no need to resolve the peer
		return

	TCP_MD5SIG_MAXKEYLEN = 80
	TCP_MD5SIG = 14

	md5_bytes = _md5_key(md5, md5_base64)
	if len(md5_bytes) > TCP_MD5SIG_MAXKEYLEN:
		# pack would silently truncate the key while still announcing its
		# full length, which the kernel rejects with an unrelated error
		raise MD5Error('MD5 password is too long (%d bytes), TCP_MD5SIG supports at most %d bytes' % (len(md5_bytes),TCP_MD5SIG_MAXKEYLEN))

	try:
		sockaddr = _md5_sockaddr(ip, port)
		# struct tcp_md5sig after the address: pad (2), key length (2), pad (4), key (80)
		key = pack('2xH4x%ds' % TCP_MD5SIG_MAXKEYLEN, len(md5_bytes), md5_bytes)
		io.setsockopt(socket.IPPROTO_TCP, TCP_MD5SIG, sockaddr + key)
	except socket.error as exc:
		if exc.errno != errno.ENOENT:
			raise MD5Error('This linux machine does not support TCP_MD5SIG, you can not use MD5 (%s)' % errstr(exc))


def MD5 (io, ip, port, md5, md5_base64):
	"""Enable TCP MD5 signatures on io for the peer at ip:port.

	io         -- the socket to configure
	ip, port   -- the peer address the key applies to (Linux only)
	md5        -- the password; nothing is done when it is empty or None
	md5_base64 -- True: md5 is base64 encoded, None: try base64 when md5 only
	              contains characters in a-f and 0-9, anything else: md5 is
	              used as is

	On FreeBSD md5 must be the string 'kernel' (the key itself is set via
	ipsec.conf). On Linux the key is installed with the TCP_MD5SIG socket
	option; a socket error with errno ENOENT is ignored.

	Raises MD5Error when the password can not be decoded or is longer than
	80 bytes, when the OS refuses the option, or when a password is given
	on a platform other than FreeBSD and Linux.
	"""
	platform_os = platform.system()
	if platform_os == 'FreeBSD':
		_md5_freebsd(io, md5)
	elif platform_os == 'Linux':
		_md5_linux(io, ip, port, md5, md5_base64)
	elif md5:
		raise MD5Error('ExaBGP has no MD5 support for %s' % platform_os)
184
185
186
def nagle (io, ip):
	"""Disable Nagle's algorithm (no grouping of packets) on io.

	ip is only used in the error message.

	Raises NagleError when TCP_NODELAY can not be set, or is not available.
	"""
	try:
		# the constant lookup stays inside the try: a missing TCP_NODELAY
		# must be reported the same way as a refused setsockopt
		level, option = socket.IPPROTO_TCP, socket.TCP_NODELAY
		io.setsockopt(level, option, 1)
	except (socket.error,AttributeError):
		raise NagleError("Could not disable nagle's algorithm for %s" % ip)
192
193
194
def TTL (io, ip, ttl):
	"""Set the IPv4 TTL of packets sent on io.

	Nothing is done when ttl is None (ttl-security unset) or zero (maximum
	TTL), both mean the same thing. ip is only used in the error message.

	Raises TTLError when the IP_TTL option is refused.
	"""
	if not ttl:
		return

	try:
		io.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
	except socket.error as exc:
		reason = errstr(exc)
		raise TTLError('This OS does not support IP_TTL (ttl-security) for %s (%s)' % (ip,reason))
201
202
203
def TTLv6 (io, ip, ttl):
	"""Set the IPv6 unicast hop limit of packets sent on io.

	Nothing is done when ttl is None or zero. ip is only used in the error
	message.

	Raises TTLError when the IPV6_UNICAST_HOPS option is refused.
	"""
	if not ttl:
		return

	try:
		io.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_UNICAST_HOPS, ttl)
	except socket.error as exc:
		reason = errstr(exc)
		raise TTLError('This OS does not support unicast_hops (ttl-security) for %s (%s)' % (ip,reason))
209
210
211
def MIN_TTL (io, ip, ttl):
	"""Apply ttl to io using IP_MINTTL, then IP_TTL.

	Nothing is done when ttl is None (ttl-security unset) or zero (maximum
	TTL), both mean the same thing. An AttributeError while setting
	IP_MINTTL (for example the constant missing from the socket module) is
	ignored; IP_TTL is set afterwards in every case. ip is only used in the
	error messages.

	Raises TTLError when either option is refused with a socket error.
	"""
	if not ttl:
		return

	try:
		io.setsockopt(socket.IPPROTO_IP, socket.IP_MINTTL, ttl)
	except AttributeError:
		# IP_MINTTL is not available, carry on with IP_TTL alone
		pass
	except socket.error as exc:
		raise TTLError('This OS does not support IP_MINTTL (ttl-security) for %s (%s)' % (ip,errstr(exc)))

	try:
		io.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl)
	except socket.error as exc:
		raise TTLError('This OS does not support IP_MINTTL or IP_TTL (ttl-security) for %s (%s)' % (ip,errstr(exc)))
225
226
227
def asynchronous(io, ip):
	"""Switch io to non-blocking mode.

	ip is only used in the error message.

	Raises AsyncError when setblocking fails with a socket error.
	"""
	try:
		io.setblocking(0)
	except socket.error as exc:
		reason = errstr(exc)
		raise AsyncError('could not set socket non-blocking for %s (%s)' % (ip,reason))
232
233
234
def ready (io):
	"""Generator polling io for writability and yielding the connection state.

	Each pass polls io without blocking:
	- no event: False is yielded (a debug message is logged once, after
	  more than one second of waiting) and io is polled again
	- for every event returned, the writable flag is yielded; it becomes
	  True when POLLOUT is seen. POLLHUP without POLLOUT raises
	  KeyboardInterrupt, POLLERR or POLLNVAL only log a warning
	- once writable, SO_ERROR is read: True is yielded when it is clear,
	  False otherwise, and the generator ends

	A select.error raised while doing so makes the generator yield False
	and end.
	"""
	log = Logger()
	slow_notice_sent = False
	began = time.time()

	watcher = select.poll()
	watcher.register(io, select.POLLOUT | select.POLLNVAL | select.POLLERR)
	failure_mask = select.POLLERR | select.POLLNVAL

	while True:
		try:
			writable = False
			events = watcher.poll(0)

			if not events:
				if not slow_notice_sent and time.time() - began > 1.0:
					log.debug('attempting to establish connection', 'network')
					slow_notice_sent = True
				yield False
				continue

			for _, flags in events:
				if flags & select.POLLOUT:
					writable = True
				elif flags & select.POLLHUP:
					raise KeyboardInterrupt()
				elif flags & failure_mask:
					log.warning('connect attempt failed, issue with reading on the network, retrying', 'network')
				# the state is reported after every event, before SO_ERROR is checked
				yield writable

			if not writable:
				continue

			err = io.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
			if err:
				if err in error.block:
					log.warning('connect attempt failed, retrying, reason %s' % errno.errorcode[err],'network')
				yield False
				return

			if slow_notice_sent:
				log.warning('connection established','network')
			yield True
			return
		except select.error:
			yield False
			return
281