Code Duplication    Length = 25-27 lines in 5 locations

lib/exabgp/bgp/message/update/attribute/community/extended/traffic.py 5 locations

@@ 226-252 (lines=27) @@
223
# draft-ietf-idr-flowspec-redirect-02
224
# see RFC 5701 for ipv6 address specific extended community format
225
226
@ExtendedCommunityIPv6.register
227
class TrafficNextHopIPv6IETF (	ExtendedCommunityIPv6):
228
	COMMUNITY_TYPE = 0x00
229
	COMMUNITY_SUBTYPE = 0x0C
230
231
	__slots__ = ['ip', 'copy']
232
233
	def __init__ (self, ip, copy, community=None):
234
		self.ip = ip
235
		self.copy = copy
236
		ExtendedCommunityIPv6.__init__(
237
			self,
238
			community if community is not None else pack(
239
				"!2s16sH",
240
				self._subtype(),
241
				ip.pack(),
242
				1 if copy else 0
243
			)
244
		)
245
246
	def __repr__ (self):
247
		return "copy-to-nexthop-ietf" if self.copy else "redirect-to-nexthop-ietf"
248
249
	@staticmethod
250
	def unpack (data):
251
		ip, bit = unpack('!16sH', data[2:20])
252
		return TrafficNextHopIPv6IETF(IPv6.ntop(ip), bool(bit & 0x01), data[:20])
253
254
# =============================================================== TrafficNextHopSimpson
255
# draft-simpson-idr-flowspec-redirect-02
@@ 194-220 (lines=27) @@
191
# draft-ietf-idr-flowspec-redirect-02
192
# see RFC 4360 for ipv4 address specific extended community format
193
194
@ExtendedCommunity.register
195
class TrafficNextHopIPv4IETF (ExtendedCommunity):
196
	COMMUNITY_TYPE = 0x01
197
	COMMUNITY_SUBTYPE = 0x0C
198
199
	__slots__ = ['ip', 'copy']
200
201
	def __init__ (self, ip, copy, community=None):
202
		self.ip = ip
203
		self.copy = copy
204
		ExtendedCommunity.__init__(
205
			self,
206
			community if community is not None else pack(
207
				"!2s4sH",
208
				self._subtype(),
209
				ip.pack(),
210
				1 if copy else 0
211
			)
212
		)
213
214
	def __repr__ (self):
215
		return "copy-to-nexthop-ietf" if self.copy else "redirect-to-nexthop-ietf"
216
217
	@staticmethod
218
	def unpack (data):
219
		ip, bit = unpack('!4sH', data[2:8])
220
		return TrafficNextHopIPv4IETF(IPv4.ntop(ip), bool(bit & 0x01), data[:8])
221
222
# =============================================================== TrafficNextHopIPv6IETF
223
# draft-ietf-idr-flowspec-redirect-02
@@ 106-131 (lines=26) @@
103
# ============================================================== TrafficRedirect
104
# RFC 5575 and 7674
105
106
@ExtendedCommunity.register
107
class TrafficRedirect (ExtendedCommunity):
108
	COMMUNITY_TYPE = 0x80
109
	COMMUNITY_SUBTYPE = 0x08
110
111
	__slots__ = ['asn','target']
112
113
	def __init__ (self, asn, target, community=None):
114
		self.asn = asn
115
		self.target = target
116
		ExtendedCommunity.__init__(
117
			self,
118
			community if community is not None else pack(
119
				"!2sHL",
120
				self._subtype(),
121
				asn,target
122
			)
123
		)
124
125
	def __repr__ (self):
126
		return "redirect:%s:%s" % (self.asn,self.target)
127
128
	@staticmethod
129
	def unpack (data):
130
		asn,target = unpack('!HL',data[2:8])
131
		return TrafficRedirect(ASN(asn),target,data[:8])
132
133
134
@ExtendedCommunity.register
@@ 26-51 (lines=26) @@
23
# ================================================================== TrafficRate
24
# RFC 5575
25
26
@ExtendedCommunity.register
27
class TrafficRate (ExtendedCommunity):
28
	COMMUNITY_TYPE = 0x80
29
	COMMUNITY_SUBTYPE = 0x06
30
31
	__slots__ = ['asn','rate']
32
33
	def __init__ (self, asn, rate, community=None):
34
		self.asn = asn
35
		self.rate = rate
36
		ExtendedCommunity.__init__(
37
			self,
38
			community if community is not None else pack(
39
				"!2sHf",
40
				self._subtype(),
41
				asn,rate
42
			)
43
		)
44
45
	def __repr__ (self):
46
		return "rate-limit:%d" % self.rate
47
48
	@staticmethod
49
	def unpack (data):
50
		asn,rate = unpack('!Hf',data[2:8])
51
		return TrafficRate(ASN(asn),rate,data[:8])
52
53
54
# ================================================================ TrafficAction
@@ 259-283 (lines=25) @@
256
257
# XXX: FIXME: I guess this should be a subclass of NextHop or IP ..
258
259
@ExtendedCommunity.register
260
class TrafficNextHopSimpson (ExtendedCommunity):
261
	COMMUNITY_TYPE = 0x08
262
	COMMUNITY_SUBTYPE = 0x00
263
264
	__slots__ = ['copy']
265
266
	def __init__ (self, copy, community=None):
267
		self.copy = copy
268
		ExtendedCommunity.__init__(
269
			self,
270
			community if community is not None else pack(
271
				"!2sLH",
272
				self._subtype(),
273
				0,1 if copy else 0
274
			)
275
		)
276
277
	def __repr__ (self):
278
		return "copy-to-nexthop" if self.copy else "redirect-to-nexthop"
279
280
	@staticmethod
281
	def unpack (data):
282
		bit, = unpack('!B',data[7:8])
283
		return TrafficNextHopSimpson(bool(bit & 0x01), data[:8])
284
285
# ============================================================ TrafficRedirectIPv6
286
# https://tools.ietf.org/html/rfc5701