Test Failed
Pull Request — master (#49)
by
unknown
03:08
created

build.utils.format_result()   A

Complexity

Conditions 5

Size

Total Lines 14
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 13
nop 1
dl 0
loc 14
rs 9.2833
c 0
b 0
f 0
ccs 10
cts 10
cp 1
crap 5
1
"""Utility functions to be used in this Napp"""
2
3 1
import requests
4 1
from kytos.core import KytosEvent
5
from napps.amlight.sdntrace_cp import settings
6
7 1
8
def get_stored_flows(dpids: list = None, state: str = None):
    """Get stored flows from flow_manager napps.

    :param dpids: optional list of dpids; each one is sent as a ``dpid``
        query argument
    :param state: optional value sent as the ``state`` query argument
    :return: the decoded JSON body of the flow_manager response
    """
    api_url = f'{settings.FLOW_MANAGER_URL}/stored_flows'
    if dpids:
        # One "dpid=<value>" argument per requested dpid, joined by '&'.
        api_url += '/?' + '&'.join(f'dpid={dpid}' for dpid in dpids)
    if state:
        # The query string is only opened here when no dpid opened it before.
        separator = '&' if dpids else '/?'
        api_url += f'{separator}state={state}'
    return requests.get(api_url).json()
22
23 1
24
def convert_entries(entries):
    """Transform an entries dictionary into a plain dictionary suitable
    for matching.

    :param entries: dict whose 'trace' value maps section names to dicts
        of fields
    :return: plain dict merging the fields of every section; 'dl_vlan',
        when present, is wrapped in a single-element list
    """
    new_entries = {}
    for entry in entries['trace'].values():
        # Later sections override fields already set by earlier ones.
        new_entries.update(entry)
    if 'dl_vlan' in new_entries:
        new_entries['dl_vlan'] = [new_entries['dl_vlan']]
    return new_entries
38 1
39 1
40 1
def convert_list_entries(entries):
    """Transform a list of entries dictionaries into a list of plain
    dictionaries suitable for matching.

    :param entries: list(dict)
    :return: list(plain dict), one converted item per input item
    """
    return [convert_entries(entry) for entry in entries]
47 1
48 1
49 1
def find_endpoint(switch, port):
    """Find where switch/port is connected.

    :param switch: object providing ``get_interface_by_port_no(port)``
    :param port: port number looked up on the switch
    :return: the interface at the other end of the link, or None when the
        port has no interface or the interface has no link
    """
    interface = switch.get_interface_by_port_no(port)
    # NOTE(review): the lookup is assumed to return None for an unknown
    # port; guard so that case yields None instead of an AttributeError.
    if interface is None or not interface.link:
        return None
    link = interface.link
    if interface == link.endpoint_a:
        return link.endpoint_b
    return link.endpoint_a
59 1
60 1
61 1
def prepare_list_json(trace_result):
    """Prepare return list of json for REST call.

    :param trace_result: iterable of trace steps, each with an 'in' entry
    :return: list with the 'in' entry of every step, in order
    """
    return [trace_step['in'] for trace_step in trace_result]
67 1
68 1
69 1
def prepare_json(trace_result):
    """Prepare return json for REST call.

    :param trace_result: iterable of trace steps, each with an 'in' entry
    :return: dict holding the prepared list under the 'result' key
    """
    return {'result': prepare_list_json(trace_result)}
72
73 1
74
def format_result(trace):
    """Format the result for automate circuit finding.

    :param trace: iterable of steps; each step has an 'in' dict with 'dpid'
        and 'port' (optionally 'vlan') and may have an 'out' dict with
        'port' (optionally 'vlan')
    :return: list of flat dicts with 'dpid' and 'in_port', plus 'out_port',
        'out_vlan' and 'in_vlan' when the step provides them
    """
    result = []
    for step in trace:
        step_in = step['in']
        formatted = {'dpid': step_in['dpid'], 'in_port': step_in['port']}
        if 'out' in step:
            step_out = step['out']
            formatted['out_port'] = step_out['port']
            if 'vlan' in step_out:
                formatted['out_vlan'] = step_out['vlan']
        if 'vlan' in step_in:
            formatted['in_vlan'] = step_in['vlan']
        result.append(formatted)
    return result
88 1
89 1
90 1
def clean_circuits(circuits, controller):
    """Remove sub-circuits and report circuits that have no way back.

    A circuit is dropped when all of its steps also appear in some other,
    different circuit. For every remaining circuit that has no counterpart
    in the opposite direction, an 'amlight/kytos_courier.slack_send' event
    is put on the controller's app buffer.

    :param circuits: list of dicts, each holding its steps under 'circuit'
    :param controller: object exposing ``buffers.app.put(event)``
    :return: list with the circuits that are not sub-circuits of another
    """
    cleaned_circuits = [
        circuit for circuit in circuits
        if not _is_sub_circuit(circuit, circuits)
    ]

    for circuit in cleaned_circuits:
        if _has_return(circuit, cleaned_circuits):
            continue
        # Build a fresh event and payload for each circuit: sharing a single
        # event/content object would make every queued notification alias
        # the same dict and carry only the last message written to it.
        event = KytosEvent(name='amlight/kytos_courier.slack_send')
        event.content['message'] = {
            'channel': settings.SLACK_CHANNEL,
            'source': 'amlight/sdntrace_cp',
            'm_body': f"Circuit {circuit['circuit']} has no way back",
        }
        controller.buffers.app.put(event)
    return cleaned_circuits


def _is_sub_circuit(circuit, circuits):
    """Tell whether every step of circuit is in another, different circuit."""
    steps = circuit['circuit']
    return any(
        steps != other['circuit']
        and all(step in other['circuit'] for step in steps)
        for other in circuits
    )


def _has_return(circuit, circuits):
    """Tell whether some circuit runs between the same endpoints, reversed."""
    first, last = circuit['circuit'][0], circuit['circuit'][-1]
    return any(
        _compare_endpoints(first, other['circuit'][-1])
        and _compare_endpoints(last, other['circuit'][0])
        for other in circuits
    )
126 1
127 1
128 1
# pylint: disable=too-many-return-statements
129 1
def _compare_endpoints(endpoint1, endpoint2):
    """Check whether endpoint1 is entered where endpoint2 is left.

    The endpoints match when they share the dpid, endpoint1's 'in_port'
    equals endpoint2's 'out_port', and the optional VLAN keys mirror each
    other ('in_vlan' with 'out_vlan' and 'out_vlan' with 'in_vlan'): a
    mirrored pair must be either absent on both sides or present and equal.

    :param endpoint1: dict with 'dpid' and optional port/vlan keys
    :param endpoint2: dict with 'dpid' and optional port/vlan keys
    :return: True when the endpoints match, False otherwise
    """
    if endpoint1['dpid'] != endpoint2['dpid']:
        return False
    if 'in_port' not in endpoint1 or 'out_port' not in endpoint2:
        return False
    if endpoint1['in_port'] != endpoint2['out_port']:
        return False
    for key1, key2 in (('in_vlan', 'out_vlan'), ('out_vlan', 'in_vlan')):
        has1 = key1 in endpoint1
        has2 = key2 in endpoint2
        # A VLAN present on only one side can never match.
        if has1 != has2:
            return False
        if has1 and endpoint1[key1] != endpoint2[key2]:
            return False
    return True
149