Passed
Push — master ( e18d69...5f36f8 )
by Humberto
02:10
created

build.v0x04.match_fields   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 391
Duplicated Lines 33.25 %

Test Coverage

Coverage 45.19%

Importance

Changes 0
Metric Value
wmc 51
eloc 238
dl 130
loc 391
ccs 94
cts 208
cp 0.4519
rs 7.92
c 0
b 0
f 0

32 Methods

Rating   Name   Duplication   Size   Complexity  
A MatchField.as_of_tlv() 0 3 1
A MatchDLVLANPCP.from_of_tlv() 0 5 1
A MatchInPort.as_of_tlv() 0 4 1
A MatchDLVLANPCP.as_of_tlv() 0 4 1
A MatchField.oxm_field() 0 5 1
A MatchTCPDst.as_of_tlv() 0 4 1
A MatchField.from_of_tlv() 0 4 1
A MatchTCPSrc.from_of_tlv() 0 5 1
A MatchField.__eq__() 0 7 1
A MatchInPort.from_of_tlv() 0 5 1
A MatchTCPSrc.as_of_tlv() 0 4 1
A MatchNwDst.from_of_tlv() 10 10 2
A MatchFieldFactory.from_of_tlv() 0 7 2
A MatchDLVLAN.from_of_tlv() 0 9 2
A MatchDLSrc.from_of_tlv() 12 13 2
A MatchDLDst.as_of_tlv() 20 20 4
A MatchDLDst.from_of_tlv() 12 13 2
A MatchNwDst.as_of_tlv() 9 9 2
A MatchDLVLAN.as_of_tlv() 0 17 3
A MatchNwProto.as_of_tlv() 0 4 1
A MatchTCPDst.from_of_tlv() 0 5 1
A MatchNwSrc.from_of_tlv() 10 10 2
A MatchDLType.from_of_tlv() 0 5 1
A MatchField.name() 0 5 1
A MatchNwProto.from_of_tlv() 0 5 1
A MatchDLType.as_of_tlv() 0 4 1
A MatchNwSrc.as_of_tlv() 9 9 2
A MatchField.__init__() 0 3 1
A MatchFieldFactory._index_classes() 0 5 2
A MatchFieldFactory.from_name() 0 7 2
A MatchFieldFactory._get_class() 0 6 2
A MatchDLSrc.as_of_tlv() 20 20 4

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
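One duplication in this file is easy to see: MatchDLSrc.as_of_tlv() and MatchDLDst.as_of_tlv() are identical apart from the class they live in. A minimal sketch of pulling the shared mask handling into one helper follows. The names `mac_to_bytes` and `encode_masked_mac` are hypothetical, and `mac_to_bytes` is a plain-Python stand-in for pyof's `HWAddress(...).pack()` so that the example runs without pyof installed.

```python
FULL_MASK = 'FF:FF:FF:FF:FF:FF'


def mac_to_bytes(address):
    """Pack 'aa:bb:cc:dd:ee:ff' into 6 bytes (stand-in for HWAddress.pack)."""
    return bytes(int(octet, 16) for octet in address.split(':'))


def encode_masked_mac(value):
    """Return (oxm_value, oxm_hasmask) for 'addr' or 'addr/mask' strings."""
    address, _, mask = value.partition('/')
    # An all-ones mask matches exactly, so it is treated as "no mask".
    has_mask = bool(mask) and mask.upper() != FULL_MASK
    value_bytes = mac_to_bytes(address)
    if has_mask:
        value_bytes += mac_to_bytes(mask)
    return value_bytes, has_mask
```

With a helper of this shape, each class's as_of_tlv() could shrink to one call plus the construction of an OxmTLV with its own oxm_field.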

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.v0x04.match_fields often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
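Applied to this module, Extract Subclass might look like the sketch below. Several fields (for example MatchInPort and MatchTCPSrc) differ only in how many bytes their integer value occupies. `IntMatchField`, its `size` attribute, and the `pack_value`/`unpack_value` method names are hypothetical, and the classes are simplified stand-ins that return raw bytes instead of a pyof `OxmTLV`, so the example is self-contained.

```python
class IntMatchField:
    """Hypothetical intermediate class for fixed-width unsigned integers."""

    size = None  # width in bytes; each concrete subclass sets this

    def __init__(self, value):
        self.value = value

    def pack_value(self):
        """Encode the value as big-endian bytes of the declared width."""
        return self.value.to_bytes(self.size, 'big')

    @classmethod
    def unpack_value(cls, data):
        """Build an instance from big-endian bytes."""
        return cls(int.from_bytes(data, 'big'))


class InPort(IntMatchField):
    """Stand-in for MatchInPort: a 4-byte port number."""

    size = 4


class TCPSrc(IntMatchField):
    """Stand-in for MatchTCPSrc: a 2-byte TCP port."""

    size = 2


packed = InPort(1).pack_value()
restored = TCPSrc.unpack_value(bytes([0, 80]))
```

Each concrete field then declares only what differs (its width), which is the part the existing one-line to_bytes()/from_bytes() methods repeat.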

"""OpenFlow 1.3 OXM match fields.

Flow's match is very different from OF 1.0. Instead of always having all
fields, there's a variable list of match fields and each one is an OpenFlow
eXtensible Match Type-Length-Value (OXM TLV) element.

This module provides high-level Python classes for OXM TLV fields in order to
make the OF 1.3 match fields easy to use and to code.
"""
from abc import ABC, abstractmethod

from pyof.foundation.basic_types import HWAddress, IPAddress
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId

from napps.kytos.of_core.v0x04.utils import bytes_to_mask, mask_to_bytes


class MatchField(ABC):
    """Base class for match fields. Abstract OXM TLVs of python-openflow.

    Just extend this class and you will be forced to define the required
    low-level attributes and methods below:

    * "name" attribute (field name to be displayed in JSON);
    * "oxm_field" attribute (``OxmOfbMatchField`` enum);
    * Method to return a pyof OxmTLV;
    * Method to create an instance from an OxmTLV.
    """

    def __init__(self, value):
        """Define match field value."""
        self.value = value

    @property
    @classmethod
    @abstractmethod
    def name(cls):
        """Define a name to be displayed in JSON.

        It can be overridden just by a class attribute.
        """

    @property
    @classmethod
    @abstractmethod
    def oxm_field(cls):
        """Define this subclass ``OxmOfbMatchField`` value.

        It can be overridden just by a class attribute.
        """

    @abstractmethod
    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""

    @classmethod
    @abstractmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""

    def __eq__(self, other):
        """Two objects are equal if their values are the same.

        The oxm_field equality is checked indirectly when comparing whether
        the objects are instances of the same class.
        """
        return isinstance(other, self.__class__) and other.value == self.value


class MatchDLVLAN(MatchField):
    """Match for datalink VLAN ID."""

    name = 'dl_vlan'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_VID

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        try:
            # Plain VLAN ID, e.g. 100.
            value = int(self.value)
            mask = None
            oxm_hasmask = False
        except ValueError:
            # 'vid/mask' string, e.g. '100/4095'.
            value, mask = map(int, self.value.split('/'))
            oxm_hasmask = True
        value = value | VlanId.OFPVID_PRESENT
        value_bytes = value.to_bytes(2, 'big')
        if mask:
            mask = mask | VlanId.OFPVID_PRESENT
            value_bytes += mask.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        # 4095 (0x0fff) keeps the 12-bit VLAN ID and drops OFPVID_PRESENT.
        vlan_id = int.from_bytes(tlv.oxm_value[:2], 'big') & 4095
        value = vlan_id
        if tlv.oxm_hasmask:
            vlan_mask = int.from_bytes(tlv.oxm_value[2:], 'big') & 4095
            value = f'{vlan_id}/{vlan_mask}'
        return cls(value)


class MatchDLVLANPCP(MatchField):
    """Match for VLAN Priority Code Point."""

    name = 'dl_vlan_pcp'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_PCP

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        priority = int.from_bytes(tlv.oxm_value, 'big')
        return cls(priority)


# Duplication flagged: this code seems to be duplicated in your project.
class MatchDLSrc(MatchField):
    """Match for datalink source."""

    name = 'dl_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


# Duplication flagged: this code seems to be duplicated in your project.
class MatchDLDst(MatchField):
    """Match for datalink destination."""

    name = 'dl_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


class MatchDLType(MatchField):
    """Match for datalink type."""

    name = 'dl_type'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_TYPE

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ether_type = int.from_bytes(tlv.oxm_value, 'big')
        return cls(ether_type)


# Duplication flagged: this code seems to be duplicated in your project.
class MatchNwSrc(MatchField):
    """Match for IPv4 source."""

    name = 'nw_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip_addr = IPAddress(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 32:
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)


# Duplication flagged: this code seems to be duplicated in your project.
class MatchNwDst(MatchField):
    """Match for IPv4 destination."""

    name = 'nw_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip_addr = IPAddress(self.value)
        value_bytes = ip_addr.pack()
        if ip_addr.netmask < 32:
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field,
                      oxm_hasmask=ip_addr.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        addr_str = str(ip_address)
        value = addr_str
        if tlv.oxm_hasmask:
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)


class MatchNwProto(MatchField):
    """Match for IP protocol."""

    name = 'nw_proto'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_PROTO

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        protocol = int.from_bytes(tlv.oxm_value, 'big')
        return cls(protocol)


class MatchInPort(MatchField):
    """Match for input port."""

    name = 'in_port'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IN_PORT

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(4, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchTCPSrc(MatchField):
    """Match for TCP source."""

    name = 'tp_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchTCPDst(MatchField):
    """Match for TCP destination."""

    name = 'tp_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


353 1
class MatchFieldFactory(ABC):
354
    """Create the correct MatchField subclass instance.
355
356
    As OF 1.3 has many match fields and there are many ways to (un)pack their
357
    OxmTLV.oxm_value, this class does all the work of finding the correct
358
    MatchField class and instantiating the corresponding object.
359
    """
360
361 1
    __classes = {}
362
363 1
    @classmethod
364
    def from_name(cls, name, value):
365
        """Return the proper object from name and value."""
366 1
        field_class = cls._get_class(name)
367 1
        if field_class:
368 1
            return field_class(value)
369
        return None
370
371 1
    @classmethod
372
    def from_of_tlv(cls, tlv):
373
        """Return the proper object from a pyof OXM TLV."""
374
        field_class = cls._get_class(tlv.oxm_field)
375
        if field_class:
376
            return field_class.from_of_tlv(tlv)
377
        return None
378
379 1
    @classmethod
380
    def _get_class(cls, name_or_field):
381
        """Return the proper object from field name or OxmTLV.oxm_field."""
382 1
        if not cls.__classes:
383 1
            cls._index_classes()
384 1
        return cls.__classes.get(name_or_field)
385
386 1
    @classmethod
387
    def _index_classes(cls):
388 1
        for subclass in MatchField.__subclasses__():
389 1
            cls.__classes[subclass.name] = subclass
390
            cls.__classes[subclass.oxm_field] = subclass
391