Passed
Pull Request — master (#82)
by
unknown
03:10
created

build.v0x04.match_fields   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 411
Duplicated Lines 34.31 %

Test Coverage

Coverage 35.14%

Importance

Changes 0
Metric Value
wmc 56
eloc 252
dl 141
loc 411
ccs 78
cts 222
cp 0.3514
rs 5.5199
c 0
b 0
f 0

32 Methods

Rating   Name   Duplication   Size   Complexity  
A MatchDLVLANPCP.as_of_tlv() 0 4 1
A MatchFieldFactory._index_classes() 0 5 2
A MatchField.oxm_field() 0 5 1
A MatchField.__init__() 0 3 1
A MatchTCPDst.as_of_tlv() 0 4 1
A MatchTCPSrc.from_of_tlv() 0 5 1
A MatchField.from_of_tlv() 0 4 1
A MatchFieldFactory.from_name() 0 7 2
A MatchField.__eq__() 0 7 1
A MatchInPort.from_of_tlv() 0 5 1
A MatchTCPSrc.as_of_tlv() 0 4 1
A MatchField.as_of_tlv() 0 3 1
A MatchNwDst.from_of_tlv() 9 9 2
A MatchFieldFactory.from_of_tlv() 0 7 2
A MatchDLVLAN.from_of_tlv() 0 9 2
A MatchDLSrc.from_of_tlv() 12 13 2
A MatchDLDst.as_of_tlv() 20 20 4
A MatchDLDst.from_of_tlv() 12 13 2
A MatchNwDst.as_of_tlv() 9 9 2
A MatchDLVLAN.as_of_tlv() 0 17 3
A MatchNwProto.as_of_tlv() 0 4 1
A MatchDLVLANPCP.from_of_tlv() 0 5 1
A MatchTCPDst.from_of_tlv() 0 5 1
A MatchNwSrc.from_of_tlv() 9 9 2
A MatchDLType.from_of_tlv() 0 5 1
A MatchInPort.as_of_tlv() 0 4 1
A MatchField.name() 0 5 1
A MatchFieldFactory._get_class() 0 6 2
A MatchNwProto.from_of_tlv() 0 5 1
A MatchDLType.as_of_tlv() 0 4 1
A MatchNwSrc.as_of_tlv() 9 9 2
A MatchDLSrc.as_of_tlv() 20 20 4

2 Functions

Rating   Name   Duplication   Size   Complexity  
A mask_to_bytes() 0 7 2
A bytes_to_mask() 13 13 3

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.v0x04.match_fields often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""OpenFlow 1.3 OXM match fields.
2
3
Flow's match is very different from OF 1.0. Instead of always having all
4
fields, there's a variable list of match fields and each one is an Openflow
5
eXtended Match Type-Length-Value (OXM TLV) element.
6
7
This module provides high-level Python classes for OXM TLV fields in order to
8
make the OF 1.3 match fields easy to use and to be coded.
9
"""
10 1
from abc import ABC, abstractmethod
11
12 1
from pyof.foundation.basic_types import HWAddress, IPAddress
13 1
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId
14
15
16 1
def mask_to_bytes(mask, size):
17
    """Return the mask in bytes."""
18
    bits = 0
19
    for i in range(size-mask, size):
20
        bits |= (1 << i)
21
    tobytes = bits.to_bytes(size//8, 'big')
22
    return tobytes
23
24
25 1 View Code Duplication
def bytes_to_mask(tobytes, size):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
26
    """Return the mask in string."""
27
    int_mask = int.from_bytes(tobytes, 'big')
28
    bits = bin(int_mask)
29
    strbits = str(bits)
30
    strbits = strbits[2:]
31
    netmask = 0
32
    for i in range(size):
33
        if strbits[i] == '1':
34
            netmask += 1
35
        else:
36
            break
37
    return netmask
38
39
40 1
class MatchField(ABC):
41
    """Base class for match fields. Abstract OXM TLVs of python-openflow.
42
43
    Just extend this class and you will be forced to define the required
44
    low-level attributes and methods below:
45
46
    * "name" attribute (field name to be displayed in JSON);
47
    * "oxm_field" attribute (``OxmOfbMatchField`` enum);
48
    * Method to return a pyof OxmTLV;
49
    * Method to create an instance from an OxmTLV.
50
    """
51
52 1
    def __init__(self, value):
53
        """Define match field value."""
54
        self.value = value
55
56 1
    @property
57 1
    @classmethod
58 1
    @abstractmethod
59
    def name(cls):
60
        """Define a name to be displayed in JSON.
61
62
        It can be overriden just by a class attibute.
63
        """
64
65 1
    @property
66 1
    @classmethod
67 1
    @abstractmethod
68
    def oxm_field(cls):
69
        """Define this subclass ``OxmOfbMatchField`` value.
70
71
        It can be overriden just by as a class attibute.
72
        """
73
74 1
    @abstractmethod
75
    def as_of_tlv(self):
76
        """Return a pyof OXM TLV instance."""
77
78 1
    @classmethod
79 1
    @abstractmethod
80
    def from_of_tlv(cls, tlv):
81
        """Return an instance from a pyof OXM TLV."""
82
83 1
    def __eq__(self, other):
84
        """Two objects are equal if their values are the same.
85
86
        The oxm_field equality is checked indirectly when comparing whether
87
        the objects are instances of the same class.
88
        """
89
        return isinstance(other, self.__class__) and other.value == self.value
90
91
92 1
class MatchDLVLAN(MatchField):
93
    """Match for datalink VLAN ID."""
94
95 1
    name = 'dl_vlan'
96 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_VID
97
98 1
    def as_of_tlv(self):
99
        """Return a pyof OXM TLV instance."""
100
        try:
101
            value = int(self.value)
102
            mask = None
103
            oxm_hasmask = False
104
        except ValueError:
105
            value, mask = map(int, self.value.split('/'))
106
            oxm_hasmask = True
107
        value = value | VlanId.OFPVID_PRESENT
108
        value_bytes = value.to_bytes(2, 'big')
109
        if mask:
110
            mask = mask | VlanId.OFPVID_PRESENT
111
            value_bytes += mask.to_bytes(2, 'big')
112
        return OxmTLV(oxm_field=self.oxm_field,
113
                      oxm_hasmask=oxm_hasmask,
114
                      oxm_value=value_bytes)
115
116 1
    @classmethod
117
    def from_of_tlv(cls, tlv):
118
        """Return an instance from a pyof OXM TLV."""
119
        vlan_id = int.from_bytes(tlv.oxm_value[:2], 'big') & 4095
120
        value = vlan_id
121
        if tlv.oxm_hasmask:
122
            vlan_mask = int.from_bytes(tlv.oxm_value[2:], 'big') & 4095
123
            value = f'{vlan_id}/{vlan_mask}'
124
        return cls(value)
125
126
127 1
class MatchDLVLANPCP(MatchField):
128
    """Match for VLAN Priority Code Point."""
129
130 1
    name = 'dl_vlan_pcp'
131 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_PCP
132
133 1
    def as_of_tlv(self):
134
        """Return a pyof OXM TLV instance."""
135
        value_bytes = self.value.to_bytes(1, 'big')
136
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
137
138 1
    @classmethod
139
    def from_of_tlv(cls, tlv):
140
        """Return an instance from a pyof OXM TLV."""
141
        priority = int.from_bytes(tlv.oxm_value, 'big')
142
        return cls(priority)
143
144
145 1 View Code Duplication
class MatchDLSrc(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
146
    """Match for datalink source."""
147
148 1
    name = 'dl_src'
149 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_SRC
150
151 1
    def as_of_tlv(self):
152
        """Return a pyof OXM TLV instance."""
153
        if '/' in self.value:
154
            value, mask = self.value.split('/')
155
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
156
                mask = None
157
                oxm_hasmask = False
158
            else:
159
                mask = mask.upper()
160
                oxm_hasmask = True
161
        else:
162
            value = self.value
163
            mask = None
164
            oxm_hasmask = False
165
        value_bytes = HWAddress(value).pack()
166
        if mask:
167
            value_bytes += HWAddress(mask).pack()
168
        return OxmTLV(oxm_field=self.oxm_field,
169
                      oxm_hasmask=oxm_hasmask,
170
                      oxm_value=value_bytes)
171
172 1
    @classmethod
173
    def from_of_tlv(cls, tlv):
174
        """Return an instance from a pyof OXM TLV."""
175
        hw_address = HWAddress()
176
        hw_address.unpack(tlv.oxm_value)
177
        addr_str = str(hw_address)
178
        value = addr_str
179
        if tlv.oxm_hasmask:
180
            hw_mask = HWAddress()
181
            hw_mask.unpack(tlv.oxm_value[6:])
182
            mask_str = str(hw_mask)
183
            value = f'{addr_str}/{mask_str}'
184
        return cls(value)
185
186
187 1 View Code Duplication
class MatchDLDst(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
188
    """Match for datalink destination."""
189
190 1
    name = 'dl_dst'
191 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_DST
192
193 1
    def as_of_tlv(self):
194
        """Return a pyof OXM TLV instance."""
195
        if '/' in self.value:
196
            value, mask = self.value.split('/')
197
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
198
                mask = None
199
                oxm_hasmask = False
200
            else:
201
                mask = mask.upper()
202
                oxm_hasmask = True
203
        else:
204
            value = self.value
205
            mask = None
206
            oxm_hasmask = False
207
        value_bytes = HWAddress(value).pack()
208
        if mask:
209
            value_bytes += HWAddress(mask).pack()
210
        return OxmTLV(oxm_field=self.oxm_field,
211
                      oxm_hasmask=oxm_hasmask,
212
                      oxm_value=value_bytes)
213
214 1
    @classmethod
215
    def from_of_tlv(cls, tlv):
216
        """Return an instance from a pyof OXM TLV."""
217
        hw_address = HWAddress()
218
        hw_address.unpack(tlv.oxm_value)
219
        addr_str = str(hw_address)
220
        value = addr_str
221
        if tlv.oxm_hasmask:
222
            hw_mask = HWAddress()
223
            hw_mask.unpack(tlv.oxm_value[6:])
224
            mask_str = str(hw_mask)
225
            value = f'{addr_str}/{mask_str}'
226
        return cls(value)
227
228
229 1
class MatchDLType(MatchField):
230
    """Match for datalink type."""
231
232 1
    name = 'dl_type'
233 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_TYPE
234
235 1
    def as_of_tlv(self):
236
        """Return a pyof OXM TLV instance."""
237
        value_bytes = self.value.to_bytes(2, 'big')
238
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
239
240 1
    @classmethod
241
    def from_of_tlv(cls, tlv):
242
        """Return an instance from a pyof OXM TLV."""
243
        port = int.from_bytes(tlv.oxm_value, 'big')
244
        return cls(port)
245
246
247 1 View Code Duplication
class MatchNwSrc(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
248
    """Match for IPV4 source."""
249
250 1
    name = 'nw_src'
251 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_SRC
252
253 1
    def as_of_tlv(self):
254
        """Return a pyof OXM TLV instance."""
255
        ip_addr = IPAddress(self.value)
256
        value_bytes = ip_addr.pack()
257
        if ip_addr.netmask < 32:
258
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
259
        return OxmTLV(oxm_field=self.oxm_field,
260
                      oxm_hasmask=ip_addr.netmask < 32,
261
                      oxm_value=value_bytes)
262
263 1
    @classmethod
264
    def from_of_tlv(cls, tlv):
265
        """Return an instance from a pyof OXM TLV."""
266
        ip_address = IPAddress()
267
        ip_address.unpack(tlv.oxm_value)
268
        value = str(ip_address)
269
        if tlv.oxm_hasmask:
270
            value = f'{value}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
271
        return cls(value)
272
273
274 1 View Code Duplication
class MatchNwDst(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
275
    """Match for IPV4 destination."""
276
277 1
    name = 'nw_dst'
278 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_DST
279
280 1
    def as_of_tlv(self):
281
        """Return a pyof OXM TLV instance."""
282
        ip_addr = IPAddress(self.value)
283
        value_bytes = ip_addr.pack()
284
        if ip_addr.netmask < 32:
285
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
286
        return OxmTLV(oxm_field=self.oxm_field,
287
                      oxm_hasmask=ip_addr.netmask < 32,
288
                      oxm_value=value_bytes)
289
290 1
    @classmethod
291
    def from_of_tlv(cls, tlv):
292
        """Return an instance from a pyof OXM TLV."""
293
        ip_address = IPAddress()
294
        ip_address.unpack(tlv.oxm_value)
295
        value = str(ip_address)
296
        if tlv.oxm_hasmask:
297
            value = f'{value}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
298
        return cls(value)
299
300
301 1
class MatchNwProto(MatchField):
302
    """Match for IP protocol."""
303
304 1
    name = 'nw_proto'
305 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_PROTO
306
307 1
    def as_of_tlv(self):
308
        """Return a pyof OXM TLV instance."""
309
        value_bytes = self.value.to_bytes(1, 'big')
310
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
311
312 1
    @classmethod
313
    def from_of_tlv(cls, tlv):
314
        """Return an instance from a pyof OXM TLV."""
315
        priority = int.from_bytes(tlv.oxm_value, 'big')
316
        return cls(priority)
317
318
319 1
class MatchInPort(MatchField):
320
    """Match for input port."""
321
322 1
    name = 'in_port'
323 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IN_PORT
324
325 1
    def as_of_tlv(self):
326
        """Return a pyof OXM TLV instance."""
327
        value_bytes = self.value.to_bytes(4, 'big')
328
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
329
330 1
    @classmethod
331
    def from_of_tlv(cls, tlv):
332
        """Return an instance from a pyof OXM TLV."""
333
        port = int.from_bytes(tlv.oxm_value, 'big')
334
        return cls(port)
335
336
337 1
class MatchTCPSrc(MatchField):
338
    """Match for TCP source."""
339
340 1
    name = 'tp_src'
341 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_SRC
342
343 1
    def as_of_tlv(self):
344
        """Return a pyof OXM TLV instance."""
345
        value_bytes = self.value.to_bytes(2, 'big')
346
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
347
348 1
    @classmethod
349
    def from_of_tlv(cls, tlv):
350
        """Return an instance from a pyof OXM TLV."""
351
        port = int.from_bytes(tlv.oxm_value, 'big')
352
        return cls(port)
353
354
355 1
class MatchTCPDst(MatchField):
356
    """Match for TCP destination."""
357
358 1
    name = 'tp_dst'
359 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_DST
360
361 1
    def as_of_tlv(self):
362
        """Return a pyof OXM TLV instance."""
363
        value_bytes = self.value.to_bytes(2, 'big')
364
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
365
366 1
    @classmethod
367
    def from_of_tlv(cls, tlv):
368
        """Return an instance from a pyof OXM TLV."""
369
        port = int.from_bytes(tlv.oxm_value, 'big')
370
        return cls(port)
371
372
373 1
class MatchFieldFactory(ABC):
374
    """Create the correct MatchField subclass instance.
375
376
    As OF 1.3 has many match fields and there are many ways to (un)pack their
377
    OxmTLV.oxm_value, this class does all the work of finding the correct
378
    MatchField class and instantiating the corresponding object.
379
    """
380
381 1
    __classes = {}
382
383 1
    @classmethod
384
    def from_name(cls, name, value):
385
        """Return the proper object from name and value."""
386
        field_class = cls._get_class(name)
387
        if field_class:
388
            return field_class(value)
389
        return None
390
391 1
    @classmethod
392
    def from_of_tlv(cls, tlv):
393
        """Return the proper object from a pyof OXM TLV."""
394
        field_class = cls._get_class(tlv.oxm_field)
395
        if field_class:
396
            return field_class.from_of_tlv(tlv)
397
        return None
398
399 1
    @classmethod
400
    def _get_class(cls, name_or_field):
401
        """Return the proper object from field name or OxmTLV.oxm_field."""
402
        if not cls.__classes:
403
            cls._index_classes()
404
        return cls.__classes.get(name_or_field)
405
406 1
    @classmethod
407
    def _index_classes(cls):
408
        for subclass in MatchField.__subclasses__():
409
            cls.__classes[subclass.name] = subclass
410
            cls.__classes[subclass.oxm_field] = subclass
411