Test Failed
Pull Request — master (#82)
by
unknown
01:53
created

build.v0x04.match_fields   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 390
Duplicated Lines 33.33 %

Test Coverage

Coverage 52.41%

Importance

Changes 0
Metric Value
wmc 51
eloc 238
dl 130
loc 390
ccs 76
cts 145
cp 0.5241
rs 7.92
c 0
b 0
f 0

32 Methods

Rating   Name   Duplication   Size   Complexity  
A MatchFieldFactory._index_classes() 0 5 2
A MatchField.oxm_field() 0 5 1
A MatchField.__init__() 0 3 1
A MatchTCPDst.as_of_tlv() 0 4 1
A MatchField.from_of_tlv() 0 4 1
A MatchTCPSrc.from_of_tlv() 0 5 1
A MatchFieldFactory.from_name() 0 7 2
A MatchField.__eq__() 0 7 1
A MatchInPort.from_of_tlv() 0 5 1
A MatchTCPSrc.as_of_tlv() 0 4 1
A MatchField.as_of_tlv() 0 3 1
A MatchNwDst.from_of_tlv() 10 10 2
A MatchFieldFactory.from_of_tlv() 0 7 2
A MatchDLVLAN.from_of_tlv() 0 9 2
A MatchDLSrc.from_of_tlv() 12 13 2
A MatchDLDst.as_of_tlv() 20 20 4
A MatchDLDst.from_of_tlv() 12 13 2
A MatchNwDst.as_of_tlv() 9 9 2
A MatchDLVLAN.as_of_tlv() 0 17 3
A MatchNwProto.as_of_tlv() 0 4 1
A MatchDLVLANPCP.from_of_tlv() 0 5 1
A MatchTCPDst.from_of_tlv() 0 5 1
A MatchNwSrc.from_of_tlv() 10 10 2
A MatchDLType.from_of_tlv() 0 5 1
A MatchInPort.as_of_tlv() 0 4 1
A MatchField.name() 0 5 1
A MatchDLVLANPCP.as_of_tlv() 0 4 1
A MatchFieldFactory._get_class() 0 6 2
A MatchNwProto.from_of_tlv() 0 5 1
A MatchDLType.as_of_tlv() 0 4 1
A MatchNwSrc.as_of_tlv() 9 9 2
A MatchDLSrc.as_of_tlv() 20 20 4

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.v0x04.match_fields often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""OpenFlow 1.3 OXM match fields.
2
3
Flow's match is very different from OF 1.0. Instead of always having all
4
fields, there's a variable list of match fields and each one is an Openflow
5
eXtended Match Type-Length-Value (OXM TLV) element.
6
7
This module provides high-level Python classes for OXM TLV fields in order to
8
make the OF 1.3 match fields easy to use and to be coded.
9
"""
10 1
from abc import ABC, abstractmethod
11
12 1
from pyof.foundation.basic_types import HWAddress, IPAddress
13 1
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId
14
from napps.kytos.of_core.v0x04.utils import mask_to_bytes, bytes_to_mask
15
16 1
17
class MatchField(ABC):
18
    """Base class for match fields. Abstract OXM TLVs of python-openflow.
19
20
    Just extend this class and you will be forced to define the required
21
    low-level attributes and methods below:
22
23
    * "name" attribute (field name to be displayed in JSON);
24
    * "oxm_field" attribute (``OxmOfbMatchField`` enum);
25
    * Method to return a pyof OxmTLV;
26
    * Method to create an instance from an OxmTLV.
27
    """
28 1
29
    def __init__(self, value):
30
        """Define match field value."""
31
        self.value = value
32 1
33 1
    @property
34 1
    @classmethod
35
    @abstractmethod
36
    def name(cls):
37
        """Define a name to be displayed in JSON.
38
39
        It can be overriden just by a class attibute.
40
        """
41 1
42 1
    @property
43 1
    @classmethod
44
    @abstractmethod
45
    def oxm_field(cls):
46
        """Define this subclass ``OxmOfbMatchField`` value.
47
48
        It can be overriden just by as a class attibute.
49
        """
50 1
51
    @abstractmethod
52
    def as_of_tlv(self):
53
        """Return a pyof OXM TLV instance."""
54 1
55 1
    @classmethod
56
    @abstractmethod
57
    def from_of_tlv(cls, tlv):
58
        """Return an instance from a pyof OXM TLV."""
59 1
60
    def __eq__(self, other):
61
        """Two objects are equal if their values are the same.
62
63
        The oxm_field equality is checked indirectly when comparing whether
64
        the objects are instances of the same class.
65
        """
66
        return isinstance(other, self.__class__) and other.value == self.value
67
68 1
69
class MatchDLVLAN(MatchField):
70
    """Match for datalink VLAN ID."""
71 1
72 1
    name = 'dl_vlan'
73
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_VID
74 1
75
    def as_of_tlv(self):
76
        """Return a pyof OXM TLV instance."""
77
        try:
78
            value = int(self.value)
79
            mask = None
80 1
            oxm_hasmask = False
81
        except ValueError:
82
            value, mask = map(int, self.value.split('/'))
83
            oxm_hasmask = True
84
        value = value | VlanId.OFPVID_PRESENT
85
        value_bytes = value.to_bytes(2, 'big')
86
        if mask:
87 1
            mask = mask | VlanId.OFPVID_PRESENT
88
            value_bytes += mask.to_bytes(2, 'big')
89
        return OxmTLV(oxm_field=self.oxm_field,
90 1
                      oxm_hasmask=oxm_hasmask,
91 1
                      oxm_value=value_bytes)
92
93 1
    @classmethod
94
    def from_of_tlv(cls, tlv):
95
        """Return an instance from a pyof OXM TLV."""
96
        vlan_id = int.from_bytes(tlv.oxm_value[:2], 'big') & 4095
97
        value = vlan_id
98 1
        if tlv.oxm_hasmask:
99
            vlan_mask = int.from_bytes(tlv.oxm_value[2:], 'big') & 4095
100
            value = f'{vlan_id}/{vlan_mask}'
101
        return cls(value)
102
103
104
class MatchDLVLANPCP(MatchField):
105 1
    """Match for VLAN Priority Code Point."""
106
107
    name = 'dl_vlan_pcp'
108 1
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_PCP
109 1
110
    def as_of_tlv(self):
111 1
        """Return a pyof OXM TLV instance."""
112
        value_bytes = self.value.to_bytes(1, 'big')
113
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
114
115
    @classmethod
116 1
    def from_of_tlv(cls, tlv):
117
        """Return an instance from a pyof OXM TLV."""
118
        priority = int.from_bytes(tlv.oxm_value, 'big')
119
        return cls(priority)
120
121
122 View Code Duplication
class MatchDLSrc(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
123
    """Match for datalink source."""
124
125 1
    name = 'dl_src'
126
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_SRC
127
128 1
    def as_of_tlv(self):
129 1
        """Return a pyof OXM TLV instance."""
130
        if '/' in self.value:
131 1
            value, mask = self.value.split('/')
132
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
133
                mask = None
134
                oxm_hasmask = False
135
            else:
136 1
                mask = mask.upper()
137
                oxm_hasmask = True
138
        else:
139
            value = self.value
140
            mask = None
141
            oxm_hasmask = False
142
        value_bytes = HWAddress(value).pack()
143
        if mask:
144
            value_bytes += HWAddress(mask).pack()
145 1
        return OxmTLV(oxm_field=self.oxm_field,
146
                      oxm_hasmask=oxm_hasmask,
147
                      oxm_value=value_bytes)
148 1
149 1
    @classmethod
150
    def from_of_tlv(cls, tlv):
151 1
        """Return an instance from a pyof OXM TLV."""
152
        hw_address = HWAddress()
153
        hw_address.unpack(tlv.oxm_value)
154
        addr_str = str(hw_address)
155
        value = addr_str
156 1
        if tlv.oxm_hasmask:
157
            hw_mask = HWAddress()
158
            hw_mask.unpack(tlv.oxm_value[6:])
159
            mask_str = str(hw_mask)
160
            value = f'{addr_str}/{mask_str}'
161
        return cls(value)
162
163 1
164 View Code Duplication
class MatchDLDst(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
165
    """Match for datalink destination."""
166 1
167 1
    name = 'dl_dst'
168
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_DST
169 1
170
    def as_of_tlv(self):
171
        """Return a pyof OXM TLV instance."""
172
        if '/' in self.value:
173
            value, mask = self.value.split('/')
174 1
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
175
                mask = None
176
                oxm_hasmask = False
177
            else:
178
                mask = mask.upper()
179
                oxm_hasmask = True
180
        else:
181
            value = self.value
182
            mask = None
183 1
            oxm_hasmask = False
184
        value_bytes = HWAddress(value).pack()
185
        if mask:
186 1
            value_bytes += HWAddress(mask).pack()
187 1
        return OxmTLV(oxm_field=self.oxm_field,
188
                      oxm_hasmask=oxm_hasmask,
189 1
                      oxm_value=value_bytes)
190
191
    @classmethod
192
    def from_of_tlv(cls, tlv):
193
        """Return an instance from a pyof OXM TLV."""
194 1
        hw_address = HWAddress()
195
        hw_address.unpack(tlv.oxm_value)
196
        addr_str = str(hw_address)
197
        value = addr_str
198
        if tlv.oxm_hasmask:
199
            hw_mask = HWAddress()
200
            hw_mask.unpack(tlv.oxm_value[6:])
201
            mask_str = str(hw_mask)
202
            value = f'{addr_str}/{mask_str}'
203 1
        return cls(value)
204
205
206 1
class MatchDLType(MatchField):
207 1
    """Match for datalink type."""
208
209 1
    name = 'dl_type'
210
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_TYPE
211
212
    def as_of_tlv(self):
213
        """Return a pyof OXM TLV instance."""
214 1
        value_bytes = self.value.to_bytes(2, 'big')
215
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
216
217
    @classmethod
218
    def from_of_tlv(cls, tlv):
219
        """Return an instance from a pyof OXM TLV."""
220
        port = int.from_bytes(tlv.oxm_value, 'big')
221 1
        return cls(port)
222
223
224 1 View Code Duplication
class MatchNwSrc(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
225 1
    """Match for IPV4 source."""
226
227 1
    name = 'nw_src'
228
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_SRC
229
230
    def as_of_tlv(self):
231
        """Return a pyof OXM TLV instance."""
232 1
        ip_addr = IPAddress(self.value)
233
        value_bytes = ip_addr.pack()
234
        if ip_addr.netmask < 32:
235
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
236
        return OxmTLV(oxm_field=self.oxm_field,
237
                      oxm_hasmask=ip_addr.netmask < 32,
238
                      oxm_value=value_bytes)
239 1
240
    @classmethod
241
    def from_of_tlv(cls, tlv):
242 1
        """Return an instance from a pyof OXM TLV."""
243 1
        ip_address = IPAddress()
244
        ip_address.unpack(tlv.oxm_value)
245 1
        addr_str = str(ip_address)
246
        value = addr_str
247
        if tlv.oxm_hasmask:
248
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
249
        return cls(value)
250 1
251
252 View Code Duplication
class MatchNwDst(MatchField):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
253
    """Match for IPV4 destination."""
254
255
    name = 'nw_dst'
256
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_DST
257 1
258
    def as_of_tlv(self):
259
        """Return a pyof OXM TLV instance."""
260 1
        ip_addr = IPAddress(self.value)
261 1
        value_bytes = ip_addr.pack()
262
        if ip_addr.netmask < 32:
263 1
            value_bytes += mask_to_bytes(ip_addr.netmask, 32)
264
        return OxmTLV(oxm_field=self.oxm_field,
265
                      oxm_hasmask=ip_addr.netmask < 32,
266
                      oxm_value=value_bytes)
267
268 1
    @classmethod
269
    def from_of_tlv(cls, tlv):
270
        """Return an instance from a pyof OXM TLV."""
271
        ip_address = IPAddress()
272
        ip_address.unpack(tlv.oxm_value)
273
        addr_str = str(ip_address)
274
        value = addr_str
275 1
        if tlv.oxm_hasmask:
276
            value = f'{addr_str}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
277
        return cls(value)
278
279
280
class MatchNwProto(MatchField):
281
    """Match for IP protocol."""
282
283 1
    name = 'nw_proto'
284
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_PROTO
285 1
286
    def as_of_tlv(self):
287
        """Return a pyof OXM TLV instance."""
288
        value_bytes = self.value.to_bytes(1, 'big')
289
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
290
291
    @classmethod
292
    def from_of_tlv(cls, tlv):
293 1
        """Return an instance from a pyof OXM TLV."""
294
        priority = int.from_bytes(tlv.oxm_value, 'big')
295
        return cls(priority)
296
297
298
class MatchInPort(MatchField):
299
    """Match for input port."""
300
301 1
    name = 'in_port'
302
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IN_PORT
303
304
    def as_of_tlv(self):
305
        """Return a pyof OXM TLV instance."""
306
        value_bytes = self.value.to_bytes(4, 'big')
307
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
308 1
309
    @classmethod
310
    def from_of_tlv(cls, tlv):
311
        """Return an instance from a pyof OXM TLV."""
312
        port = int.from_bytes(tlv.oxm_value, 'big')
313
        return cls(port)
314
315
316
class MatchTCPSrc(MatchField):
317
    """Match for TCP source."""
318
319
    name = 'tp_src'
320
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_SRC
321
322
    def as_of_tlv(self):
323
        """Return a pyof OXM TLV instance."""
324
        value_bytes = self.value.to_bytes(2, 'big')
325
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
326
327
    @classmethod
328
    def from_of_tlv(cls, tlv):
329
        """Return an instance from a pyof OXM TLV."""
330
        port = int.from_bytes(tlv.oxm_value, 'big')
331
        return cls(port)
332
333
334
class MatchTCPDst(MatchField):
335
    """Match for TCP destination."""
336
337
    name = 'tp_dst'
338
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_DST
339
340
    def as_of_tlv(self):
341
        """Return a pyof OXM TLV instance."""
342
        value_bytes = self.value.to_bytes(2, 'big')
343
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)
344
345
    @classmethod
346
    def from_of_tlv(cls, tlv):
347
        """Return an instance from a pyof OXM TLV."""
348
        port = int.from_bytes(tlv.oxm_value, 'big')
349
        return cls(port)
350
351
352
class MatchFieldFactory(ABC):
353
    """Create the correct MatchField subclass instance.
354
355
    As OF 1.3 has many match fields and there are many ways to (un)pack their
356
    OxmTLV.oxm_value, this class does all the work of finding the correct
357
    MatchField class and instantiating the corresponding object.
358
    """
359
360
    __classes = {}
361
362
    @classmethod
363
    def from_name(cls, name, value):
364
        """Return the proper object from name and value."""
365
        field_class = cls._get_class(name)
366
        if field_class:
367
            return field_class(value)
368
        return None
369
370
    @classmethod
371
    def from_of_tlv(cls, tlv):
372
        """Return the proper object from a pyof OXM TLV."""
373
        field_class = cls._get_class(tlv.oxm_field)
374
        if field_class:
375
            return field_class.from_of_tlv(tlv)
376
        return None
377
378
    @classmethod
379
    def _get_class(cls, name_or_field):
380
        """Return the proper object from field name or OxmTLV.oxm_field."""
381
        if not cls.__classes:
382
            cls._index_classes()
383
        return cls.__classes.get(name_or_field)
384
385
    @classmethod
386
    def _index_classes(cls):
387
        for subclass in MatchField.__subclasses__():
388
            cls.__classes[subclass.name] = subclass
389
            cls.__classes[subclass.oxm_field] = subclass
390