Completed
Pull Request — master (#82)
by
unknown
02:36
created

build.v0x04.match_fields   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 401
Duplicated Lines 30.92 %

Test Coverage

Coverage 52.41%

Importance

Changes 0

Metric   Value
wmc   56
eloc   245
dl   124
loc   401
ccs   76
cts   145
cp   0.5241
rs   5.5199
c   0
b   0
f   0

32 Methods

Rating   Name   Duplication   Size   Complexity  
A MatchFieldFactory._index_classes() 0 5 2
A MatchField.oxm_field() 0 5 1
A MatchField.__init__() 0 3 1
A MatchTCPDst.as_of_tlv() 0 4 1
A MatchTCPSrc.from_of_tlv() 0 5 1
A MatchField.from_of_tlv() 0 4 1
A MatchFieldFactory.from_name() 0 7 2
A MatchField.__eq__() 0 7 1
A MatchInPort.from_of_tlv() 0 5 1
A MatchTCPSrc.as_of_tlv() 0 4 1
A MatchField.as_of_tlv() 0 3 1
A MatchNwDst.from_of_tlv() 9 9 2
A MatchFieldFactory.from_of_tlv() 0 7 2
A MatchDLVLAN.from_of_tlv() 0 9 2
A MatchDLSrc.from_of_tlv() 13 13 2
A MatchDLDst.as_of_tlv() 19 19 4
A MatchDLDst.from_of_tlv() 13 13 2
A MatchNwDst.as_of_tlv() 7 7 2
A MatchDLVLAN.as_of_tlv() 0 16 3
A MatchNwProto.as_of_tlv() 0 4 1
A MatchDLVLANPCP.from_of_tlv() 0 5 1
A MatchTCPDst.from_of_tlv() 0 5 1
A MatchNwSrc.from_of_tlv() 9 9 2
A MatchDLType.from_of_tlv() 0 5 1
A MatchInPort.as_of_tlv() 0 4 1
A MatchField.name() 0 5 1
A MatchDLVLANPCP.as_of_tlv() 0 4 1
A MatchFieldFactory._get_class() 0 6 2
A MatchNwProto.from_of_tlv() 0 5 1
A MatchDLType.as_of_tlv() 0 4 1
A MatchNwSrc.as_of_tlv() 7 7 2
A MatchDLSrc.as_of_tlv() 19 19 4

2 Functions

Rating   Name   Duplication   Size   Complexity  
A bytes_to_mask() 0 12 3
A mask_to_bytes() 0 6 2

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
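As an illustration of that rule applied to this module, the report flags the MAC-address classes (`MatchDLSrc` and `MatchDLDst`) as carrying identical `as_of_tlv()` bodies. A minimal sketch of pulling the shared parsing into one place follows. The helper names `split_mac_and_mask`, `mac_to_bytes` and `mac_match_bytes` are hypothetical, and `mac_to_bytes` is only a stand-in for pyof's `HWAddress(...).pack()` so that the sketch runs without python-openflow.

```python
FULL_MAC_MASK = 'FF:FF:FF:FF:FF:FF'


def split_mac_and_mask(value):
    """Split 'addr/mask' into (addr, mask).

    The mask is None when it is absent or when it is all ones, mirroring
    the branches duplicated in both reviewed classes.
    """
    addr, _, mask = value.partition('/')
    mask = mask.upper()
    if not mask or mask == FULL_MAC_MASK:
        return addr, None
    return addr, mask


def mac_to_bytes(mac):
    """Pack a colon-separated MAC into 6 bytes (stand-in for HWAddress.pack)."""
    return bytes(int(octet, 16) for octet in mac.split(':'))


def mac_match_bytes(value):
    """Return (oxm_hasmask, oxm_value) for a MAC match string."""
    addr, mask = split_mac_and_mask(value)
    value_bytes = mac_to_bytes(addr)
    if mask is not None:
        value_bytes += mac_to_bytes(mask)
    return mask is not None, value_bytes


has_mask, payload = mac_match_bytes('00:11:22:33:44:55/ff:ff:ff:00:00:00')
print(has_mask, payload.hex())  # True 001122334455ffffff000000
```

With a helper like this, each of the two classes would only need to supply its own `name` and `oxm_field`.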

Common duplication problems and their corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.v0x04.match_fields often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
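For this module, Extract Subclass could look roughly like the sketch below: the fields whose value is a fixed-width unsigned integer share one intermediate class. Everything here is an assumption made for illustration: `IntegerMatchField`, its `size` attribute, `all_subclasses` and the `FakeTLV` tuple (a stand-in for pyof's `OxmTLV`) are hypothetical, and the `oxm_field` numbers are placeholders for the `OxmOfbMatchField` members.

```python
from abc import ABC, abstractmethod
from collections import namedtuple

# Stand-in for pyof's OxmTLV so the sketch runs without python-openflow.
FakeTLV = namedtuple('FakeTLV', ['oxm_field', 'oxm_value'])


class MatchField(ABC):
    """Trimmed-down base class, just enough for the sketch."""

    def __init__(self, value):
        self.value = value

    @abstractmethod
    def as_of_tlv(self):
        """Return a TLV for this field."""


class IntegerMatchField(MatchField):
    """Hypothetical extracted subclass for fixed-width unsigned integers."""

    size = None  # number of bytes; set by each concrete field

    def as_of_tlv(self):
        value_bytes = self.value.to_bytes(self.size, 'big')
        return FakeTLV(self.oxm_field, value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        return cls(int.from_bytes(tlv.oxm_value, 'big'))


class MatchInPort(IntegerMatchField):
    name = 'in_port'
    oxm_field = 0  # placeholder for OxmOfbMatchField.OFPXMT_OFB_IN_PORT
    size = 4


class MatchTCPDst(IntegerMatchField):
    name = 'tp_dst'
    oxm_field = 14  # placeholder for OxmOfbMatchField.OFPXMT_OFB_TCP_DST
    size = 2


def all_subclasses(cls):
    """Yield every descendant of cls, not only the direct children."""
    for subclass in cls.__subclasses__():
        yield subclass
        yield from all_subclasses(subclass)


# Index only the classes that declare their own name, skipping the
# intermediate IntegerMatchField.
index = {sub.name: sub
         for sub in all_subclasses(MatchField) if 'name' in vars(sub)}
print(sorted(index))  # ['in_port', 'tp_dst']
```

One design consequence worth noting: `type.__subclasses__()` lists direct subclasses only, so a factory that iterates `MatchField.__subclasses__()` would no longer see the concrete fields once an intermediate class is introduced. A recursive walk such as `all_subclasses` is one way to keep the index complete.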

"""OpenFlow 1.3 OXM match fields.

Flow's match is very different from OF 1.0. Instead of always having all
fields, there's a variable list of match fields and each one is an OpenFlow
eXtended Match Type-Length-Value (OXM TLV) element.

This module provides high-level Python classes for OXM TLV fields in order to
make the OF 1.3 match fields easy to use and to code.
"""
from abc import ABC, abstractmethod

from pyof.foundation.basic_types import HWAddress, IPAddress
from pyof.v0x04.common.flow_match import OxmOfbMatchField, OxmTLV, VlanId


def mask_to_bytes(mask, size):
    """Return a ``size``-bit netmask with ``mask`` leading one bits, as bytes."""
    bits = 0
    for i in range(size - mask, size):
        bits |= (1 << i)
    tobytes = bits.to_bytes(size // 8, 'big')
    return tobytes


def bytes_to_mask(tobytes, size):
    """Return the number of leading one bits in a ``size``-bit netmask."""
    int_mask = int.from_bytes(tobytes, 'big')
    # Zero-pad to ``size`` digits: bin() drops leading zeros, which would
    # shift the bits and could index past the end of the string.
    strbits = format(int_mask, f'0{size}b')
    netmask = 0
    for i in range(size):
        if strbits[i] == '1':
            netmask += 1
        else:
            break
    return netmask


class MatchField(ABC):
    """Base class for match fields. Abstract OXM TLVs of python-openflow.

    Just extend this class and you will be forced to define the required
    low-level attributes and methods below:

    * "name" attribute (field name to be displayed in JSON);
    * "oxm_field" attribute (``OxmOfbMatchField`` enum);
    * Method to return a pyof OxmTLV;
    * Method to create an instance from an OxmTLV.
    """

    def __init__(self, value):
        """Define match field value."""
        self.value = value

    @property
    @classmethod
    @abstractmethod
    def name(cls):
        """Define a name to be displayed in JSON.

        It can be overridden just by a class attribute.
        """

    @property
    @classmethod
    @abstractmethod
    def oxm_field(cls):
        """Define this subclass ``OxmOfbMatchField`` value.

        It can be overridden just by a class attribute.
        """

    @abstractmethod
    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""

    @classmethod
    @abstractmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""

    def __eq__(self, other):
        """Two objects are equal if their values are the same.

        The oxm_field equality is checked indirectly when comparing whether
        the objects are instances of the same class.
        """
        return isinstance(other, self.__class__) and other.value == self.value


class MatchDLVLAN(MatchField):
    """Match for datalink VLAN ID."""

    name = 'dl_vlan'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_VID

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        try:
            value = int(self.value)
            mask = None
            oxm_hasmask = False
        except ValueError:
            # A 'value/mask' string carries an explicit mask.
            value, mask = map(int, self.value.split('/'))
            oxm_hasmask = True
        value = value | VlanId.OFPVID_PRESENT
        value_bytes = value.to_bytes(2, 'big')
        # Compare against None so that a mask of 0 is still packed and the
        # payload stays consistent with oxm_hasmask.
        if mask is not None:
            mask = mask | VlanId.OFPVID_PRESENT
            value_bytes += mask.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        vlan_id = int.from_bytes(tlv.oxm_value[:2], 'big') & 4095
        value = vlan_id
        if tlv.oxm_hasmask:
            vlan_mask = int.from_bytes(tlv.oxm_value[2:], 'big') & 4095
            value = f'{vlan_id}/{vlan_mask}'
        return cls(value)


class MatchDLVLANPCP(MatchField):
    """Match for VLAN Priority Code Point."""

    name = 'dl_vlan_pcp'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_VLAN_PCP

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        priority = int.from_bytes(tlv.oxm_value, 'big')
        return cls(priority)


# Review note: this code seems to be duplicated in your project.
class MatchDLSrc(MatchField):
    """Match for datalink source."""

    name = 'dl_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field, oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


# Review note: this code seems to be duplicated in your project.
class MatchDLDst(MatchField):
    """Match for datalink destination."""

    name = 'dl_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        if '/' in self.value:
            value, mask = self.value.split('/')
            if mask.upper() == 'FF:FF:FF:FF:FF:FF':
                mask = None
                oxm_hasmask = False
            else:
                mask = mask.upper()
                oxm_hasmask = True
        else:
            value = self.value
            mask = None
            oxm_hasmask = False
        value_bytes = HWAddress(value).pack()
        if mask:
            value_bytes += HWAddress(mask).pack()
        return OxmTLV(oxm_field=self.oxm_field, oxm_hasmask=oxm_hasmask,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        hw_address = HWAddress()
        hw_address.unpack(tlv.oxm_value)
        addr_str = str(hw_address)
        value = addr_str
        if tlv.oxm_hasmask:
            hw_mask = HWAddress()
            hw_mask.unpack(tlv.oxm_value[6:])
            mask_str = str(hw_mask)
            value = f'{addr_str}/{mask_str}'
        return cls(value)


class MatchDLType(MatchField):
    """Match for datalink type."""

    name = 'dl_type'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_ETH_TYPE

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ether_type = int.from_bytes(tlv.oxm_value, 'big')
        return cls(ether_type)


# Review note: this code seems to be duplicated in your project.
class MatchNwSrc(MatchField):
    """Match for IPv4 source."""

    name = 'nw_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip = IPAddress(self.value)
        value_bytes = ip.pack()
        if ip.netmask < 32:
            value_bytes += mask_to_bytes(ip.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field, oxm_hasmask=ip.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        value = str(ip_address)
        if tlv.oxm_hasmask:
            value = f'{value}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)


# Review note: this code seems to be duplicated in your project.
class MatchNwDst(MatchField):
    """Match for IPv4 destination."""

    name = 'nw_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IPV4_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        ip = IPAddress(self.value)
        value_bytes = ip.pack()
        if ip.netmask < 32:
            value_bytes += mask_to_bytes(ip.netmask, 32)
        return OxmTLV(oxm_field=self.oxm_field, oxm_hasmask=ip.netmask < 32,
                      oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        ip_address = IPAddress()
        ip_address.unpack(tlv.oxm_value)
        value = str(ip_address)
        if tlv.oxm_hasmask:
            value = f'{value}/{bytes_to_mask(tlv.oxm_value[4:], 32)}'
        return cls(value)


class MatchNwProto(MatchField):
    """Match for IP protocol."""

    name = 'nw_proto'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IP_PROTO

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(1, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        protocol = int.from_bytes(tlv.oxm_value, 'big')
        return cls(protocol)


class MatchInPort(MatchField):
    """Match for input port."""

    name = 'in_port'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_IN_PORT

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(4, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchTCPSrc(MatchField):
    """Match for TCP source."""

    name = 'tp_src'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_SRC

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchTCPDst(MatchField):
    """Match for TCP destination."""

    name = 'tp_dst'
    oxm_field = OxmOfbMatchField.OFPXMT_OFB_TCP_DST

    def as_of_tlv(self):
        """Return a pyof OXM TLV instance."""
        value_bytes = self.value.to_bytes(2, 'big')
        return OxmTLV(oxm_field=self.oxm_field, oxm_value=value_bytes)

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return an instance from a pyof OXM TLV."""
        port = int.from_bytes(tlv.oxm_value, 'big')
        return cls(port)


class MatchFieldFactory(ABC):
    """Create the correct MatchField subclass instance.

    As OF 1.3 has many match fields and there are many ways to (un)pack their
    OxmTLV.oxm_value, this class does all the work of finding the correct
    MatchField class and instantiating the corresponding object.
    """

    __classes = {}

    @classmethod
    def from_name(cls, name, value):
        """Return the proper object from name and value."""
        field_class = cls._get_class(name)
        if field_class:
            return field_class(value)
        return None

    @classmethod
    def from_of_tlv(cls, tlv):
        """Return the proper object from a pyof OXM TLV."""
        field_class = cls._get_class(tlv.oxm_field)
        if field_class:
            return field_class.from_of_tlv(tlv)
        return None

    @classmethod
    def _get_class(cls, name_or_field):
        """Return the proper class from field name or OxmTLV.oxm_field."""
        if not cls.__classes:
            cls._index_classes()
        return cls.__classes.get(name_or_field)

    @classmethod
    def _index_classes(cls):
        """Index the direct MatchField subclasses by name and by oxm_field."""
        for subclass in MatchField.__subclasses__():
            cls.__classes[subclass.name] = subclass
            cls.__classes[subclass.oxm_field] = subclass
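
Given the reported coverage, the two module-level helpers are a natural place to start testing, since they need nothing from pyof. A minimal sketch of a round-trip check follows. It re-declares compact variants of `mask_to_bytes` and `bytes_to_mask` so that it runs on its own; the compact bodies and the name `check_round_trip` are assumptions of this sketch, not the reviewed implementation.

```python
def mask_to_bytes(mask, size):
    """Return a size-bit netmask with `mask` leading one bits, as bytes."""
    bits = ((1 << mask) - 1) << (size - mask)
    return bits.to_bytes(size // 8, 'big')


def bytes_to_mask(tobytes, size):
    """Count the leading one bits of a size-bit netmask."""
    # Zero-pad so that leading zero bits are kept in the string.
    strbits = format(int.from_bytes(tobytes, 'big'), f'0{size}b')
    return len(strbits) - len(strbits.lstrip('1'))


def check_round_trip(size=32):
    """Hypothetical test: every prefix length survives pack and unpack."""
    for prefix in range(size + 1):
        assert bytes_to_mask(mask_to_bytes(prefix, size), size) == prefix


check_round_trip()
print(mask_to_bytes(24, 32).hex())  # ffffff00
```

Looping over all 33 prefix lengths also covers the two edge cases, a prefix of 0 and a prefix of 32, which are the ones most likely to be missed by hand-picked examples.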