Completed
Push — master ( 9001be...e9c8d4 )
by Thomas
10:58
created

exabgp.bgp.message.update.attribute.community.extended.traffic   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 306
Duplicated Lines 42.81 %

Importance

Changes 0
Metric Value
eloc 199
dl 131
loc 306
rs 8.8798
c 0
b 0
f 0
wmc 44

27 Methods

Rating   Name   Duplication   Size   Complexity  
A TrafficNextHopIPv6IETF.__init__() 10 10 3
A TrafficRate.unpack() 4 4 1
A TrafficMark.__repr__() 0 2 1
A TrafficAction.__init__() 0 10 2
A TrafficRedirectASN4.unpack() 0 4 1
A TrafficNextHopIPv6IETF.unpack() 4 4 1
A TrafficNextHopIPv4IETF.__init__() 10 10 3
A TrafficMark.unpack() 0 4 1
A TrafficRedirect.__init__() 9 9 2
A TrafficRedirect.unpack() 4 4 1
A TrafficRedirectASN4.__init__() 0 9 2
A TrafficMark.__init__() 0 8 2
A TrafficRate.__repr__() 2 2 1
A TrafficRedirectIPv6.unpack() 0 4 1
A TrafficNextHopIPv4IETF.__repr__() 2 2 2
A TrafficRedirectASN4.__str__() 0 2 1
A TrafficNextHopSimpson.__init__() 8 8 3
A TrafficNextHopSimpson.unpack() 4 4 1
A TrafficAction.__repr__() 0 7 3
A TrafficRedirectIPv6.__init__() 0 5 2
A TrafficRate.__init__() 9 9 2
A TrafficNextHopSimpson.__repr__() 2 2 2
A TrafficRedirectIPv6.__str__() 0 2 1
A TrafficRedirect.__repr__() 2 2 1
A TrafficAction.unpack() 0 6 1
A TrafficNextHopIPv6IETF.__repr__() 2 2 2
A TrafficNextHopIPv4IETF.unpack() 4 4 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like exabgp.bgp.message.update.attribute.community.extended.traffic often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# encoding: utf-8
2
"""
3
traffic.py
4
5
Created by Thomas Mangin on 2014-06-21.
6
Copyright (c) 2014-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
import socket
11
12
from struct import pack
13
from struct import unpack
14
15
from exabgp.protocol.ip import IPv4
16
from exabgp.protocol.ip import IPv6
17
from exabgp.bgp.message.open.asn import ASN
18
from exabgp.bgp.message.open.capability.asn4 import ASN4
19
from exabgp.bgp.message.update.attribute.community.extended import ExtendedCommunity
20
from exabgp.bgp.message.update.attribute.community.extended import ExtendedCommunityIPv6
21
22
23
# ================================================================== TrafficRate
24
# RFC 5575
25
26 View Code Duplication
@ExtendedCommunity.register
class TrafficRate (ExtendedCommunity):
	"""Traffic-rate extended community (type 0x80, subtype 0x06).

	Keeps an AS number and a rate.  When encoded here, the value is the
	result of _subtype() followed by a 16-bit unsigned ASN and a 32-bit
	float rate, in network byte order.
	"""

	COMMUNITY_TYPE = 0x80
	COMMUNITY_SUBTYPE = 0x06

	__slots__ = ['asn','rate']

	def __init__ (self, asn, rate, community=None):
		"""Record asn and rate; encode them only if no raw community is given."""
		self.asn = asn
		self.rate = rate
		if community is None:
			# nothing pre-encoded was supplied: build the wire form ourselves
			community = pack("!2sHf",self._subtype(),asn,rate)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		# %d formatting means a float rate is shown without its fractional part
		return "rate-limit:%d" % self.rate

	@staticmethod
	def unpack (data):
		"""Decode bytes 2..7 of data as (ASN, rate) and keep data[:8] as the raw form."""
		fields = unpack('!Hf',data[2:8])
		return TrafficRate(ASN(fields[0]),fields[1],data[:8])
52
53
54
# ================================================================ TrafficAction
55
# RFC 5575
56
57
@ExtendedCommunity.register
class TrafficAction (ExtendedCommunity):
	"""Traffic-action extended community (type 0x80, subtype 0x07).

	Carries two boolean flags, sample (bit 0x02) and terminal (bit 0x01),
	stored in the last byte of the encoded value.
	"""

	COMMUNITY_TYPE = 0x80
	COMMUNITY_SUBTYPE = 0x07

	# flag value -> bit it contributes to the final byte
	_sample = {
		False: 0x0,
		True:  0x2,
	}

	_terminal = {
		False: 0x0,
		True:  0x1,
	}

	__slots__ = ['sample','terminal']

	def __init__ (self, sample, terminal, community=None):
		"""Record both flags; encode them only if no raw community is given."""
		self.sample = sample
		self.terminal = terminal
		# looked up unconditionally, so an unknown flag value raises KeyError
		# even when a raw community is supplied
		flags = self._sample[sample] | self._terminal[terminal]
		if community is None:
			community = pack('!2sLBB',self._subtype(),0,0,flags)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		# names of the flags that are set, joined with '-' (empty when none is set)
		flagged = ((self.sample,'sample'),(self.terminal,'terminal'))
		return 'action %s' % '-'.join(label for enabled,label in flagged if enabled)

	@staticmethod
	def unpack (data):
		"""Decode the flag byte at offset 7 and keep data[:8] as the raw form."""
		flags = unpack('!B',data[7:8])[0]
		return TrafficAction(bool(flags & 0x02),bool(flags & 0x01),data[:8])
101
102
103
# ============================================================== TrafficRedirect
104
# RFC 5575 and 7674
105
106 View Code Duplication
@ExtendedCommunity.register
class TrafficRedirect (ExtendedCommunity):
	"""Traffic-redirect extended community (type 0x80, subtype 0x08).

	Keeps an AS number and a target.  When encoded here, the value is the
	result of _subtype() followed by a 16-bit unsigned ASN and a 32-bit
	unsigned target, in network byte order.
	"""

	COMMUNITY_TYPE = 0x80
	COMMUNITY_SUBTYPE = 0x08

	__slots__ = ['asn','target']

	def __init__ (self, asn, target, community=None):
		"""Record asn and target; encode them only if no raw community is given."""
		self.asn = asn
		self.target = target
		if community is None:
			# nothing pre-encoded was supplied: build the wire form ourselves
			community = pack("!2sHL",self._subtype(),asn,target)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		return "redirect:%s:%s" % (self.asn,self.target)

	@staticmethod
	def unpack (data):
		"""Decode bytes 2..7 of data as (ASN, target) and keep data[:8] as the raw form."""
		fields = unpack('!HL',data[2:8])
		return TrafficRedirect(ASN(fields[0]),fields[1],data[:8])
132
133
134
@ExtendedCommunity.register
class TrafficRedirectASN4 (ExtendedCommunity):
	"""Traffic-redirect extended community with a 32-bit ASN (type 0x82, subtype 0x08).

	Same fields as TrafficRedirect but with the widths swapped: a 32-bit
	unsigned ASN followed by a 16-bit unsigned target.
	"""

	COMMUNITY_TYPE = 0x82
	COMMUNITY_SUBTYPE = 0x08

	__slots__ = ['asn', 'target']

	def __init__(self, asn, target, community=None):
		"""Record asn and target; encode them only if no raw community is given."""
		self.asn = asn
		self.target = target
		if community is None:
			# nothing pre-encoded was supplied: build the wire form ourselves
			community = pack("!2sLH", self._subtype(), asn, target)
		ExtendedCommunity.__init__(self, community)

	def __str__(self):
		return "redirect:%s:%s" % (self.asn, self.target)

	@staticmethod
	def unpack(data):
		"""Decode bytes 2..7 of data as (ASN4, target) and keep data[:8] as the raw form."""
		fields = unpack('!LH', data[2:8])
		return TrafficRedirectASN4(ASN4(fields[0]), fields[1], data[:8])
160
161
# ================================================================== TrafficMark
162
# RFC 5575
163
164
@ExtendedCommunity.register
class TrafficMark (ExtendedCommunity):
	"""Traffic-marking extended community (type 0x80, subtype 0x09).

	Keeps a single dscp value, stored in the last byte of the encoded value.
	"""

	COMMUNITY_TYPE = 0x80
	COMMUNITY_SUBTYPE = 0x09

	__slots__ = ['dscp']

	def __init__ (self, dscp, community=None):
		"""Record dscp; encode it only if no raw community is given."""
		self.dscp = dscp
		if community is None:
			# five zero bytes (32-bit + 8-bit fields) precede the dscp byte
			community = pack("!2sLBB",self._subtype(),0,0,dscp)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		return "mark %d" % self.dscp

	@staticmethod
	def unpack (data):
		"""Decode the dscp byte at offset 7 and keep data[:8] as the raw form."""
		value = unpack('!B',data[7:8])[0]
		return TrafficMark(value,data[:8])
189
190
# =============================================================== TrafficNextHopIPv4IETF
191
# draft-ietf-idr-flowspec-redirect-02
192
# see RFC 4360 for ipv4 address specific extended community format
193
194 View Code Duplication
@ExtendedCommunity.register
class TrafficNextHopIPv4IETF (ExtendedCommunity):
	"""Redirect/copy-to-next-hop extended community carrying an IPv4 address (type 0x01, subtype 0x0C).

	Keeps an ip and a copy flag.  When encoded here, the value is the result
	of _subtype() followed by four address bytes and a 16-bit field whose
	lowest bit is the copy flag.
	"""

	COMMUNITY_TYPE = 0x01
	COMMUNITY_SUBTYPE = 0x0C

	__slots__ = ['ip', 'copy']

	def __init__ (self, ip, copy, community=None):
		"""Record ip and copy; encode them only if no raw community is given.

		ip must provide a pack() method when community is None.
		"""
		self.ip = ip
		self.copy = copy
		if community is None:
			copy_bit = 1 if copy else 0
			community = pack("!2s4sH",self._subtype(),ip.pack(),copy_bit)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		if self.copy:
			return "copy-to-nexthop-ietf"
		return "redirect-to-nexthop-ietf"

	@staticmethod
	def unpack (data):
		"""Decode bytes 2..7 of data as (address, flags) and keep data[:8] as the raw form."""
		fields = unpack('!4sH', data[2:8])
		copy_flag = bool(fields[1] & 0x01)
		return TrafficNextHopIPv4IETF(IPv4.ntop(fields[0]), copy_flag, data[:8])
221
222
# =============================================================== TrafficNextHopIPv6IETF
223
# draft-ietf-idr-flowspec-redirect-02
224
# see RFC 5701 for ipv6 address specific extended community format
225
226 View Code Duplication
@ExtendedCommunityIPv6.register
class TrafficNextHopIPv6IETF (ExtendedCommunityIPv6):
	"""Redirect/copy-to-next-hop extended community carrying an IPv6 address (type 0x00, subtype 0x0C).

	Keeps an ip and a copy flag.  When encoded here, the value is the result
	of _subtype() followed by sixteen address bytes and a 16-bit field whose
	lowest bit is the copy flag.
	"""

	COMMUNITY_TYPE = 0x00
	COMMUNITY_SUBTYPE = 0x0C

	__slots__ = ['ip', 'copy']

	def __init__ (self, ip, copy, community=None):
		"""Record ip and copy; encode them only if no raw community is given.

		ip must provide a pack() method when community is None.
		"""
		self.ip = ip
		self.copy = copy
		if community is None:
			copy_bit = 1 if copy else 0
			community = pack("!2s16sH",self._subtype(),ip.pack(),copy_bit)
		ExtendedCommunityIPv6.__init__(self,community)

	def __repr__ (self):
		if self.copy:
			return "copy-to-nexthop-ietf"
		return "redirect-to-nexthop-ietf"

	@staticmethod
	def unpack (data):
		"""Decode bytes 2..19 of data as (address, flags) and keep data[:20] as the raw form."""
		fields = unpack('!16sH', data[2:20])
		copy_flag = bool(fields[1] & 0x01)
		return TrafficNextHopIPv6IETF(IPv6.ntop(fields[0]), copy_flag, data[:20])
253
254
# =============================================================== TrafficNextHopSimpson
255
# draft-simpson-idr-flowspec-redirect-02
256
257
# XXX: FIXME: I guess this should be a subclass of NextHop or IP ..
258
259 View Code Duplication
@ExtendedCommunity.register
class TrafficNextHopSimpson (ExtendedCommunity):
	"""Redirect/copy-to-next-hop extended community without an address (type 0x08, subtype 0x00).

	Keeps only a copy flag, stored in the lowest bit of the last byte of
	the encoded value.
	"""

	COMMUNITY_TYPE = 0x08
	COMMUNITY_SUBTYPE = 0x00

	__slots__ = ['copy']

	def __init__ (self, copy, community=None):
		"""Record copy; encode it only if no raw community is given."""
		self.copy = copy
		if community is None:
			copy_bit = 1 if copy else 0
			# a zeroed 32-bit field precedes the 16-bit flag field
			community = pack("!2sLH",self._subtype(),0,copy_bit)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		if self.copy:
			return "copy-to-nexthop"
		return "redirect-to-nexthop"

	@staticmethod
	def unpack (data):
		"""Decode the flag byte at offset 7 and keep data[:8] as the raw form."""
		flags = unpack('!B',data[7:8])[0]
		return TrafficNextHopSimpson(bool(flags & 0x01), data[:8])
284
285
# ============================================================ TrafficRedirectIPv6
286
# https://tools.ietf.org/html/rfc5701
287
288
@ExtendedCommunityIPv6.register
class TrafficRedirectIPv6 (ExtendedCommunityIPv6):
	"""Redirect extended community carrying an IPv6 address (type 0x00, subtype 0x02).

	Keeps ip as a textual IPv6 address and asn as an integer.  The encoded
	value is the two header bytes 0x00 0x02, the sixteen packed address
	bytes and a 16-bit unsigned asn, in network byte order (20 bytes).
	"""

	COMMUNITY_TYPE = 0x00
	COMMUNITY_SUBTYPE = 0x02

	def __init__(self, ip, asn, community=None):
		"""Record ip and asn; encode them only if no raw community is given.

		ip    -- IPv6 address in text form (converted with socket.inet_pton)
		asn   -- integer that fits in 16 bits
		community -- pre-encoded bytes to use instead of packing ip/asn

		Raises whatever socket.inet_pton / struct.pack raise on invalid
		input when community is None.
		"""
		self.ip = ip
		self.asn = asn
		if community is None:
			# inet_pton handles AF_INET6; inet_aton is IPv4-only and takes a
			# single argument, so it cannot be used here
			packed_ip = socket.inet_pton(socket.AF_INET6, ip)
			community = pack("!BB16sH", 0x00, 0x02, packed_ip, asn)
		ExtendedCommunityIPv6.__init__(self, community)

	def __str__(self):
		return "redirect %s:%d" % (self.ip, self.asn)

	@staticmethod
	def unpack(data):
		"""Decode bytes 2..19 of data as (address, asn) and keep data[:20] as the raw form."""
		# '!16sH' consumes exactly 18 bytes, so the payload after the
		# two-byte header is data[2:20] and the whole community is 20 bytes
		ip, asn = unpack('!16sH', data[2:20])
		return TrafficRedirectIPv6(socket.inet_ntop(socket.AF_INET6, ip), asn, data[:20])
306
307
308
# ============================================================ TrafficRedirectIP
309
# RFC 5575
310
# If we need to provide the <IP>:<ASN> form for the FlowSpec Redirect ...
311
312
# import socket
313
# Do not use socket, use IPv4.ntop or pton
314
315
# TrafficRedirectASN = TrafficRedirect
316
317
# class TrafficRedirectIP (ExtendedCommunity):
318
# 	COMMUNITY_TYPE = 0x80
319
# 	COMMUNITY_SUBTYPE = 0x08
320
321
# 	def __init__ (self, ip, target, community=None):
322
# 		self.ip = ip
323
# 		self.target = target
324
# 		ExtendedCommunity.__init__(self,community if community is not None else pack("!BB4sH",0x80,0x08,socket.inet_pton(socket.AF_INET,ip),target))
325
326
# 	def __str__ (self):
327
# 		return "redirect %s:%d" % (self.ip,self.target)
328
329
# 	@staticmethod
330
# 	def unpack (data):
331
# 		ip,target = unpack('!4sH',data[2:8])
332
# 		return TrafficRedirectIP(socket.inet_ntop(socket.AF_INET,ip),target,data[:8])
333