Passed
Push — master ( 353561...cf72e6 )
by Humberto
05:58 queued 03:16
created

kytos.core.interface   F

Complexity

Total Complexity 89

Size/Duplication

Total Lines 481
Duplicated Lines 0 %

Test Coverage

Coverage 71.43%

Importance

Changes 0
Metric Value
eloc 229
dl 0
loc 481
ccs 145
cts 203
cp 0.7143
rs 2
c 0
b 0
f 0
wmc 89

40 Methods

Rating   Name   Duplication   Size   Complexity  
A TAG.__init__() 0 3 1
A Interface.delete_endpoint() 0 9 2
A Interface.add_endpoint() 0 9 2
A Interface.make_tag_available() 0 6 2
A Interface.get_endpoint() 0 14 3
A TAG.from_dict() 0 4 1
A TAG.__repr__() 0 2 1
A Interface.id() 0 9 1
A Interface.__init__() 0 32 1
A Interface.is_tag_available() 0 3 1
A Interface.set_available_tags() 0 12 2
B Interface.update_link() 0 24 7
A Interface.uni() 0 4 1
A Interface.__repr__() 0 2 1
A Interface.get_next_available_tag() 0 11 2
A Interface.__eq__() 0 8 3
A TAG.from_json() 0 4 1
A TAG.__eq__() 0 4 2
A Interface.enable() 0 7 1
A TAG.as_json() 0 3 1
A Interface.use_tag() 0 10 2
A Interface.update_endpoint() 0 10 2
A TAG.as_dict() 0 3 1
B Interface._get_v0x04_speed() 0 14 7
C Interface._get_v0x01_v0x04_speed() 0 13 9
A Interface._is_v0x04() 0 4 1
A UNI.__eq__() 0 4 1
A Interface.as_dict() 0 44 3
A Interface.as_json() 0 20 1
A Interface.set_custom_speed() 0 7 1
A NNI.__init__() 0 2 1
C Interface.speed() 0 37 9
A VNNI.__init__() 0 4 1
A Interface.get_hr_speed() 0 16 4
A UNI.is_valid() 0 5 2
A Interface.get_custom_speed() 0 3 1
A UNI.as_json() 0 3 1
A Interface.get_of_features_speed() 0 17 3
A UNI.__init__() 0 3 1
A UNI.as_dict() 0 5 2

How to fix   Complexity   

Complexity

