Test Failed
Pull Request — master (#86)
by
unknown
02:04
created

build.v0x04.match_fields   F

Complexity

Total Complexity 99

Size/Duplication

Total Lines 741
Duplicated Lines 50.74 %

Test Coverage

Coverage 52.41%

Importance

Changes 0

Metric   Value
wmc      99
eloc     480
dl       376
loc      741
ccs      76
cts      145
cp       0.5241
rs       2
c        0
b        0
f        0
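
The two percentages reported above can be re-derived from the raw counts in the metric table. The sketch below assumes that ccs and cts are the covered and total coverable statements and that dl and loc are the duplicated and total lines. The report does not spell these abbreviations out, but the numbers are consistent with that reading.

```python
# Raw counts copied from the metric table above.
# Assumption: ccs/cts = covered/total statements, dl/loc = duplicated/total lines.
metrics = {"ccs": 76, "cts": 145, "dl": 376, "loc": 741}


def ratio(part: int, whole: int) -> float:
    """Return part/whole rounded to four decimal places."""
    return round(part / whole, 4)


coverage = ratio(metrics["ccs"], metrics["cts"])     # matches "cp 0.5241"
duplication = ratio(metrics["dl"], metrics["loc"])   # matches "50.74 %"

print(f"coverage:    {coverage:.2%}")
print(f"duplication: {duplication:.2%}")
```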

58 Methods

Rating   Name   Duplication   Size   Complexity  
A MatchField.oxm_field() 0 5 1
A MatchField.__init__() 0 3 1
A MatchTCPDst.as_of_tlv() 0 4 1
A MatchField.from_of_tlv() 0 4 1
A MatchTCPSrc.from_of_tlv() 0 5 1
A MatchField.__eq__() 0 7 1
A MatchInPort.from_of_tlv() 0 5 1
A MatchTCPSrc.as_of_tlv() 0 4 1
A MatchField.as_of_tlv() 0 3 1
A MatchNwDst.from_of_tlv() 10 10 2
A MatchDLVLAN.from_of_tlv() 0 9 2
A MatchDLDst.as_of_tlv() 20 20 4
A MatchNwDst.as_of_tlv() 9 9 2
A MatchDLVLAN.as_of_tlv() 0 17 3
A MatchNwProto.as_of_tlv() 0 4 1
A MatchDLVLANPCP.from_of_tlv() 0 5 1
A MatchTCPDst.from_of_tlv() 0 5 1
A MatchNwSrc.from_of_tlv() 10 10 2
A MatchDLType.from_of_tlv() 0 5 1
A MatchInPort.as_of_tlv() 0 4 1
A MatchField.name() 0 5 1
A MatchDLVLANPCP.as_of_tlv() 0 4 1
A MatchNwProto.from_of_tlv() 0 5 1
A MatchDLType.as_of_tlv() 0 4 1
A MatchNwSrc.as_of_tlv() 9 9 2
A MatchDLSrc.as_of_tlv() 20 20 4
A MatchTunnelID.from_of_tlv() 8 8 2
A MatchFieldFactory._index_classes() 0 5 2
A MatchMPLSLabel.as_of_tlv() 0 4 1
A MatchMPLSTC.from_of_tlv() 0 5 1
A MatchMPLSTC.as_of_tlv() 0 4 1
A MatchARPTHA.from_of_tlv() 13 13 2
A MatchIPV6Dst.as_of_tlv() 9 9 2
A MatchARPTPA.from_of_tlv() 10 10 2
A MatchSCTPDst.from_of_tlv() 0 5 1
A MatchFieldFactory.from_name() 0 7 2
A MatchMetadata.as_of_tlv() 15 15 3
A MatchARPSHA.from_of_tlv() 13 13 2
A MatchSCTPSrc.from_of_tlv() 0 5 1
A MatchIPV6Src.as_of_tlv() 9 9 2
A MatchSCTPDst.as_of_tlv() 0 4 1
A MatchIPV6Src.from_of_tlv() 10 10 2
A MatchIPV6Dst.from_of_tlv() 10 10 2
A MatchMPLSBOS.from_of_tlv() 0 5 1
A MatchFieldFactory.from_of_tlv() 0 7 2
A MatchARPTHA.as_of_tlv() 20 20 4
A MatchMetadata.from_of_tlv() 8 8 2
A MatchDLSrc.from_of_tlv() 13 13 2
A MatchDLDst.from_of_tlv() 13 13 2
A MatchARPSHA.as_of_tlv() 20 20 4
A MatchTunnelID.as_of_tlv() 15 15 3
A MatchARPSPA.from_of_tlv() 10 10 2
A MatchARPSPA.as_of_tlv() 9 9 2
A MatchSCTPSrc.as_of_tlv() 0 4 1
A MatchFieldFactory._get_class() 0 6 2
A MatchARPTPA.as_of_tlv() 9 9 2
A MatchMPLSLabel.from_of_tlv() 0 5 1
A MatchMPLSBOS.as_of_tlv() 0 4 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:
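
In this module, the clearest case is the group of hardware-address fields (MatchDLSrc, MatchDLDst, MatchARPSHA, MatchARPTHA), whose as_of_tlv() and from_of_tlv() bodies are identical apart from name and oxm_field. A minimal sketch of pulling that logic into one shared base class follows. FakeTLV, mac_to_bytes, bytes_to_mac and the shortened class names are hypothetical stand-ins so that the example runs without pyof. It illustrates the refactoring and is not the project's actual fix.

```python
from dataclasses import dataclass


@dataclass
class FakeTLV:
    """Hypothetical stand-in for pyof's OxmTLV, just enough for this sketch."""
    oxm_field: str
    oxm_value: bytes
    oxm_hasmask: bool = False


def mac_to_bytes(mac: str) -> bytes:
    """Pack 'aa:bb:cc:dd:ee:ff' into 6 bytes (stand-in for HWAddress.pack)."""
    return bytes.fromhex(mac.replace(':', ''))


def bytes_to_mac(raw: bytes) -> str:
    """Unpack the first 6 bytes into 'aa:bb:cc:dd:ee:ff'."""
    return ':'.join(f'{byte:02x}' for byte in raw[:6])


class MaskedMACField:
    """Shared logic; subclasses only declare ``name`` and ``oxm_field``."""

    name = None
    oxm_field = None

    def __init__(self, value):
        self.value = value

    def as_of_tlv(self):
        value, _, mask = self.value.partition('/')
        # An all-ones mask is an exact match, so it is dropped (as in the
        # reviewed code).
        has_mask = bool(mask) and mask.upper() != 'FF:FF:FF:FF:FF:FF'
        raw = mac_to_bytes(value)
        if has_mask:
            raw += mac_to_bytes(mask)
        return FakeTLV(self.oxm_field, raw, has_mask)

    @classmethod
    def from_of_tlv(cls, tlv):
        value = bytes_to_mac(tlv.oxm_value)
        if tlv.oxm_hasmask:
            value = f'{value}/{bytes_to_mac(tlv.oxm_value[6:])}'
        return cls(value)


class DLSrc(MaskedMACField):
    name = 'dl_src'
    oxm_field = 'ETH_SRC'


class DLDst(MaskedMACField):
    name = 'dl_dst'
    oxm_field = 'ETH_DST'


tlv = DLSrc('00:11:22:33:44:55/ff:ff:ff:00:00:00').as_of_tlv()
roundtrip = DLSrc.from_of_tlv(tlv).value
```

Each concrete field shrinks to two class attributes, and a fix to the mask handling is then made in one place instead of four.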

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.v0x04.match_fields often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
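
As an illustration of Extract Subclass on this module, the fixed-width integer fields (for example MatchInPort, MatchTCPSrc and MatchNwProto) differ only in the number of bytes they pack. A rough sketch, assuming a hypothetical IntField base with a size attribute and a FakeTLV stand-in for pyof's OxmTLV (neither exists in the reviewed code):

```python
from dataclasses import dataclass


@dataclass
class FakeTLV:
    """Hypothetical stand-in for pyof's OxmTLV."""
    oxm_field: str
    oxm_value: bytes


class IntField:
    """Extracted component: a big-endian unsigned integer of ``size`` bytes."""

    name = None
    oxm_field = None
    size = None  # width in bytes, set by each subclass

    def __init__(self, value):
        self.value = value

    def as_of_tlv(self):
        return FakeTLV(self.oxm_field, self.value.to_bytes(self.size, 'big'))

    @classmethod
    def from_of_tlv(cls, tlv):
        return cls(int.from_bytes(tlv.oxm_value, 'big'))


class InPort(IntField):
    name, oxm_field, size = 'in_port', 'IN_PORT', 4


class TCPSrc(IntField):
    name, oxm_field, size = 'tp_src', 'TCP_SRC', 2


class NwProto(IntField):
    name, oxm_field, size = 'nw_proto', 'IP_PROTO', 1


packed = TCPSrc(443).as_of_tlv().oxm_value
restored = InPort.from_of_tlv(InPort(7).as_of_tlv()).value
```

The byte widths (4, 2 and 1) are the ones used by the corresponding classes in the listing below.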

"""OpenFlow 1.3 OXM match fields.

Flow's match is very different from OF 1.0. Instead of always having all
fields, there's a variable list of match fields and each one is an OpenFlow
eXtended Match Type-Length-Value (OXM TLV) element.

This module provides high-level Python classes for OXM TLV fields in order to
make the OF 1.3 match fields easy to use and to code.
"""
from abc import ABC, abstractmethod

from pyof.foundation.basic_types import HWAddress, IPAddress
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId

from napps.kytos.of_core.v0x04.utils import bytes_to_mask, mask_to_bytes


class MatchField(ABC):
    """Base class for match fields. Abstract OXM TLVs of python-openflow.

    Just extend this class and you will be forced to define the required
    low-level attributes and methods below:

    * "name" attribute (field name to be displayed in JSON);
    * "oxm_field" attribute (``OxmOfbMatchField`` enum);
    * Method to return a pyof OxmTLV;
    * Method to create an instance from an OxmTLV.
    """

    def __init__(self, value):
        """Define match field value."""
        self.value = value

    @property
    @classmethod
    @abstractmethod
    def name(cls):
        """Define a name to be displayed in JSON.

        It can be overridden just by a class attribute.
        """

    @property
    @classmethod
    @abstractmethod
    def oxm_field(cls):
        """Define this subclass ``OxmOfbMatchField`` value.

        It can be overridden just by a class attribute.
        """

    @abstractmethod
    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""

    @classmethod
    @abstractmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""

    def __eq__(self, other):
        """Two objects are equal if their values are the same.

        The oxm_field equality is checked indirectly when comparing whether
        the objects are instances of the same class.
        """
        return isinstance(other, self.__class__) and other.value == self.value
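
The stacked property, classmethod and abstractmethod decorators above are what force subclasses to define name and oxm_field. The following cut-down sketch shows the effect in isolation. Field, WithName and WithoutName are hypothetical names used only for this illustration.

```python
from abc import ABC, abstractmethod


class Field(ABC):
    """Cut-down stand-in for MatchField with one abstract attribute."""

    @property
    @classmethod
    @abstractmethod
    def name(cls):
        """Must be overridden, typically by a plain class attribute."""


class WithName(Field):
    name = 'in_port'  # a class attribute is enough to satisfy the ABC


class WithoutName(Field):
    pass  # inherits the abstract ``name``


field = WithName()  # instantiation works

try:
    WithoutName()
    error = None
except TypeError as exc:  # abstract attribute still missing
    error = exc
```

Because the abstract marker propagates through the property wrapper, the ABC machinery lists name as abstract until a subclass replaces it with an ordinary value.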


class MatchDLVLAN(MatchField):
    """Match for datalink VLAN ID."""

    name = 'dl_vlan'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_VID

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        try:
            value = int(self.value)
            mask = None
            oxm_hasmask = False
        except ValueError:
            value, mask = map(int, self.value.split('/'))
            oxm_hasmask = True
        value = value | VlanId.OFPVID_PRESENT
        value_bytes = value.to_bytes(2, 'big')
        if mask:
            mask = mask | VlanId.OFPVID_PRESENT
            value_bytes += mask.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        vlan_id = int.from_bytes(tlv.oxm_value[:2], 'big') & 4095
        value = vlan_id
        if tlv.oxm_hasmask:
            vlan_mask = int.from_bytes(tlv.oxm_value[2:], 'big') & 4095
            value = f'{vlan_id}/{vlan_mask}'
        return cls(value)
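
To make the byte layout produced by MatchDLVLAN concrete, here is a standalone sketch of the same value/mask encoding without pyof. pack_vlan and unpack_vlan are hypothetical helper names, and OFPVID_PRESENT is written out as 0x1000, its value in the OpenFlow 1.3 specification. One deliberate difference from the class above: the sketch tests the mask against None rather than for truthiness, because with "if mask:" a mask of 0 would set oxm_hasmask without appending any mask bytes.

```python
OFPVID_PRESENT = 0x1000  # bit 12 marks "a VLAN tag is present" in OF 1.3


def pack_vlan(value):
    """Mimic MatchDLVLAN.as_of_tlv(); return (oxm_value bytes, oxm_hasmask)."""
    try:
        vlan, mask = int(value), None
    except ValueError:
        vlan, mask = map(int, value.split('/'))
    raw = (vlan | OFPVID_PRESENT).to_bytes(2, 'big')
    # Assumption: compare with None so that a mask of 0 still gets its
    # two mask bytes appended.
    if mask is not None:
        raw += (mask | OFPVID_PRESENT).to_bytes(2, 'big')
    return raw, mask is not None


def unpack_vlan(raw, has_mask):
    """Mimic MatchDLVLAN.from_of_tlv(); keep only the 12-bit VLAN ID."""
    vlan = int.from_bytes(raw[:2], 'big') & 0x0FFF
    if has_mask:
        return f"{vlan}/{int.from_bytes(raw[2:], 'big') & 0x0FFF}"
    return vlan


plain = pack_vlan(100)            # VLAN 100, exact match
masked = pack_vlan('100/4095')    # VLAN 100 with a full 12-bit mask
zero_mask = pack_vlan('100/0')    # edge case: mask of 0
```

VLAN 100 becomes 0x1064 on the wire (0x0064 with the presence bit set), and unpack_vlan strips that bit again with the 0x0FFF mask.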


class MatchDLVLANPCP(MatchField):
    """Match for VLAN Priority Code Point."""

    name = 'dl_vlan_pcp'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_PCP

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        priority = int.from_bytes(tlv.oxm_value, 'big')
        return cls(priority)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchDLSrc(MatchField):
    """Match for datalink source."""

    name = 'dl_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchDLDst(MatchField):
    """Match for datalink destination."""

    name = 'dl_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


class MatchDLType(MatchField):
    """Match for datalink type."""

    name = 'dl_type'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_TYPE

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        dl_type = int.from_bytes(tlv.oxm_value, 'big')
        return cls(dl_type)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchNwSrc(MatchField):
    """Match for IPV4 source."""

    name = 'nw_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip_addr = IPAddress(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 32:
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)
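
The IP fields rely on mask_to_bytes and bytes_to_mask from napps.kytos.of_core.v0x04.utils, which are not part of this listing. Judging only from how they are called here, they convert between a prefix length and raw netmask bytes. The sketch below shows one way such a conversion could look. prefix_to_mask_bytes and mask_bytes_to_prefix are hypothetical names, and the real helpers may be implemented differently.

```python
def prefix_to_mask_bytes(prefix_len: int, size_bits: int) -> bytes:
    """Return a netmask with ``prefix_len`` leading one-bits, as raw bytes."""
    if not 0 <= prefix_len <= size_bits:
        raise ValueError('prefix length out of range')
    mask = ((1 << prefix_len) - 1) << (size_bits - prefix_len)
    return mask.to_bytes(size_bits // 8, 'big')


def mask_bytes_to_prefix(raw: bytes) -> int:
    """Count the leading one-bits of a contiguous netmask."""
    bits = ''.join(f'{byte:08b}' for byte in raw)
    return len(bits) - len(bits.lstrip('1'))


ipv4_mask = prefix_to_mask_bytes(24, 32)    # a /24 on a 32-bit address
ipv6_mask = prefix_to_mask_bytes(64, 128)   # a /64 on a 128-bit address
```

The same pair of functions covers both the 32-bit and the 128-bit callers, which is why the size is passed as an argument.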


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchNwDst(MatchField):
    """Match for IPV4 destination."""

    name = 'nw_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip_addr = IPAddress(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 32:
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)


class MatchNwProto(MatchField):
    """Match for IP protocol."""

    name = 'nw_proto'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_PROTO

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        proto = int.from_bytes(tlv.oxm_value, 'big')
        return cls(proto)


class MatchInPort(MatchField):
    """Match for input port."""

    name = 'in_port'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IN_PORT

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(4, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchTCPSrc(MatchField):
    """Match for TCP source."""

    name = 'tp_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchTCPDst(MatchField):
    """Match for TCP destination."""

    name = 'tp_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchSCTPSrc(MatchField):
    """Match for SCTP source."""

    name = 'sctp_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_SCTP_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchSCTPDst(MatchField):
    """Match for SCTP destination."""

    name = 'sctp_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_SCTP_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchARPSPA(MatchField):
    """Match for ARP Sender IP Address."""

    name = 'arp_spa'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_SPA

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip_addr = IPAddress(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 32:
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchARPTPA(MatchField):
    """Match for ARP Target IP Address."""

    name = 'arp_tpa'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_TPA

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip_addr = IPAddress(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 32:
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchARPSHA(MatchField):
    """Match for ARP Sender MAC Address."""

    name = 'arp_sha'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_SHA

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchARPTHA(MatchField):
    """Match for ARP Target MAC Address."""

    name = 'arp_tha'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_THA

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchIPV6Src(MatchField):
    """Match for IPV6 source."""

    name = 'ipv6_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV6_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        # Review issue (Comprehensibility, Best Practice): The variable
        # IPV6Address does not seem to be defined.
        ip_addr = IPV6Address(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 128:
            value_bytes += mask_to_bytes(ip_addr.netmask, 128)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 128,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        # Review issue (Comprehensibility, Best Practice): The variable
        # IPV6Address does not seem to be defined.
        ip_address = IPV6Address()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[16:], 128)}'
        return cls(value)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchIPV6Dst(MatchField):
    """Match for IPV6 destination."""

    name = 'ipv6_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV6_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        # Review issue (Comprehensibility, Best Practice): The variable
        # IPV6Address does not seem to be defined.
        ip_addr = IPV6Address(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 128:
            value_bytes += mask_to_bytes(ip_addr.netmask, 128)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 128,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        # Review issue (Comprehensibility, Best Practice): The variable
        # IPV6Address does not seem to be defined.
        ip_address = IPV6Address()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[16:], 128)}'
        return cls(value)


class MatchMPLSLabel(MatchField):
    """Match for MPLS Label."""

    name = 'mpls_lab'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_MPLS_LABEL

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(4, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        lab = int.from_bytes(tlv.oxm_value, 'big')
        return cls(lab)


class MatchMPLSTC(MatchField):
    """Match for MPLS TC."""

    name = 'mpls_tc'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_MPLS_TC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        tc = int.from_bytes(tlv.oxm_value, 'big')
        return cls(tc)


class MatchMPLSBOS(MatchField):
    """Match for MPLS BOS."""

    name = 'mpls_bos'
    oxm_field = OxmOfbMatchField.OFPXMT_OFP_MPLS_BOS

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        bos = int.from_bytes(tlv.oxm_value, 'big')
        return cls(bos)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchMetadata(MatchField):
    """Match for table metadata."""

    name = 'metadata'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_METADATA

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        try:
            value = int(self.value)
            mask = None
            oxm_hasmask = False
        except ValueError:
            value, mask = map(int, self.value.split('/'))
            oxm_hasmask = True
        value_bytes = value.to_bytes(8, 'big')
        if mask:
            value_bytes += mask.to_bytes(8, 'big')
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        value = int.from_bytes(tlv.oxm_value[:8], 'big')
        if tlv.oxm_hasmask:
            metadata_mask = int.from_bytes(tlv.oxm_value[8:], 'big')
            value = f'{value}/{metadata_mask}'
        return cls(value)


# Review issue (Duplication): This code seems to be duplicated in your project.
class MatchTunnelID(MatchField):
    """Match for tunnel id."""

    name = 'tun_id'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TUNNEL_ID

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        try:
            value = int(self.value)
            mask = None
            oxm_hasmask = False
        except ValueError:
            value, mask = map(int, self.value.split('/'))
            oxm_hasmask = True
        value_bytes = value.to_bytes(8, 'big')
        if mask:
            value_bytes += mask.to_bytes(8, 'big')
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        value = int.from_bytes(tlv.oxm_value[:8], 'big')
        if tlv.oxm_hasmask:
            tunnel_mask = int.from_bytes(tlv.oxm_value[8:], 'big')
            value = f'{value}/{tunnel_mask}'
        return cls(value)


class MatchFieldFactory(ABC):
    """Create the correct MatchField subclass instance.

    As OF 1.3 has many match fields and there are many ways to (un)pack their
    OxmTLV.oxm_value, this class does all the work of finding the correct
    MatchField class and instantiating the corresponding object.
    """

    __classes = {}

    @classmethod
    def from_name(cls, name, value):
        """Return the proper object from name and value."""
        field_class = cls._get_class(name)
        if field_class:
            return field_class(value)
        return None

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return the proper object from a pyof OXM TLV."""
        field_class = cls._get_class(tlv.oxm_field)
        if field_class:
            return field_class.from_of_tlv(tlv)
        return None

    @classmethod
    def _get_class(cls, name_or_field):
        """Return the proper class from field name or OxmTLV.oxm_field."""
        if not cls.__classes:
            cls._index_classes()
        return cls.__classes.get(name_or_field)

    @classmethod
    def _index_classes(cls):
        """Index direct MatchField subclasses by name and by oxm_field."""
        for subclass in MatchField.__subclasses__():
            cls.__classes[subclass.name] = subclass
            cls.__classes[subclass.oxm_field] = subclass
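
One consequence of the factory's indexing is worth keeping in mind when removing the duplication reported above: __subclasses__() returns direct subclasses only. If shared behaviour is moved into an intermediate base class, the concrete fields below it would no longer be indexed. A small sketch of the effect and of a recursive walk that avoids it follows. Base, Masked, DLSrc, InPort and all_subclasses are hypothetical names for illustration only.

```python
class Base:
    name = None


class Masked(Base):  # hypothetical intermediate base class
    pass


class DLSrc(Masked):
    name = 'dl_src'


class InPort(Base):
    name = 'in_port'


def all_subclasses(cls):
    """Yield every descendant of ``cls``, not only its direct children."""
    for sub in cls.__subclasses__():
        yield sub
        yield from all_subclasses(sub)


# Direct children only: DLSrc is missing, Masked is present.
direct = {sub.__name__ for sub in Base.__subclasses__()}

# Recursive walk, skipping helper classes that define no name.
index = {sub.name: sub for sub in all_subclasses(Base) if sub.name}
```

Skipping classes without a name keeps helper bases such as Masked out of the index, so only concrete fields can be looked up.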