Passed
Pull Request — master (#923)
by
unknown
12:29
created

TrafficNextHop.__repr__()   A

Complexity

Conditions 2

Size

Total Lines 2
Code Lines 2

Duplication

Lines 2
Ratio 100 %

Importance

Changes 0
Metric Value
cc 2
eloc 2
nop 1
dl 2
loc 2
rs 10
c 0
b 0
f 0
1
# encoding: utf-8
2
"""
3
traffic.py
4
5
Created by Thomas Mangin on 2014-06-21.
6
Copyright (c) 2014-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
from struct import pack
11
from struct import unpack
12
13
from exabgp.protocol.ip import IPv4
14
from exabgp.protocol.ip import IPv6
15
from exabgp.bgp.message.open.asn import ASN
16
from exabgp.bgp.message.open.capability.asn4 import ASN4
17
from exabgp.bgp.message.update.attribute.community.extended import ExtendedCommunity
18
from exabgp.bgp.message.update.attribute.community.extended import ExtendedCommunityIPv6
19
20
21
# ================================================================== TrafficRate
22
# RFC 5575
23
24 View Code Duplication
@ExtendedCommunity.register
class TrafficRate (ExtendedCommunity):
	"""
	Flow-spec traffic-rate extended community (type 0x80, subtype 0x06).

	Stores an ASN and a rate. When the caller does not supply the encoded
	bytes, they are built in network byte order from: the two bytes
	returned by _subtype(), the ASN as an unsigned 16-bit integer and the
	rate as a 32-bit float.
	"""

	COMMUNITY_TYPE = 0x80
	COMMUNITY_SUBTYPE = 0x06

	__slots__ = ['asn','rate']

	def __init__ (self, asn, rate, community=None):
		"""
		asn:       value packed as an unsigned 16-bit integer
		rate:      value packed as a 32-bit float
		community: already-encoded bytes; when given, nothing is packed
		"""
		self.asn = asn
		self.rate = rate
		# only encode when no raw representation was handed to us
		if community is None:
			community = pack("!2sHf",self._subtype(),asn,rate)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		# %d formats the rate as an integer, dropping any fractional part
		return "rate-limit:%d" % self.rate

	@staticmethod
	def unpack (data):
		"""Build a TrafficRate from bytes 2..7 of data, keeping data[:8] as the encoded form."""
		# inside the function body 'unpack' resolves to struct.unpack
		number,limit = unpack('!Hf',data[2:8])
		return TrafficRate(ASN(number),limit,data[:8])
50
51
52
# ================================================================ TrafficAction
53
# RFC 5575
54
55
@ExtendedCommunity.register
class TrafficAction (ExtendedCommunity):
	"""
	Flow-spec traffic-action extended community (type 0x80, subtype 0x07).

	Carries two flags, sample (bit 0x02) and terminal (bit 0x01), stored
	in the last byte of the 8-byte encoded value.
	"""

	COMMUNITY_TYPE = 0x80
	COMMUNITY_SUBTYPE = 0x07

	# flag value -> bit contributed to the action byte
	_sample = {False: 0x0, True: 0x2}
	_terminal = {False: 0x0, True: 0x1}

	__slots__ = ['sample','terminal']

	def __init__ (self, sample, terminal, community=None):
		"""
		sample, terminal: keys looked up in _sample / _terminal (KeyError otherwise)
		community:        already-encoded bytes; when given, nothing is packed
		"""
		self.sample = sample
		self.terminal = terminal
		# the lookup is performed even when community is provided
		flags = self._sample[sample] | self._terminal[terminal]
		if community is None:
			# 2 header bytes, 5 zero bytes (L + B), then the flag byte
			community = pack('!2sLBB',self._subtype(),0,0,flags)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		candidates = ((self.sample,'sample'),(self.terminal,'terminal'))
		enabled = [label for active,label in candidates if active]
		return 'action %s' % '-'.join(enabled)

	@staticmethod
	def unpack (data):
		"""Build a TrafficAction from the flag byte at offset 7, keeping data[:8] as the encoded form."""
		flags = unpack('!B',data[7:8])[0]
		return TrafficAction(bool(flags & 0x02),bool(flags & 0x01),data[:8])
99
100
101
# ============================================================== TrafficRedirect
102
# RFC 5575 and 7674
103
104 View Code Duplication
@ExtendedCommunity.register
class TrafficRedirect (ExtendedCommunity):
	"""
	Flow-spec redirect extended community (type 0x80, subtype 0x08).

	Stores an ASN and a target. When the caller does not supply the
	encoded bytes, they are built in network byte order from: the two
	bytes returned by _subtype(), the ASN as an unsigned 16-bit integer
	and the target as an unsigned 32-bit integer.
	"""

	COMMUNITY_TYPE = 0x80
	COMMUNITY_SUBTYPE = 0x08

	__slots__ = ['asn','target']

	def __init__ (self, asn, target, community=None):
		"""
		asn:       value packed as an unsigned 16-bit integer
		target:    value packed as an unsigned 32-bit integer
		community: already-encoded bytes; when given, nothing is packed
		"""
		self.asn = asn
		self.target = target
		if community is None:
			community = pack("!2sHL",self._subtype(),asn,target)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		parts = (self.asn,self.target)
		return "redirect:%s:%s" % parts

	@staticmethod
	def unpack (data):
		"""Build a TrafficRedirect from bytes 2..7 of data, keeping data[:8] as the encoded form."""
		number,value = unpack('!HL',data[2:8])
		return TrafficRedirect(ASN(number),value,data[:8])
130
131
132 View Code Duplication
@ExtendedCommunity.register
class TrafficRedirectASN4 (ExtendedCommunity):
	"""
	Flow-spec redirect extended community with a 4-byte ASN
	(type 0x82, subtype 0x08).

	Stores an ASN and a target. When the caller does not supply the
	encoded bytes, they are built in network byte order from: the two
	bytes returned by _subtype(), the ASN as an unsigned 32-bit integer
	and the target as an unsigned 16-bit integer.
	"""

	COMMUNITY_TYPE = 0x82
	COMMUNITY_SUBTYPE = 0x08

	__slots__ = ['asn', 'target']

	def __init__(self, asn, target, community=None):
		"""
		asn:       value packed as an unsigned 32-bit integer
		target:    value packed as an unsigned 16-bit integer
		community: already-encoded bytes; when given, nothing is packed
		"""
		self.asn = asn
		self.target = target
		if community is None:
			community = pack("!2sLH", self._subtype(), asn, target)
		ExtendedCommunity.__init__(self, community)

	# NOTE(review): the other classes in this file override __repr__, this
	# one overrides __str__ - confirm whether that difference is intended.
	def __str__(self):
		parts = (self.asn, self.target)
		return "redirect:%s:%s" % parts

	@staticmethod
	def unpack(data):
		"""Build a TrafficRedirectASN4 from bytes 2..7 of data, keeping data[:8] as the encoded form."""
		number, value = unpack('!LH', data[2:8])
		return TrafficRedirectASN4(ASN4(number), value, data[:8])
158
159
# ================================================================== TrafficMark
160
# RFC 5575
161
162 View Code Duplication
@ExtendedCommunity.register
class TrafficMark (ExtendedCommunity):
	"""
	Flow-spec traffic-marking extended community (type 0x80, subtype 0x09).

	Stores a DSCP value, kept in the last byte of the 8-byte encoded form.
	"""

	COMMUNITY_TYPE = 0x80
	COMMUNITY_SUBTYPE = 0x09

	__slots__ = ['dscp']

	def __init__ (self, dscp, community=None):
		"""
		dscp:      value packed as a single unsigned byte
		community: already-encoded bytes; when given, nothing is packed
		"""
		self.dscp = dscp
		if community is None:
			# 2 header bytes, 5 zero bytes (L + B), then the dscp byte
			community = pack("!2sLBB",self._subtype(),0,0,dscp)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		return "mark %d" % self.dscp

	@staticmethod
	def unpack (data):
		"""Build a TrafficMark from the byte at offset 7, keeping data[:8] as the encoded form."""
		value = unpack('!B',data[7:8])[0]
		return TrafficMark(value,data[:8])
187
188
# =============================================================== TrafficNextHopIPv4IETF
189
# draft-ietf-idr-flowspec-redirect-ip-02
190
# see RFC 4360 for ipv4 address specific extended community format
191
192
@ExtendedCommunity.register
class TrafficNextHopIPv4IETF (ExtendedCommunity):
	"""
	Redirect/copy to an IPv4 next-hop extended community
	(type 0x01, subtype 0x0C).

	Stores an IP and a copy flag. When the caller does not supply the
	encoded bytes, they are built in network byte order from: the two
	bytes returned by _subtype(), the 4 bytes of ip.pack() and a 16-bit
	field holding 1 when copy is true, 0 otherwise.
	"""

	COMMUNITY_TYPE = 0x01
	COMMUNITY_SUBTYPE = 0x0C

	__slots__ = ['ip', 'copy']

	def __init__ (self, ip, copy, community=None):
		"""
		ip:        object whose pack() is used when encoding
		copy:      truthy to copy the traffic, falsy to redirect it
		community: already-encoded bytes; when given, ip.pack() is not called
		"""
		self.ip = ip
		self.copy = copy
		if community is None:
			flag = 1 if copy else 0
			community = pack("!2s4sH", self._subtype(), ip.pack(), flag)
		ExtendedCommunity.__init__(self, community)

	def __repr__ (self):
		if self.copy:
			return "copy-to-nexthop-ietf"
		return "redirect-to-nexthop-ietf"

	@staticmethod
	def unpack (data):
		"""Build a TrafficNextHopIPv4IETF from bytes 2..7 of data, keeping data[:8] as the encoded form."""
		address, flags = unpack('!4sH', data[2:8])
		# only the lowest bit of the 16-bit field selects copy vs redirect
		return TrafficNextHopIPv4IETF(IPv4.ntop(address), bool(flags & 0x01), data[:8])
219
220
# =============================================================== TrafficNextHopIPv6IETF
221
# draft-ietf-idr-flowspec-redirect-ip-02
222
# see RFC 5701 for ipv6 address specific extended community format
223
224
@ExtendedCommunity.register
class TrafficNextHopIPv6IETF (ExtendedCommunityIPv6):
	"""
	Redirect/copy to an IPv6 next-hop extended community
	(type 0x00, subtype 0x0C).

	Stores an IP and a copy flag. When the caller does not supply the
	encoded bytes, they are built in network byte order from: the two
	bytes returned by _subtype(), the 16 bytes of ip.pack() and a 16-bit
	field holding 1 when copy is true, 0 otherwise (20 bytes in total).
	"""

	COMMUNITY_TYPE = 0x00
	COMMUNITY_SUBTYPE = 0x0C

	__slots__ = ['ip', 'copy']

	def __init__ (self, ip, copy, community=None):
		"""
		ip:        object whose pack() is used when encoding
		copy:      truthy to copy the traffic, falsy to redirect it
		community: already-encoded bytes; when given, ip.pack() is not called
		"""
		self.ip = ip
		self.copy = copy
		if community is None:
			flag = 1 if copy else 0
			community = pack("!2s16sH", self._subtype(), ip.pack(), flag)
		ExtendedCommunityIPv6.__init__(self, community)

	def __repr__ (self):
		if self.copy:
			return "copy-to-nexthop-ietf"
		return "redirect-to-nexthop-ietf"

	@staticmethod
	def unpack (data):
		"""Build a TrafficNextHopIPv6IETF from bytes 2..19 of data, keeping data[:20] as the encoded form."""
		address, flags = unpack('!16sH', data[2:20])
		# only the lowest bit of the 16-bit field selects copy vs redirect
		return TrafficNextHopIPv6IETF(IPv6.ntop(address), bool(flags & 0x01), data[:20])
251
252
# =============================================================== TrafficNextHopSimpson
253
# draft-simpson-idr-flowspec-redirect-02
254
255
# XXX: FIXME: I guess this should be a subclass of NextHop or IP ..
256
257 View Code Duplication
@ExtendedCommunity.register
class TrafficNextHopSimpson (ExtendedCommunity):
	"""
	Redirect/copy to next-hop extended community (type 0x08, subtype 0x00).

	Stores only a copy flag. When the caller does not supply the encoded
	bytes, they are built in network byte order from: the two bytes
	returned by _subtype(), a zero 32-bit field and a 16-bit field
	holding 1 when copy is true, 0 otherwise.
	"""

	COMMUNITY_TYPE = 0x08
	COMMUNITY_SUBTYPE = 0x00

	__slots__ = ['copy']

	def __init__ (self, copy, community=None):
		"""
		copy:      truthy to copy the traffic, falsy to redirect it
		community: already-encoded bytes; when given, nothing is packed
		"""
		self.copy = copy
		if community is None:
			flag = 1 if copy else 0
			community = pack("!2sLH",self._subtype(),0,flag)
		ExtendedCommunity.__init__(self,community)

	def __repr__ (self):
		if self.copy:
			return "copy-to-nexthop"
		return "redirect-to-nexthop"

	@staticmethod
	def unpack (data):
		"""Build a TrafficNextHopSimpson from the byte at offset 7, keeping data[:8] as the encoded form."""
		flags = unpack('!B',data[7:8])[0]
		# only the lowest bit of the last byte selects copy vs redirect
		return TrafficNextHopSimpson(bool(flags & 0x01), data[:8])
282
283
284
# ============================================================ TrafficRedirectIP
285
# RFC 5575
286
# If we need to provide the <IP>:<ASN> form for the FlowSpec Redirect ...
287
288
# import socket
289
# Do not use socket, use IPv4.ntop or pton
290
291
# TrafficRedirectASN = TrafficRedirect
292
293
# class TrafficRedirectIP (ExtendedCommunity):
294
# 	COMMUNITY_TYPE = 0x80
295
# 	COMMUNITY_SUBTYPE = 0x08
296
297
# 	def __init__ (self, ip, target, community=None):
298
# 		self.ip = ip
299
# 		self.target = target
300
# 		ExtendedCommunity.__init__(self,community if community is not None else pack("!BB4sH",0x80,0x08,socket.inet_pton(socket.AF_INET,ip),target))
301
302
# 	def __str__ (self):
303
# 		return "redirect %s:%d" % (self.ip,self.target)
304
305
# 	@staticmethod
306
# 	def unpack (data):
307
# 		ip,target = unpack('!4sH',data[2:8])
308
# 		return TrafficRedirectIP(socket.inet_ntop(socket.AF_INET,ip),target,data[:8])
309