Passed
Pull Request — master (#95)
by
unknown
08:32 queued 05:44
created

build.utils.clean_circuits()   D

Complexity

Conditions 13

Size

Total Lines 36
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 27
CRAP Score 13.0076

Importance

Changes 0
Metric Value
cc 13
eloc 33
nop 2
dl 0
loc 36
rs 4.2
c 0
b 0
f 0
ccs 27
cts 28
cp 0.9643
crap 13.0076

2 Methods

Rating   Name   Duplication   Size   Complexity  
A build.utils.convert_vlan() 0 6 2
A build.utils.match_field_dl_vlan() 0 8 2

How to fix   Complexity   

Complexity

Complex classes like build.utils.clean_circuits() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Utility functions to be used in this Napp"""
2
3 1
import requests
4 1
from kytos.core.retry import before_sleep
5 1
from napps.amlight.sdntrace_cp import settings
6 1
from requests.exceptions import Timeout
7 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
8
                      wait_random)
9
10
11 1
@retry(
12
    stop=stop_after_attempt(3),
13
    wait=wait_random(min=0.1, max=0.2),
14
    before_sleep=before_sleep,
15
    retry=retry_if_exception_type((Timeout, ConnectionError)))
16 1
def get_stored_flows(dpids: list = None, state: str = "installed"):
17
    """Get stored flows from flow_manager napps."""
18 1
    api_url = f'{settings.FLOW_MANAGER_URL}/stored_flows'
19 1
    if dpids:
20
        str_dpids = ''
21
        for dpid in dpids:
22
            str_dpids += f'&dpid={dpid}'
23
        api_url += '/?'+str_dpids[1:]
24 1
    if state:
25 1
        char = '&' if dpids else '/?'
26 1
        api_url += char+f'state={state}'
27 1
    result = requests.get(api_url, timeout=20)
28 1
    flows_from_manager = result.json()
29 1
    return flows_from_manager
30
31
32 1
def convert_entries(entries):
33
    """ Transform entries dictionary in a plain dictionary suitable for
34
        matching
35
36
    :param entries: dict
37
    :return: plain dict
38
    """
39 1
    new_entries = {}
40 1
    for entry in entries['trace'].values():
41 1
        for field, value in entry.items():
42 1
            new_entries[field] = value
43 1
    if 'dl_vlan' in new_entries:
44 1
        new_entries['dl_vlan'] = [new_entries['dl_vlan']]
45 1
    return new_entries
46
47
48 1
def convert_list_entries(entries):
49
    """ Transform a list of entries dictionary in a list
50
    of plain dictionary suitable for matching
51
    :param entries: list(dict)
52
    :return: list(plain dict)
53
    """
54 1
    new_entries = []
55 1
    for entry in entries:
56 1
        new_entry = convert_entries(entry)
57 1
        if new_entry:
58 1
            new_entries.append(new_entry)
59 1
    return new_entries
60
61
62 1
def find_endpoint(switch, port):
63
    """ Find where switch/port is connected. If it is another switch,
64
    returns the interface it is connected to, otherwise returns None """
65
66 1
    interface = switch.get_interface_by_port_no(port)
67 1
    if not interface:
68
        return None
69 1
    if interface and interface.link:
70 1
        if interface == interface.link.endpoint_a:
71 1
            return {'endpoint': interface.link.endpoint_b}
72 1
        return {'endpoint': interface.link.endpoint_a}
73 1
    return {'endpoint': None}
74
75
76 1
def _prepare_json(trace_result):
77
    """Auxiliar function to return the json for REST call."""
78 1
    result = []
79 1
    for trace_step in trace_result:
80 1
        result.append(trace_step['in'])
81 1
    if result:
82 1
        result[-1]["out"] = trace_result[-1].get("out")
83 1
    return result
84
85
86 1
def prepare_json(trace_result):
87
    """Prepare return json for REST call."""
88 1
    result = []
89 1
    if trace_result and isinstance(trace_result[0], list):
90 1
        for trace in trace_result:
91 1
            result.append(_prepare_json(trace))
92
    else:
93 1
        result = _prepare_json(trace_result)
94 1
    return {'result': result}
95
96
97
# pylint: disable=too-many-return-statements
98 1
def _compare_endpoints(endpoint1, endpoint2):
99 1
    if endpoint1['dpid'] != endpoint2['dpid']:
100 1
        return False
101 1
    if (
102
        'in_port' not in endpoint1
103
        or 'out_port' not in endpoint2
104
        or endpoint1['in_port'] != endpoint2['out_port']
105
    ):
106 1
        return False
107 1
    if 'in_vlan' in endpoint1 and 'out_vlan' in endpoint2:
108 1
        if endpoint1['in_vlan'] != endpoint2['out_vlan']:
109 1
            return False
110 1
    elif 'in_vlan' in endpoint1 or 'out_vlan' in endpoint2:
111 1
        return False
112 1
    if 'out_vlan' in endpoint1 and 'in_vlan' in endpoint2:
113 1
        if endpoint1['out_vlan'] != endpoint2['in_vlan']:
114 1
            return False
115 1
    elif 'out_vlan' in endpoint1 or 'in_vlan' in endpoint2:
116 1
        return False
117 1
    return True
118
119
120 1
def convert_vlan(value):
121
    """Auxiliar function to calculate dl_vlan"""
122 1
    if isinstance(value, int):
123 1
        return value, 4095
124 1
    value, mask = map(int, value.split('/'))
125 1
    return value, mask
126
127
128 1
def match_field_dl_vlan(value, field_flow):
129
    """ Verify match in dl_vlan.
130
    value only takes an int in range [1,4095].
131
    0 is not allowed for value. """
132 1
    if not value:
133 1
        return field_flow == 0
134 1
    value_flow, mask_flow = convert_vlan(field_flow)
135
    return value & (mask_flow & 4095) == value_flow & (mask_flow & 4095)
136