Test Failed
Pull Request — master (#89)
by Humberto
02:20
created

build.v0x04.match_fields.MatchField.oxm_field()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 5
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""OpenFlow 1.3 OXM match fields.
2
3
Flow's match is very different from OF 1.0. Instead of always having all
4
fields, there's a variable list of match fields and each one is an Openflow
5
eXtended Match Type-Length-Value (OXM TLV) element.
6
7
This module provides high-level Python classes for OXM TLV fields in order to
8
make the OF 1.3 match fields easy to use and to be coded.
9
"""
10 1
11
from pyof.foundation.basic_types import HWAddress, IPAddress
12 1
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId
13 1
14
# pylint: disable=unused-import
15 1
from napps.kytos.of_core.v0x04.match_fields_base import (
16
    MatchField, MatchFieldFactory)
17
from napps.kytos.of_core.v0x04.match_fields_ipv6 import (
18 1
    MatchIPv6Src, MatchIPv6Dst, MatchIPv6FLabel, MatchICMPV6Type,
19
    MatchICMPV6Code, MatchNDTarget, MatchNDSLL, MatchNDTLL,
20
    MatchEXTHDR)
21
# pylint: enable=unused-import
22
from napps.kytos.of_core.v0x04.utils import bytes_to_mask, mask_to_bytes
23
24
25
class MatchDLVLAN(MatchField):
26
    """Match for datalink VLAN ID."""
27
28
    name = 'dl_vlan'
29
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_VID
30 1
31
    def as_of_tlv(self):
32 1
        """Return a pyof OXM TLV instance."""
33
        try:
34 1
            value = int(self.value)
35 1
            mask = None
36 1
            oxm_hasmask = False
37
        except ValueError:
38
            value, mask = map(int, self.value.split('/'))
39
            oxm_hasmask = True
40
        value = value | VlanId.OFPVID_PRESENT
41
        value_bytes = value.to_bytes(2, 'big')
42
        if mask:
43 1
            mask = mask | VlanId.OFPVID_PRESENT
44 1
            value_bytes += mask.to_bytes(2, 'big')
45 1
        return OxmTLV(oxm_field=self.oxm_field,
46
                      oxm_hasmask=oxm_hasmask,
47
                      oxm_value=value_bytes)
48
49
    @classmethod
50
    def from_of_tlv(cls, tlv):
51
        """Return an instance from a pyof OXM TLV."""
52 1
        vlan_id = int.from_bytes(tlv.oxm_value[:2], 'big') & 4095
53
        value = vlan_id
54
        if tlv.oxm_hasmask:
55
            vlan_mask = int.from_bytes(tlv.oxm_value[2:], 'big') & 4095
56 1
            value = f'{vlan_id}/{vlan_mask}'
57 1
        return cls(value)
58
59
60
class MatchDLVLANPCP(MatchField):
61 1
    """Match for VLAN Priority Code Point."""
62
63
    name = 'dl_vlan_pcp'
64
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_PCP
65
66
    def as_of_tlv(self):
67
        """Return a pyof OXM TLV instance."""
68
        value_bytes = self.value.to_bytes(1, 'big')
69
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
70 1
71
    @classmethod
72
    def from_of_tlv(cls, tlv):
73 1
        """Return an instance from a pyof OXM TLV."""
74 1
        priority = int.from_bytes(tlv.oxm_value, 'big')
75
        return cls(priority)
76 1
77
78 View Code Duplication
class MatchDLSrc(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
79
    """Match for datalink source."""
80
81
    name = 'dl_src'
82
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_SRC
83
84
    def as_of_tlv(self):
85
        """Return a pyof OXM TLV instance."""
86
        if '/' in self.value:
87
            value, mask = self.value.split('/')
88
            mask = mask.upper()
89
            if mask == 'FF:FF:FF:FF:FF:FF':
90
                mask = None
91
                oxm_hasmask = False
92
            else:
93
                oxm_hasmask = True
94 1
        else:
95
            value = self.value
96
            mask = None
97
            oxm_hasmask = False
98
        value_bytes = HWAddress(value).pack()
99
        if mask:
100
            value_bytes += HWAddress(mask).pack()
101
        return OxmTLV(oxm_field=self.oxm_field,
102
                      oxm_hasmask=oxm_hasmask,
103
                      oxm_value=value_bytes)
104
105 1
    @classmethod
106
    def from_of_tlv(cls, tlv):
107
        """Return an instance from a pyof OXM TLV."""
108 1
        hw_address = HWAddress()
109 1
        hw_address.unpack(tlv.oxm_value)
110
        addr_str = str(hw_address)
111 1
        value = addr_str
112
        if tlv.oxm_hasmask:
113
            hw_mask = HWAddress()
114
            hw_mask.unpack(tlv.oxm_value[6:])
115
            mask_str = str(hw_mask)
116 1
            value = f'{addr_str}/{mask_str}'
117
        return cls(value)
118
119
120 View Code Duplication
class MatchDLDst(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
121
    """Match for datalink destination."""
122
123 1
    name = 'dl_dst'
124
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_DST
125
126 1
    def as_of_tlv(self):
127 1
        """Return a pyof OXM TLV instance."""
128
        if '/' in self.value:
129 1
            value, mask = self.value.split('/')
130
            mask = mask.upper()
131 1
            if mask == 'FF:FF:FF:FF:FF:FF':
132
                mask = None
133
                oxm_hasmask = False
134
            else:
135
                oxm_hasmask = True
136
        else:
137
            value = self.value
138
            mask = None
139
            oxm_hasmask = False
140 1
        value_bytes = HWAddress(value).pack()
141 1
        if mask:
142 1
            value_bytes += HWAddress(mask).pack()
143 1
        return OxmTLV(oxm_field=self.oxm_field,
144 1
                      oxm_hasmask=oxm_hasmask,
145
                      oxm_value=value_bytes)
146 1
147
    @classmethod
148
    def from_of_tlv(cls, tlv):
149
        """Return an instance from a pyof OXM TLV."""
150 1
        hw_address = HWAddress()
151
        hw_address.unpack(tlv.oxm_value)
152
        addr_str = str(hw_address)
153
        value = addr_str
154
        if tlv.oxm_hasmask:
155
            hw_mask = HWAddress()
156
            hw_mask.unpack(tlv.oxm_value[6:])
157
            mask_str = str(hw_mask)
158
            value = f'{addr_str}/{mask_str}'
159
        return cls(value)
160
161
162
class MatchDLType(MatchField):
163
    """Match for datalink type."""
164
165 1
    name = 'dl_type'
166
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_TYPE
167
168 1
    def as_of_tlv(self):
169 1
        """Return a pyof OXM TLV instance."""
170
        value_bytes = self.value.to_bytes(2, 'big')
171 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
172
173
    @classmethod
174
    def from_of_tlv(cls, tlv):
175
        """Return an instance from a pyof OXM TLV."""
176
        port = int.from_bytes(tlv.oxm_value, 'big')
177
        return cls(port)
178
179
180 View Code Duplication
class MatchNwSrc(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
181
    """Match for IPV4 source."""
182
183
    name = 'nw_src'
184
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_SRC
185
186
    def as_of_tlv(self):
187
        """Return a pyof OXM TLV instance."""
188
        ip_addr = IPAddress(self.value)
189
        value_bytes = ip_addr.pack()
190
        if ip_addr.netmask < 32:
191
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
192 1
        return OxmTLV(oxm_field=self.oxm_field,
193
                      oxm_hasmask=ip_addr.netmask < 32,
194
                      oxm_value=value_bytes)
195
196
    @classmethod
197
    def from_of_tlv(cls, tlv):
198
        """Return an instance from a pyof OXM TLV."""
199
        ip_address = IPAddress()
200
        ip_address.unpack(tlv.oxm_value)
201
        addr_str = str(ip_address)
202
        value = addr_str
203
        if tlv.oxm_hasmask:
204
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
205
        return cls(value)
206
207 1
208 View Code Duplication
class MatchNwDst(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
209
    """Match for IPV4 destination."""
210 1
211 1
    name = 'nw_dst'
212
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_DST
213 1
214
    def as_of_tlv(self):
215
        """Return a pyof OXM TLV instance."""
216
        ip_addr = IPAddress(self.value)
217
        value_bytes = ip_addr.pack()
218 1
        if ip_addr.netmask < 32:
219
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
220
        return OxmTLV(oxm_field=self.oxm_field,
221
                      oxm_hasmask=ip_addr.netmask < 32,
222
                      oxm_value=value_bytes)
223
224
    @classmethod
225 1
    def from_of_tlv(cls, tlv):
226
        """Return an instance from a pyof OXM TLV."""
227
        ip_address = IPAddress()
228 1
        ip_address.unpack(tlv.oxm_value)
229 1
        addr_str = str(ip_address)
230
        value = addr_str
231 1
        if tlv.oxm_hasmask:
232
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
233
        return cls(value)
234
235
236
class MatchNwProto(MatchField):
237
    """Match for IP protocol."""
238
239
    name = 'nw_proto'
240
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_PROTO
241 1
242
    def as_of_tlv(self):
243
        """Return a pyof OXM TLV instance."""
244
        value_bytes = self.value.to_bytes(1, 'big')
245
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
246
247
    @classmethod
248
    def from_of_tlv(cls, tlv):
249
        """Return an instance from a pyof OXM TLV."""
250
        priority = int.from_bytes(tlv.oxm_value, 'big')
251
        return cls(priority)
252
253 1
254
class MatchInPort(MatchField):
255
    """Match for input port."""
256 1
257 1
    name = 'in_port'
258
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IN_PORT
259 1
260
    def as_of_tlv(self):
261
        """Return a pyof OXM TLV instance."""
262
        value_bytes = self.value.to_bytes(4, 'big')
263
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
264
265
    @classmethod
266
    def from_of_tlv(cls, tlv):
267
        """Return an instance from a pyof OXM TLV."""
268
        port = int.from_bytes(tlv.oxm_value, 'big')
269 1
        return cls(port)
270
271
272
class MatchTCPSrc(MatchField):
273
    """Match for TCP source."""
274
275
    name = 'tp_src'
276
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_SRC
277
278
    def as_of_tlv(self):
279
        """Return a pyof OXM TLV instance."""
280
        value_bytes = self.value.to_bytes(2, 'big')
281 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
282
283
    @classmethod
284 1
    def from_of_tlv(cls, tlv):
285 1
        """Return an instance from a pyof OXM TLV."""
286
        port = int.from_bytes(tlv.oxm_value, 'big')
287 1
        return cls(port)
288
289
290
class MatchTCPDst(MatchField):
291
    """Match for TCP destination."""
292 1
293
    name = 'tp_dst'
294
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_DST
295
296
    def as_of_tlv(self):
297
        """Return a pyof OXM TLV instance."""
298
        value_bytes = self.value.to_bytes(2, 'big')
299 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
300
301
    @classmethod
302 1
    def from_of_tlv(cls, tlv):
303 1
        """Return an instance from a pyof OXM TLV."""
304
        port = int.from_bytes(tlv.oxm_value, 'big')
305 1
        return cls(port)
306
307
308
class MatchInPhyPort(MatchField):
309
    """Match for physical input port."""
310 1
311
    name = 'in_phy_port'
312
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IN_PHY_PORT
313
314
    def as_of_tlv(self):
315
        """Return a pyof OXM TLV instance."""
316
        value_bytes = self.value.to_bytes(4, 'big')
317 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
318
319
    @classmethod
320 1
    def from_of_tlv(cls, tlv):
321 1
        """Return an instance from a pyof OXM TLV."""
322
        port = int.from_bytes(tlv.oxm_value, 'big')
323 1
        return cls(port)
324
325
326
class MatchIPDSCP(MatchField):
327
    """Match for IP DSCP."""
328 1
329
    name = 'ip_dscp'
330
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_DSCP
331
332
    def as_of_tlv(self):
333
        """Return a pyof OXM TLV instance."""
334
        value_bytes = self.value.to_bytes(1, 'big')
335 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
336
337
    @classmethod
338 1
    def from_of_tlv(cls, tlv):
339 1
        """Return an instance from a pyof OXM TLV."""
340
        value = int.from_bytes(tlv.oxm_value, 'big')
341 1
        return cls(value)
342
343
344
class MatchIPECN(MatchField):
345
    """Match for IP ECN."""
346 1
347
    name = 'ip_ecn'
348
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_ECN
349
350
    def as_of_tlv(self):
351
        """Return a pyof OXM TLV instance."""
352
        value_bytes = self.value.to_bytes(1, 'big')
353 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
354
355
    @classmethod
356 1
    def from_of_tlv(cls, tlv):
357 1
        """Return an instance from a pyof OXM TLV."""
358
        value = int.from_bytes(tlv.oxm_value, 'big')
359 1
        return cls(value)
360
361
362
class MatchUDPSrc(MatchField):
363
    """Match for UDP source."""
364 1
365
    name = 'udp_src'
366
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_UDP_SRC
367
368
    def as_of_tlv(self):
369
        """Return a pyof OXM TLV instance."""
370
        value_bytes = self.value.to_bytes(2, 'big')
371 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
372
373
    @classmethod
374 1
    def from_of_tlv(cls, tlv):
375 1
        """Return an instance from a pyof OXM TLV."""
376
        port = int.from_bytes(tlv.oxm_value, 'big')
377 1
        return cls(port)
378
379
380
class MatchUDPDst(MatchField):
381
    """Match for UDP destination."""
382 1
383
    name = 'udp_dst'
384
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_UDP_DST
385
386
    def as_of_tlv(self):
387
        """Return a pyof OXM TLV instance."""
388
        value_bytes = self.value.to_bytes(2, 'big')
389 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
390
391
    @classmethod
392 1
    def from_of_tlv(cls, tlv):
393 1
        """Return an instance from a pyof OXM TLV."""
394
        port = int.from_bytes(tlv.oxm_value, 'big')
395 1
        return cls(port)
396
397
398
class MatchSCTPSrc(MatchField):
399
    """Match for SCTP source."""
400 1
401
    name = 'sctp_src'
402
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_SCTP_SRC
403
404
    def as_of_tlv(self):
405
        """Return a pyof OXM TLV instance."""
406
        value_bytes = self.value.to_bytes(2, 'big')
407 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
408
409
    @classmethod
410 1
    def from_of_tlv(cls, tlv):
411 1
        """Return an instance from a pyof OXM TLV."""
412
        value = int.from_bytes(tlv.oxm_value, 'big')
413 1
        return cls(value)
414
415
416
class MatchSCTPDst(MatchField):
417
    """Match for SCTP destination."""
418 1
419
    name = 'sctp_dst'
420
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_SCTP_DST
421
422
    def as_of_tlv(self):
423
        """Return a pyof OXM TLV instance."""
424
        value_bytes = self.value.to_bytes(2, 'big')
425 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
426
427
    @classmethod
428 1
    def from_of_tlv(cls, tlv):
429 1
        """Return an instance from a pyof OXM TLV."""
430
        value = int.from_bytes(tlv.oxm_value, 'big')
431 1
        return cls(value)
432
433
434
class MatchICMPV4Type(MatchField):
435
    """Match for ICMPV4 type."""
436 1
437
    name = 'icmpv4_type'
438
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ICMPV4_TYPE
439
440
    def as_of_tlv(self):
441
        """Return a pyof OXM TLV instance."""
442
        value_bytes = self.value.to_bytes(1, 'big')
443 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
444
445
    @classmethod
446 1
    def from_of_tlv(cls, tlv):
447 1
        """Return an instance from a pyof OXM TLV."""
448
        port = int.from_bytes(tlv.oxm_value, 'big')
449 1
        return cls(port)
450
451
452
class MatchICMPV4Code(MatchField):
453
    """Match for ICMPV4 code."""
454 1
455
    name = 'icmpv4_code'
456
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ICMPV4_CODE
457
458
    def as_of_tlv(self):
459
        """Return a pyof OXM TLV instance."""
460
        value_bytes = self.value.to_bytes(1, 'big')
461 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
462
463
    @classmethod
464 1
    def from_of_tlv(cls, tlv):
465 1
        """Return an instance from a pyof OXM TLV."""
466
        priority = int.from_bytes(tlv.oxm_value, 'big')
467 1
        return cls(priority)
468
469
470
class MatchARPOP(MatchField):
471
    """Match for ARP opcode."""
472 1
473
    name = 'arp_op'
474
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_OP
475
476
    def as_of_tlv(self):
477
        """Return a pyof OXM TLV instance."""
478
        value_bytes = self.value.to_bytes(2, 'big')
479 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
480
481
    @classmethod
482 1
    def from_of_tlv(cls, tlv):
483 1
        """Return an instance from a pyof OXM TLV."""
484
        opcode = int.from_bytes(tlv.oxm_value, 'big')
485 1
        return cls(opcode)
486
487
488 View Code Duplication
class MatchARPSPA(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
489
    """Match for ARP Sender IP Address."""
490 1
491
    name = 'arp_spa'
492
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_SPA
493
494
    def as_of_tlv(self):
495
        """Return a pyof OXM TLV instance."""
496
        ip_addr = IPAddress(self.value)
497 1
        value_bytes = ip_addr.pack()
498
        if ip_addr.netmask < 32:
499
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
500 1
        return OxmTLV(oxm_field=self.oxm_field,
501 1
                      oxm_hasmask=ip_addr.netmask < 32,
502
                      oxm_value=value_bytes)
503 1
504
    @classmethod
505
    def from_of_tlv(cls, tlv):
506
        """Return an instance from a pyof OXM TLV."""
507
        ip_address = IPAddress()
508
        ip_address.unpack(tlv.oxm_value)
509
        addr_str = str(ip_address)
510
        value = addr_str
511
        if tlv.oxm_hasmask:
512
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
513
        return cls(value)
514
515
516 View Code Duplication
class MatchARPTPA(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
517
    """Match for ARP Target IP Address."""
518
519 1
    name = 'arp_tpa'
520
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_TPA
521
522
    def as_of_tlv(self):
523
        """Return a pyof OXM TLV instance."""
524
        ip_addr = IPAddress(self.value)
525
        value_bytes = ip_addr.pack()
526
        if ip_addr.netmask < 32:
527
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
528
        return OxmTLV(oxm_field=self.oxm_field,
529 1
                      oxm_hasmask=ip_addr.netmask < 32,
530
                      oxm_value=value_bytes)
531
532 1
    @classmethod
533 1
    def from_of_tlv(cls, tlv):
534
        """Return an instance from a pyof OXM TLV."""
535 1
        ip_address = IPAddress()
536
        ip_address.unpack(tlv.oxm_value)
537
        addr_str = str(ip_address)
538
        value = addr_str
539
        if tlv.oxm_hasmask:
540 1
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
541
        return cls(value)
542
543
544 View Code Duplication
class MatchARPSHA(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
545
    """Match for ARP Sender MAC Address."""
546
547 1
    name = 'arp_sha'
548
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_SHA
549
550 1
    def as_of_tlv(self):
551 1
        """Return a pyof OXM TLV instance."""
552
        if '/' in self.value:
553 1
            value, mask = self.value.split('/')
554
            mask = mask.upper()
555
            if mask == 'FF:FF:FF:FF:FF:FF':
556
                mask = None
557
                oxm_hasmask = False
558 1
            else:
559
                oxm_hasmask = True
560
        else:
561
            value = self.value
562
            mask = None
563
            oxm_hasmask = False
564
        value_bytes = HWAddress(value).pack()
565 1
        if mask:
566
            value_bytes += HWAddress(mask).pack()
567
        return OxmTLV(oxm_field=self.oxm_field,
568 1
                      oxm_hasmask=oxm_hasmask,
569 1
                      oxm_value=value_bytes)
570
571 1
    @classmethod
572
    def from_of_tlv(cls, tlv):
573
        """Return an instance from a pyof OXM TLV."""
574
        hw_address = HWAddress()
575
        hw_address.unpack(tlv.oxm_value)
576 1
        addr_str = str(hw_address)
577
        value = addr_str
578
        if tlv.oxm_hasmask:
579
            hw_mask = HWAddress()
580
            hw_mask.unpack(tlv.oxm_value[6:])
581
            mask_str = str(hw_mask)
582
            value = f'{addr_str}/{mask_str}'
583 1
        return cls(value)
584
585
586 1 View Code Duplication
class MatchARPTHA(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
587 1
    """Match for ARP Target MAC Address."""
588
589 1
    name = 'arp_tha'
590
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ARP_THA
591
592
    def as_of_tlv(self):
593
        """Return a pyof OXM TLV instance."""
594 1
        if '/' in self.value:
595
            value, mask = self.value.split('/')
596
            mask = mask.upper()
597
            if mask == 'FF:FF:FF:FF:FF:FF':
598
                mask = None
599
                oxm_hasmask = False
600
            else:
601 1
                oxm_hasmask = True
602
        else:
603
            value = self.value
604 1
            mask = None
605 1
            oxm_hasmask = False
606
        value_bytes = HWAddress(value).pack()
607 1
        if mask:
608
            value_bytes += HWAddress(mask).pack()
609
        return OxmTLV(oxm_field=self.oxm_field,
610
                      oxm_hasmask=oxm_hasmask,
611
                      oxm_value=value_bytes)
612 1
613
    @classmethod
614
    def from_of_tlv(cls, tlv):
615
        """Return an instance from a pyof OXM TLV."""
616
        hw_address = HWAddress()
617
        hw_address.unpack(tlv.oxm_value)
618
        addr_str = str(hw_address)
619 1
        value = addr_str
620
        if tlv.oxm_hasmask:
621
            hw_mask = HWAddress()
622 1
            hw_mask.unpack(tlv.oxm_value[6:])
623 1
            mask_str = str(hw_mask)
624
            value = f'{addr_str}/{mask_str}'
625 1
        return cls(value)
626
627
628
class MatchMPLSLabel(MatchField):
629
    """Match for MPLS Label."""
630
631
    name = 'mpls_lab'
632
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_MPLS_LABEL
633
634
    def as_of_tlv(self):
635
        """Return a pyof OXM TLV instance."""
636
        value_bytes = self.value.to_bytes(4, 'big')
637
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
638
639
    @classmethod
640
    def from_of_tlv(cls, tlv):
641 1
        """Return an instance from a pyof OXM TLV."""
642
        lab = int.from_bytes(tlv.oxm_value, 'big')
643
        return cls(lab)
644
645
646
class MatchMPLSTC(MatchField):
647
    """Match for MPLS TC."""
648
649
    name = 'mpls_tc'
650
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_MPLS_TC
651 1
652
    def as_of_tlv(self):
653
        """Return a pyof OXM TLV instance."""
654 1
        value_bytes = self.value.to_bytes(1, 'big')
655 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
656
657 1
    @classmethod
658
    def from_of_tlv(cls, tlv):
659
        """Return an instance from a pyof OXM TLV."""
660
        value = int.from_bytes(tlv.oxm_value, 'big')
661
        return cls(value)
662
663
664
class MatchMPLSBOS(MatchField):
665
    """Match for MPLS BOS."""
666
667
    name = 'mpls_bos'
668
    oxm_field = OxmOfbMatchField.OFPXMT_OFP_MPLS_BOS
669
670
    def as_of_tlv(self):
671
        """Return a pyof OXM TLV instance."""
672
        value_bytes = self.value.to_bytes(1, 'big')
673 1
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
674
675
    @classmethod
676
    def from_of_tlv(cls, tlv):
677
        """Return an instance from a pyof OXM TLV."""
678
        bos = int.from_bytes(tlv.oxm_value, 'big')
679
        return cls(bos)
680
681
682 View Code Duplication
class MatchPBBISID(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
683 1
    """Match for PBB ISID."""
684
685
    name = 'pbb_isid'
686
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_PBB_ISID
687
688
    def as_of_tlv(self):
689
        """Return a pyof OXM TLV instance."""
690
        try:
691 1
            value = int(self.value)
692
            mask = None
693 1
            oxm_hasmask = False
694
        except ValueError:
695
            value, mask = map(int, self.value.split('/'))
696 1
            oxm_hasmask = True
697 1
        value_bytes = value.to_bytes(3, 'big')
698 1
        if mask:
699
            value_bytes += mask.to_bytes(3, 'big')
700
        return OxmTLV(oxm_field=self.oxm_field,
701 1
                      oxm_hasmask=oxm_hasmask,
702
                      oxm_value=value_bytes)
703
704
    @classmethod
705
    def from_of_tlv(cls, tlv):
706
        """Return an instance from a pyof OXM TLV."""
707
        value = int.from_bytes(tlv.oxm_value[:3], 'big')
708
        if tlv.oxm_hasmask:
709 1
            pbb_isid_mask = int.from_bytes(tlv.oxm_value[3:], 'big')
710
            value = f'{value}/{pbb_isid_mask}'
711
        return cls(value)
712 1
713 1
714 1
715
716 1 View Code Duplication
class MatchMetadata(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
717
    """Match for table metadata."""
718 1
719 1
    name = 'metadata'
720 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_METADATA
721
722
    def as_of_tlv(self):
723
        """Return a pyof OXM TLV instance."""
724
        try:
725
            value = int(self.value)
726
            mask = None
727
            oxm_hasmask = False
728
        except ValueError:
729
            value, mask = map(int, self.value.split('/'))
730
            oxm_hasmask = True
731
        value_bytes = value.to_bytes(8, 'big')
732
        if mask:
733
            value_bytes += mask.to_bytes(8, 'big')
734
        return OxmTLV(oxm_field=self.oxm_field,
735
                      oxm_hasmask=oxm_hasmask,
736
                      oxm_value=value_bytes)
737
738
    @classmethod
739
    def from_of_tlv(cls, tlv):
740
        """Return an instance from a pyof OXM TLV."""
741
        value = int.from_bytes(tlv.oxm_value[:8], 'big')
742
        if tlv.oxm_hasmask:
743
            metadata_mask = int.from_bytes(tlv.oxm_value[8:], 'big')
744
            value = f'{value}/{metadata_mask}'
745
        return cls(value)
746
747
748 View Code Duplication
class MatchTUNNELID(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
749
    """Match for tunnel id."""
750
751
    name = 'tun_id'
752
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TUNNEL_ID
753
754
    def as_of_tlv(self):
755
        """Return a pyof OXM TLV instance."""
756
        try:
757
            value = int(self.value)
758
            mask = None
759
            oxm_hasmask = False
760
        except ValueError:
761
            value, mask = map(int, self.value.split('/'))
762
            oxm_hasmask = True
763
        value_bytes = value.to_bytes(8, 'big')
764
        if mask:
765
            value_bytes += mask.to_bytes(8, 'big')
766
        return OxmTLV(oxm_field=self.oxm_field,
767
                      oxm_hasmask=oxm_hasmask,
768
                      oxm_value=value_bytes)
769
770
    @classmethod
771
    def from_of_tlv(cls, tlv):
772
        """Return an instance from a pyof OXM TLV."""
773
        value = int.from_bytes(tlv.oxm_value[:8], 'big')
774
        if tlv.oxm_hasmask:
775
            tunnel_mask = int.from_bytes(tlv.oxm_value[8:], 'big')
776
            value = f'{value}/{tunnel_mask}'
777
        return cls(value)
778