Complex classes like kytos.core.interface often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Module with main classes related to Interfaces."""
2 1
import json
3 1
import logging
4 1
from enum import IntEnum
5
6 1
from pyof.v0x01.common.phy_port import Port as PortNo01
7 1
from pyof.v0x01.common.phy_port import PortFeatures as PortFeatures01
8 1
from pyof.v0x04.common.port import PortFeatures as PortFeatures04
9 1
from pyof.v0x04.common.port import PortNo as PortNo04
10
11 1
from kytos.core.common import GenericEntity
12 1
from kytos.core.helpers import now
13
14 1
__all__ = ('Interface',)
15
16 1
LOG = logging.getLogger(__name__)
17
18
19 1
class TAGType(IntEnum):
    """Enumerate the kinds of TAG known to this module.

    Being an :class:`~enum.IntEnum`, every member also compares equal to its
    integer value.
    """

    # Each member is listed with its explicit integer value.
    VLAN = 1
    VLAN_QINQ = 2
    MPLS = 3
25
26
27 1
class TAG:
    """Class that represents a TAG."""

    def __init__(self, tag_type, value):
        """Create a TAG.

        Args:
            tag_type: Any value accepted by the ``TAGType`` constructor.
            value: Value carried by the tag; stored as given.

        Raises:
            ValueError: If ``tag_type`` is not a valid ``TAGType``.
        """
        self.tag_type = TAGType(tag_type)
        self.value = value

    def __eq__(self, other):
        """Return whether ``other`` is a TAG with the same type and value.

        ``NotImplemented`` is returned for objects that are not TAG
        instances, so comparing a TAG with e.g. a string or ``None``
        evaluates to ``False`` instead of raising ``AttributeError``.
        """
        if not isinstance(other, TAG):
            return NotImplemented
        return self.tag_type == other.tag_type and self.value == other.value

    def as_dict(self):
        """Return a dictionary representing a tag object."""
        return {'tag_type': self.tag_type, 'value': self.value}

    @classmethod
    def from_dict(cls, tag_dict):
        """Return a TAG instance from python dictionary.

        The ``'tag_type'`` and ``'value'`` keys are read with ``dict.get``,
        so a missing key is passed to the constructor as ``None``.
        """
        return cls(tag_dict.get('tag_type'), tag_dict.get('value'))

    @classmethod
    def from_json(cls, tag_json):
        """Return a TAG instance from json."""
        return cls.from_dict(json.loads(tag_json))

    def as_json(self):
        """Return a json representing a tag object."""
        return json.dumps(self.as_dict())

    def __repr__(self):
        return f"TAG({self.tag_type!r}, {self.value!r})"
59
60
61 1
class Interface(GenericEntity):  # pylint: disable=too-many-instance-attributes
    """Interface Class used to abstract the network interfaces."""

    # pylint: disable=too-many-arguments
    def __init__(self, name, port_number, switch, address=None, state=None,
                 features=None, speed=None):
        """Assign the parameters to instance attributes.

        Args:
            name (string): name from this interface.
            port_number (int): port number from this interface.
            switch (:class:`~.core.switch.Switch`): Switch with this interface.
            address (|hw_address|): Port address from this interface.
            state (|port_stats|): Port Stat from interface. It will be
                deprecated.
            features (|port_features|): Port feature used to calculate link
                utilization from this interface. It will be deprecated.
            speed (int, float): Custom interface speed in bytes per second.
                :attr:`speed` only falls back to it when no speed can be
                derived from ``features``.
        """
        self.name = name
        self.port_number = int(port_number)
        self.switch = switch
        self.address = address
        self.state = state
        self.features = features
        self.nni = False
        self.endpoints = []
        self.stats = None
        self.link = None
        self.lldp = True
        self._custom_speed = speed
        # Start with one VLAN tag for each value in 1..4095.
        self.set_available_tags(range(1, 4096))

        super().__init__()

    def __repr__(self):
        return f"Interface('{self.name}', {self.port_number}, {self.switch!r})"

    def __eq__(self, other):
        """Compare Interface class with another instance.

        A string is compared against :attr:`address`; another Interface is
        equal when both the port number and the switch dpid match. Any other
        type compares as not equal.
        """
        if isinstance(other, str):
            return self.address == other
        if isinstance(other, Interface):
            return self.port_number == other.port_number and \
                self.switch.dpid == other.switch.dpid
        return False

    @property
    def id(self):  # pylint: disable=invalid-name
        """Return id from Interface instance.

        Returns:
            string: Interface id, formatted as ``<switch dpid>:<port number>``.

        """
        return f"{self.switch.dpid}:{self.port_number}"

    @property
    def uni(self):
        """Return if an interface is a user-to-network Interface."""
        return not self.nni

    def set_available_tags(self, iterable):
        """Set a range of VLAN tags to be used by this Interface.

        Any previously available tags are replaced.

        Args:
            iterable ([int]): range of VLANs.
        """
        self.available_tags = [TAG(TAGType.VLAN, vlan_id)
                               for vlan_id in iterable]

    def enable(self):
        """Enable this interface instance.

        Also enable the switch instance this interface is attached to.
        """
        self.switch.enable()
        self._enabled = True

    def use_tag(self, tag):
        """Remove a specific tag from available_tags if it is there.

        Return False in case the tag is already removed.
        """
        try:
            self.available_tags.remove(tag)
        except ValueError:
            return False
        return True

    def is_tag_available(self, tag):
        """Check if a tag is available."""
        return tag in self.available_tags

    def get_next_available_tag(self):
        """Get the next available tag from the interface.

        The tag is taken from the end of :attr:`available_tags` and removed
        from it. If no tag is available return False.
        """
        try:
            return self.available_tags.pop()
        except IndexError:
            return False

    def make_tag_available(self, tag):
        """Add a specific tag in available_tags.

        Return False, leaving the list untouched, if the tag is already
        available.
        """
        if self.is_tag_available(tag):
            return False
        self.available_tags.append(tag)
        return True

    def get_endpoint(self, endpoint):
        """Return a tuple with existent endpoint, None otherwise.

        Args:
            endpoint(|hw_address|, :class:`.Interface`): endpoint instance.

        Returns:
            tuple: A tuple with endpoint and time of last update.

        """
        for item in self.endpoints:
            if endpoint == item[0]:
                return item
        return None

    def add_endpoint(self, endpoint):
        """Create a new endpoint to Interface instance.

        Nothing is done if the endpoint is already registered.

        Args:
            endpoint(|hw_address|, :class:`.Interface`): A target endpoint.
        """
        if not self.get_endpoint(endpoint):
            self.endpoints.append((endpoint, now()))

    def delete_endpoint(self, endpoint):
        """Delete an existent endpoint in Interface instance.

        Nothing is done if the endpoint is not registered.

        Args:
            endpoint (|hw_address|, :class:`.Interface`): A target endpoint.
        """
        existing = self.get_endpoint(endpoint)
        if existing:
            self.endpoints.remove(existing)

    def update_endpoint(self, endpoint):
        """Update or create new endpoint to Interface instance.

        Args:
            endpoint(|hw_address|, :class:`.Interface`): A target endpoint.
        """
        # delete_endpoint() is a no-op for unknown endpoints, so no lookup is
        # needed here; re-adding stores the endpoint with a fresh timestamp.
        self.delete_endpoint(endpoint)
        self.add_endpoint(endpoint)

    def update_link(self, link):
        """Update link for this interface in a consistent way.

        Verify if the other endpoint of the link has the same Link information
        attached to it, and change it if necessary.

        Warning: This method can potentially change information of other
        Interface instances. Use it with caution.

        Returns:
            bool: False when this interface is not one of the link endpoints
            (nothing is changed), True otherwise.
        """
        if self not in (link.endpoint_a, link.endpoint_b):
            return False

        if self.link is None or self.link != link:
            self.link = link

        if link.endpoint_a == self:
            endpoint = link.endpoint_b
        else:
            endpoint = link.endpoint_a

        if endpoint.link is None or endpoint.link != link:
            endpoint.link = link

        return True

    @property
    def speed(self):
        """Return the link speed in bytes per second, None otherwise.

        The value is resolved in this order:

        1. the speed derived from :attr:`features`
           (see :meth:`get_of_features_speed`);
        2. the custom speed (see :meth:`set_custom_speed`);
        3. ``0`` when this is the OpenFlow ``OFPP_LOCAL`` port.

        If none applies, a warning is logged and ``None`` is returned.

        If the switch was disconnected, we have :attr:`features` and speed is
        still returned for common values between v0x01 and v0x04. For specific
        v0x04 values (40 Gbps, 100 Gbps and 1 Tbps), the connection must be
        active so we can make sure the protocol version is v0x04.

        Returns:
            int, float, None: Link speed in bytes per second or ``None``.

        """
        speed = self.get_of_features_speed()
        if speed is not None:
            return speed

        if self._custom_speed is not None:
            return self._custom_speed

        # Evaluate the protocol version once so that the LOCAL port number is
        # picked consistently.
        if self._is_v0x04():
            local_port = PortNo04.OFPP_LOCAL
        else:
            local_port = PortNo01.OFPP_LOCAL
        if self.port_number == local_port:
            return 0

        # Warn unknown speed
        # Use shorter switch ID with its beginning and end
        if isinstance(self.switch.id, str) and len(self.switch.id) > 20:
            switch_id = self.switch.id[:3] + '...' + self.switch.id[-3:]
        else:
            switch_id = self.switch.id
        LOG.warning("Couldn't get port %s speed, sw %s, feats %s",
                    self.port_number, switch_id, self.features)

        return None

    def set_custom_speed(self, bytes_per_second):
        """Set the custom speed, in bytes per second.

        :attr:`speed` returns this value only when no speed can be derived
        from the OpenFlow port features. If ``None`` is given, the custom
        speed is cleared.

        NOTE(review): the custom speed does not take precedence over the
        speed informed by the switch; confirm this is the intended order.
        """
        self._custom_speed = bytes_per_second

    def get_custom_speed(self):
        """Return custom speed or ``None`` if not set."""
        return self._custom_speed

    def get_of_features_speed(self):
        """Return the link speed in bytes per second, None otherwise.

        If the switch was disconnected, we have :attr:`features` and speed is
        still returned for common values between v0x01 and v0x04. For specific
        v0x04 values (40 Gbps, 100 Gbps and 1 Tbps), the connection must be
        active so we can make sure the protocol version is v0x04.

        Returns:
            float, None: Link speed in bytes per second or ``None``.

        """
        speed = self._get_v0x01_v0x04_speed()
        # The v0x04-only values are checked only while the switch is connected
        # using OpenFlow 1.3, which is what _is_v0x04() verifies.
        if speed is None and self._is_v0x04():
            speed = self._get_v0x04_speed()
        return speed

    def _is_v0x04(self):
        """Whether the switch is connected using OpenFlow 1.3."""
        return self.switch.is_connected() and \
            self.switch.connection.protocol.version == 0x04

    def _get_v0x01_v0x04_speed(self):
        """Check against all values of v0x01. They're part of v0x04.

        Returns:
            float, None: Speed in bytes per second for the fastest matching
            feature bit, or ``None`` when :attr:`features` is empty or has
            none of these bits set.
        """
        fts = self.features
        if not fts:
            return None
        pfts = PortFeatures01
        # Checked from fastest to slowest so the highest speed wins.
        if fts & pfts.OFPPF_10GB_FD:
            return 10 * 10**9 / 8
        if fts & (pfts.OFPPF_1GB_HD | pfts.OFPPF_1GB_FD):
            return 10**9 / 8
        if fts & (pfts.OFPPF_100MB_HD | pfts.OFPPF_100MB_FD):
            return 100 * 10**6 / 8
        if fts & (pfts.OFPPF_10MB_HD | pfts.OFPPF_10MB_FD):
            return 10 * 10**6 / 8
        return None

    def _get_v0x04_speed(self):
        """Check against higher enums of v0x04.

        Must be called after :meth:`_get_v0x01_v0x04_speed` returns ``None``.

        Returns:
            float, None: Speed in bytes per second for the fastest matching
            feature bit, or ``None`` when :attr:`features` is empty or has
            none of these bits set.
        """
        fts = self.features
        if not fts:
            return None
        pfts = PortFeatures04
        # Checked from fastest to slowest so the highest speed wins.
        if fts & pfts.OFPPF_1TB_FD:
            return 10**12 / 8
        if fts & pfts.OFPPF_100GB_FD:
            return 100 * 10**9 / 8
        if fts & pfts.OFPPF_40GB_FD:
            return 40 * 10**9 / 8
        return None

    def get_hr_speed(self):
        """Return Human-Readable string for link speed.

        Returns:
            string: String with link speed. e.g: '350 Gbps' or '350 Mbps'.
            An empty string is returned when the speed is unknown.

        """
        speed = self.speed
        if speed is None:
            return ''
        # self.speed is in bytes per second; convert it to bits per second.
        speed *= 8
        if speed == 10**12:
            return '1 Tbps'
        if speed >= 10**9:
            return f'{round(speed / 10**9)} Gbps'
        return f'{round(speed / 10**6)} Mbps'

    def as_dict(self):
        """Return a dictionary with Interface attributes.

        Speed is in bytes/sec. Example of output (100 Gbps):

        .. code-block:: python3

            {'id': '00:00:00:00:00:00:00:01:2',
             'name': 'eth01',
             'port_number': 2,
             'mac': '00:7e:04:3b:c2:a6',
             'switch': '00:00:00:00:00:00:00:01',
             'type': 'interface',
             'nni': False,
             'uni': True,
             'speed': 12500000000,
             'metadata': {},
             'lldp': True,
             'active': True,
             'enabled': False,
             'link': ""
            }

        A ``'stats'`` key is added only when :attr:`stats` is set.

        Returns:
            dict: Dictionary filled with interface attributes.

        """
        iface_dict = {'id': self.id,
                      'name': self.name,
                      'port_number': self.port_number,
                      'mac': self.address,
                      'switch': self.switch.dpid,
                      'type': 'interface',
                      'nni': self.nni,
                      'uni': self.uni,
                      'speed': self.speed,
                      'metadata': self.metadata,
                      'lldp': self.lldp,
                      'active': self.is_active(),
                      'enabled': self.is_enabled(),
                      'link': self.link.id if self.link else ""}
        if self.stats:
            iface_dict['stats'] = self.stats.as_dict()
        return iface_dict

    def as_json(self):
        """Return a json with Interfaces attributes.

        The output is the JSON encoding of :meth:`as_dict`, so it carries the
        same keys and the speed is numeric (bytes/sec). Abridged example:

        .. code-block:: json

            {"id": "00:00:00:00:00:00:00:01:2",
             "name": "eth01",
             "port_number": 2,
             "mac": "00:7e:04:3b:c2:a6",
             "switch": "00:00:00:00:00:00:00:01",
             "type": "interface",
             "speed": 12500000000}

        Returns:
            string: Json filled with interface attributes.

        """
        return json.dumps(self.as_dict())
435
436
437 1
class UNI:
    """Class that represents a User-to-Network Interface."""

    def __init__(self, interface, user_tag=None):
        """Store the interface and the optional user tag of this UNI."""
        self.user_tag = user_tag
        self.interface = interface

    def __eq__(self, other):
        """Return whether ``other`` is a UNI with equal tag and interface.

        ``NotImplemented`` is returned for objects that are not UNI
        instances, so comparing a UNI with e.g. ``None`` evaluates to
        ``False`` instead of raising ``AttributeError``.
        """
        if not isinstance(other, UNI):
            return NotImplemented
        return (self.user_tag == other.user_tag and
                self.interface == other.interface)

    def is_valid(self):
        """Check if TAG is possible for this interface TAG pool.

        A UNI without a user tag is always valid.
        """
        if self.user_tag:
            return self.interface.is_tag_available(self.user_tag)
        return True

    def as_dict(self):
        """Return a dict representing a UNI object."""
        return {
            'interface_id': self.interface.id,
            'tag': self.user_tag.as_dict() if self.user_tag else None
            }

    def as_json(self):
        """Return a json representing a UNI object."""
        return json.dumps(self.as_dict())
465
466
467 1
class NNI:
    """Class that represents a Network-to-Network Interface."""

    def __init__(self, interface):
        """Keep a reference to the given interface."""
        self.interface = interface
472
473
474 1
class VNNI(NNI):
    """Class that represents a Virtual Network-to-Network Interface."""

    def __init__(self, service_tag, *args, **kwargs):
        """Store the service tag and forward the other arguments to NNI."""
        super().__init__(*args, **kwargs)
        self.service_tag = service_tag
481