Completed
Pull Request — master (#84)
by
unknown
02:17
created

build.v0x04.match_fields   F

Complexity

Total Complexity 92

Size/Duplication

Total Lines 721
Duplicated Lines 30.51 %

Test Coverage

Coverage 52.41%

Importance

Changes 0
Metric Value
wmc 92
eloc 440
dl 220
loc 721
ccs 76
cts 145
cp 0.5241
rs 2
c 0
b 0
f 0

64 Methods

Rating   Name   Duplication   Size   Complexity  
A MatchField.oxm_field() 0 5 1
A MatchField.__init__() 0 3 1
A MatchTCPDst.as_of_tlv() 0 4 1
A MatchField.from_of_tlv() 0 4 1
A MatchTCPSrc.from_of_tlv() 0 5 1
A MatchField.__eq__() 0 7 1
A MatchInPort.from_of_tlv() 0 5 1
A MatchTCPSrc.as_of_tlv() 0 4 1
A MatchField.as_of_tlv() 0 3 1
A MatchNwDst.from_of_tlv() 10 10 2
A MatchDLVLAN.from_of_tlv() 0 9 2
A MatchDLSrc.from_of_tlv() 12 13 2
A MatchDLDst.as_of_tlv() 20 20 4
A MatchDLDst.from_of_tlv() 12 13 2
A MatchNwDst.as_of_tlv() 9 9 2
A MatchDLVLAN.as_of_tlv() 0 17 3
A MatchNwProto.as_of_tlv() 0 4 1
A MatchDLVLANPCP.from_of_tlv() 0 5 1
A MatchTCPDst.from_of_tlv() 0 5 1
A MatchNwSrc.from_of_tlv() 10 10 2
A MatchDLType.from_of_tlv() 0 5 1
A MatchInPort.as_of_tlv() 0 4 1
A MatchField.name() 0 5 1
A MatchDLVLANPCP.as_of_tlv() 0 4 1
A MatchNwProto.from_of_tlv() 0 5 1
A MatchDLType.as_of_tlv() 0 4 1
A MatchNwSrc.as_of_tlv() 9 9 2
A MatchDLSrc.as_of_tlv() 20 20 4
A MatchICMPV6Code.from_of_tlv() 0 5 1
A MatchFieldFactory._index_classes() 0 5 2
A MatchICMPV4Code.as_of_tlv() 0 4 1
A MatchNDTLL.from_of_tlv() 0 5 1
A MatchInPhyPort.from_of_tlv() 0 5 1
A MatchARPOP.as_of_tlv() 0 4 1
A MatchICMPV4Type.from_of_tlv() 0 5 1
A MatchNDTarget.as_of_tlv() 0 4 1
A MatchUDPDst.from_of_tlv() 0 5 1
A MatchIPDSCP.from_of_tlv() 0 5 1
A MatchFieldFactory.from_name() 0 7 2
A MatchNDTLL.as_of_tlv() 0 4 1
A MatchUDPSrc.from_of_tlv() 0 5 1
A MatchNDTarget.from_of_tlv() 0 5 1
A MatchIPDSCP.as_of_tlv() 0 4 1
A MatchNDSLL.as_of_tlv() 0 4 1
A MatchFieldFactory.from_of_tlv() 0 7 2
A MatchICMPV6Type.as_of_tlv() 0 4 1
A MatchEXTHDR.from_of_tlv() 8 8 2
A MatchPBBISID.from_of_tlv() 8 8 2
A MatchPBBISID.as_of_tlv() 15 15 3
A MatchICMPV4Code.from_of_tlv() 0 5 1
A MatchNDSLL.from_of_tlv() 0 5 1
A MatchIVP6FLabel.from_of_tlv() 8 8 2
A MatchARPOP.from_of_tlv() 0 5 1
A MatchUDPDst.as_of_tlv() 0 4 1
A MatchUDPSrc.as_of_tlv() 0 4 1
A MatchInPhyPort.as_of_tlv() 0 4 1
A MatchICMPV6Type.from_of_tlv() 0 5 1
A MatchICMPV4Type.as_of_tlv() 0 4 1
A MatchIPECN.from_of_tlv() 0 5 1
A MatchIVP6FLabel.as_of_tlv() 15 15 3
A MatchICMPV6Code.as_of_tlv() 0 4 1
A MatchFieldFactory._get_class() 0 6 2
A MatchIPECN.as_of_tlv() 0 4 1
A MatchEXTHDR.as_of_tlv() 15 15 3


Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:
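One duplication pattern flagged in the listing below is the "value or value/mask" integer handling repeated in MatchIVP6FLabel, MatchPBBISID and MatchEXTHDR, which differ only in byte width. The following is a minimal sketch of how that logic could be shared, not the project's actual design. `FakeTLV`, `MaskedIntField`, `FlowLabel`, `PbbIsid` and the `size` attribute are hypothetical names introduced here; `FakeTLV` stands in for pyof's `OxmTLV` so the example runs on its own, and the `oxm_field` strings are placeholders rather than real enum members.

```python
from dataclasses import dataclass


@dataclass
class FakeTLV:
    """Hypothetical stand-in for pyof's OxmTLV so the sketch is runnable."""

    oxm_field: str
    oxm_value: bytes
    oxm_hasmask: bool = False


class MaskedIntField:
    """Hypothetical shared base for fields given as 'value' or 'value/mask'."""

    oxm_field = None  # placeholder identifier, set by each subclass
    size = None       # width of the value in bytes, set by each subclass

    def __init__(self, value):
        self.value = value

    def as_of_tlv(self):
        """Pack the value, followed by the mask when one was given."""
        value, _, mask = str(self.value).partition('/')
        payload = int(value).to_bytes(self.size, 'big')
        if mask:  # non-empty string, so a mask of "0" is still packed
            payload += int(mask).to_bytes(self.size, 'big')
        return FakeTLV(self.oxm_field, payload, bool(mask))

    @classmethod
    def from_of_tlv(cls, tlv):
        """Rebuild the field, restoring the 'value/mask' form if masked."""
        value = int.from_bytes(tlv.oxm_value[:cls.size], 'big')
        if tlv.oxm_hasmask:
            mask = int.from_bytes(tlv.oxm_value[cls.size:], 'big')
            value = f'{value}/{mask}'
        return cls(value)


class FlowLabel(MaskedIntField):
    """4-byte value, as in the listing's IPv6 flow label class."""

    oxm_field, size = 'ipv6_flabel', 4


class PbbIsid(MaskedIntField):
    """3-byte value, as in the listing's PBB I-SID class."""

    oxm_field, size = 'pbb_isid', 3
```

With a base like this, each concrete field shrinks to a docstring and two class attributes, and a fix to the mask handling only has to be made once.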

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.v0x04.match_fields often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
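As an illustration of Extract Subclass applied to this module: many of the classes in the listing below (in_port, tp_src, ip_dscp and others) pack an unsigned integer into a fixed number of bytes and differ only in that width. The sketch below assumes a hypothetical intermediate class, `FixedIntMatchField`, with hypothetical helpers `pack_value` and `unpack_value`; the `MatchField` here is a trimmed stand-in, and building the actual `OxmTLV` is left out so the example stays self-contained.

```python
class MatchField:
    """Trimmed-down stand-in for the module's abstract base class."""

    def __init__(self, value):
        self.value = value


class FixedIntMatchField(MatchField):
    """Hypothetical extracted subclass for unmasked fixed-width integers."""

    size = None  # number of bytes on the wire, set by each subclass

    def pack_value(self):
        """Return the value as big-endian bytes of the declared width."""
        return self.value.to_bytes(self.size, 'big')

    @classmethod
    def unpack_value(cls, data):
        """Build an instance from big-endian bytes."""
        return cls(int.from_bytes(data, 'big'))


class MatchInPort(FixedIntMatchField):
    """Input port: 4 bytes, as in the listing."""

    name, size = 'in_port', 4


class MatchTCPSrc(FixedIntMatchField):
    """TCP source port: 2 bytes, as in the listing."""

    name, size = 'tp_src', 2
```

For example, `MatchInPort(3).pack_value()` yields four bytes and `MatchTCPSrc.unpack_value(b'\x00P').value` recovers the port number 80.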

"""OpenFlow 1.3 OXM match fields.

Flow's match is very different from OF 1.0. Instead of always having all
fields, there's a variable list of match fields and each one is an OpenFlow
eXtended Match Type-Length-Value (OXM TLV) element.

This module provides high-level Python classes for OXM TLV fields in order to
make the OF 1.3 match fields easy to use and to code.
"""
from abc import ABC, abstractmethod

from pyof.foundation.basic_types import HWAddress, IPAddress
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId

from napps.kytos.of_core.v0x04.utils import bytes_to_mask, mask_to_bytes


class MatchField(ABC):
    """Base class for match fields. Abstract OXM TLVs of python-openflow.

    Just extend this class and you will be forced to define the required
    low-level attributes and methods below:

    * "name" attribute (field name to be displayed in JSON);
    * "oxm_field" attribute (``OxmOfbMatchField`` enum);
    * Method to return a pyof OxmTLV;
    * Method to create an instance from an OxmTLV.
    """

    def __init__(self, value):
        """Define match field value."""
        self.value = value

    # Abstract marker only: concrete subclasses override it with a plain
    # class attribute.
    @property
    @classmethod
    @abstractmethod
    def name(cls):
        """Define a name to be displayed in JSON.

        It can be overridden just by a class attribute.
        """

    # Abstract marker only: concrete subclasses override it with a plain
    # class attribute.
    @property
    @classmethod
    @abstractmethod
    def oxm_field(cls):
        """Define this subclass ``OxmOfbMatchField`` value.

        It can be overridden just by a class attribute.
        """

    @abstractmethod
    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""

    @classmethod
    @abstractmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""

    def __eq__(self, other):
        """Two objects are equal if their values are the same.

        The oxm_field equality is checked indirectly when comparing whether
        the objects are instances of the same class.
        """
        return isinstance(other, self.__class__) and other.value == self.value


class MatchDLVLAN(MatchField):
    """Match for datalink VLAN ID."""

    name = 'dl_vlan'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_VID

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        try:
            value = int(self.value)
            mask = None
            oxm_hasmask = False
        except ValueError:
            # Not a plain integer: expect a "vlan_id/mask" string.
            value, mask = map(int, self.value.split('/'))
            oxm_hasmask = True
        value = value | VlanId.OFPVID_PRESENT
        value_bytes = value.to_bytes(2, 'big')
        # Compare with None so that an explicit mask of 0 is still packed
        # whenever oxm_hasmask is set.
        if mask is not None:
            mask = mask | VlanId.OFPVID_PRESENT
            value_bytes += mask.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        vlan_id = int.from_bytes(tlv.oxm_value[:2], 'big') & 4095
        value = vlan_id
        if tlv.oxm_hasmask:
            vlan_mask = int.from_bytes(tlv.oxm_value[2:], 'big') & 4095
            value = f'{vlan_id}/{vlan_mask}'
        return cls(value)


class MatchDLVLANPCP(MatchField):
    """Match for VLAN Priority Code Point."""

    name = 'dl_vlan_pcp'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_PCP

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        priority = int.from_bytes(tlv.oxm_value, 'big')
        return cls(priority)


class MatchDLSrc(MatchField):
    """Match for datalink source."""

    name = 'dl_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


class MatchDLDst(MatchField):
    """Match for datalink destination."""

    name = 'dl_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


class MatchDLType(MatchField):
    """Match for datalink type."""

    name = 'dl_type'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_TYPE

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        dl_type = int.from_bytes(tlv.oxm_value, 'big')
        return cls(dl_type)


class MatchNwSrc(MatchField):
    """Match for IPV4 source."""

    name = 'nw_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip_addr = IPAddress(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 32:
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)


class MatchNwDst(MatchField):
    """Match for IPV4 destination."""

    name = 'nw_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip_addr = IPAddress(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 32:
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)


class MatchNwProto(MatchField):
    """Match for IP protocol."""

    name = 'nw_proto'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_PROTO

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        protocol = int.from_bytes(tlv.oxm_value, 'big')
        return cls(protocol)


class MatchInPort(MatchField):
    """Match for input port."""

    name = 'in_port'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IN_PORT

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(4, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchTCPSrc(MatchField):
    """Match for TCP source."""

    name = 'tp_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchTCPDst(MatchField):
    """Match for TCP destination."""

    name = 'tp_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchInPhyPort(MatchField):
    """Match for physical input port."""

    name = 'in_phy_port'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IN_PHY_PORT

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(4, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchIPDSCP(MatchField):
    """Match for IP DSCP."""

    name = 'ip_dscp'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_DSCP

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        value = int.from_bytes(tlv.oxm_value, 'big')
        return cls(value)


class MatchIPECN(MatchField):
    """Match for IP ECN."""

    name = 'ip_ecn'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_ECN

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        value = int.from_bytes(tlv.oxm_value, 'big')
        return cls(value)


class MatchUDPSrc(MatchField):
    """Match for UDP source."""

    name = 'udp_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_UDP_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchUDPDst(MatchField):
    """Match for UDP destination."""

    name = 'udp_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_UDP_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchICMPV4Type(MatchField):
    """Match for ICMPV4 type."""

    name = 'icmpv4_type'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ICMPV4_TYPE

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        icmp_type = int.from_bytes(tlv.oxm_value, 'big')
        return cls(icmp_type)


class MatchICMPV4Code(MatchField):
    """Match for ICMPV4 code."""

    name = 'icmpv4_code'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ICMPV4_CODE

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        icmp_code = int.from_bytes(tlv.oxm_value, 'big')
        return cls(icmp_code)


class MatchARPOP(MatchField):
    """Match for ARP opcode."""

    name = 'arp_op'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_OP

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        opcode = int.from_bytes(tlv.oxm_value, 'big')
        return cls(opcode)


class MatchIVP6FLabel(MatchField):
    """Match for IPV6 Flow Label."""

    name = 'ipv6_flabel'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV6_FLABEL

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        try:
            value = int(self.value)
            mask = None
            oxm_hasmask = False
        except ValueError:
            # Not a plain integer: expect a "value/mask" string.
            value, mask = map(int, self.value.split('/'))
            oxm_hasmask = True
        value_bytes = value.to_bytes(4, 'big')
        # Compare with None so that an explicit mask of 0 is still packed
        # whenever oxm_hasmask is set.
        if mask is not None:
            value_bytes += mask.to_bytes(4, 'big')
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        value = int.from_bytes(tlv.oxm_value[:4], 'big')
        if tlv.oxm_hasmask:
            flabel_mask = int.from_bytes(tlv.oxm_value[4:], 'big')
            value = f'{value}/{flabel_mask}'
        return cls(value)


class MatchICMPV6Type(MatchField):
    """Match for ICMPV6 type."""

    name = 'icmpv6_type'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ICMPV6_TYPE

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        icmp_type = int.from_bytes(tlv.oxm_value, 'big')
        return cls(icmp_type)


class MatchICMPV6Code(MatchField):
    """Match for ICMPV6 code."""

    name = 'icmpv6_code'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ICMPV6_CODE

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        icmp_code = int.from_bytes(tlv.oxm_value, 'big')
        return cls(icmp_code)


class MatchNDTarget(MatchField):
    """Match for IPV6 ND Target."""

    name = 'nd_tar'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV6_ND_TARGET

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(16, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        target = int.from_bytes(tlv.oxm_value, 'big')
        return cls(target)


class MatchNDSLL(MatchField):
    """Match for IPV6 ND SLL."""

    name = 'nd_sll'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV6_ND_SLL

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(6, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        sll = int.from_bytes(tlv.oxm_value, 'big')
        return cls(sll)


class MatchNDTLL(MatchField):
    """Match for IPV6 ND TLL."""

    name = 'nd_tll'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV6_ND_TLL

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(6, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        tll = int.from_bytes(tlv.oxm_value, 'big')
        return cls(tll)


class MatchPBBISID(MatchField):
    """Match for PBB ISID."""

    name = 'pbb_isid'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_PBB_ISID

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        try:
            value = int(self.value)
            mask = None
            oxm_hasmask = False
        except ValueError:
            # Not a plain integer: expect a "value/mask" string.
            value, mask = map(int, self.value.split('/'))
            oxm_hasmask = True
        value_bytes = value.to_bytes(3, 'big')
        # Compare with None so that an explicit mask of 0 is still packed
        # whenever oxm_hasmask is set.
        if mask is not None:
            value_bytes += mask.to_bytes(3, 'big')
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        value = int.from_bytes(tlv.oxm_value[:3], 'big')
        if tlv.oxm_hasmask:
            pbb_isid_mask = int.from_bytes(tlv.oxm_value[3:], 'big')
            value = f'{value}/{pbb_isid_mask}'
        return cls(value)


class MatchEXTHDR(MatchField):
    """Match for IPV6 EXTHDR."""

    name = 'v6_hdr'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV6_EXTHDR

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        try:
            value = int(self.value)
            mask = None
            oxm_hasmask = False
        except ValueError:
            # Not a plain integer: expect a "value/mask" string.
            value, mask = map(int, self.value.split('/'))
            oxm_hasmask = True
        value_bytes = value.to_bytes(2, 'big')
        # Compare with None so that an explicit mask of 0 is still packed
        # whenever oxm_hasmask is set.
        if mask is not None:
            value_bytes += mask.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        value = int.from_bytes(tlv.oxm_value[:2], 'big')
        if tlv.oxm_hasmask:
            exhead_mask = int.from_bytes(tlv.oxm_value[2:], 'big')
            value = f'{value}/{exhead_mask}'
        return cls(value)


class MatchFieldFactory(ABC):
    """Create the correct MatchField subclass instance.

    As OF 1.3 has many match fields and there are many ways to (un)pack their
    OxmTLV.oxm_value, this class does all the work of finding the correct
    MatchField class and instantiating the corresponding object.
    """

    __classes = {}

    @classmethod
    def from_name(cls, name, value):
        """Return the proper object from name and value."""
        field_class = cls._get_class(name)
        if field_class:
            return field_class(value)
        return None

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return the proper object from a pyof OXM TLV."""
        field_class = cls._get_class(tlv.oxm_field)
        if field_class:
            return field_class.from_of_tlv(tlv)
        return None

    @classmethod
    def _get_class(cls, name_or_field):
        """Return the proper class from field name or OxmTLV.oxm_field."""
        if not cls.__classes:
            cls._index_classes()
        return cls.__classes.get(name_or_field)

    @classmethod
    def _index_classes(cls):
        """Index the direct MatchField subclasses by name and oxm_field."""
        # __subclasses__() lists direct subclasses only.
        for subclass in MatchField.__subclasses__():
            cls.__classes[subclass.name] = subclass
            cls.__classes[subclass.oxm_field] = subclass
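
The factory's index is built from `MatchField.__subclasses__()`, which returns direct subclasses only. Should an intermediate base class ever be introduced (for example while extracting shared code), its children would silently drop out of the index. The sketch below shows one way to walk the whole hierarchy instead. `Base`, `Intermediate`, `Leaf`, `iter_subclasses` and `index_by_name` are hypothetical names used only for illustration, and only the name lookup is indexed here.

```python
class Base:
    """Stand-in for MatchField."""


class Intermediate(Base):
    """Stand-in for a hypothetical shared helper class with no field name."""


class Leaf(Intermediate):
    """Stand-in for a concrete match field."""

    name = 'leaf'


def iter_subclasses(cls):
    """Yield every descendant of cls, not just its direct subclasses."""
    for subclass in cls.__subclasses__():
        yield subclass
        yield from iter_subclasses(subclass)


def index_by_name(base):
    """Map field names to classes, skipping helpers without a string name."""
    return {sub.name: sub for sub in iter_subclasses(base)
            if isinstance(getattr(sub, 'name', None), str)}
```

Here `Base.__subclasses__()` contains only `Intermediate`, while `index_by_name(Base)` still finds `Leaf` through the recursive walk.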