Passed
Pull Request — master (#75)
by Vinicius
03:13
created

build.utils._prepare_json()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
ccs 7
cts 7
cp 1
crap 3
1
"""Utility functions to be used in this Napp"""
2
3 1
import requests
4 1
from kytos.core import KytosEvent
5 1
from napps.amlight.sdntrace_cp import settings
6
7
8 1
def get_stored_flows(dpids: list = None, state: str = "installed"):
9
    """Get stored flows from flow_manager napps."""
10 1
    api_url = f'{settings.FLOW_MANAGER_URL}/stored_flows'
11 1
    if dpids:
12
        str_dpids = ''
13
        for dpid in dpids:
14
            str_dpids += f'&dpid={dpid}'
15
        api_url += '/?'+str_dpids[1:]
16 1
    if state:
17 1
        char = '&' if dpids else '/?'
18 1
        api_url += char+f'state={state}'
19 1
    result = requests.get(api_url)
20 1
    flows_from_manager = result.json()
21 1
    return flows_from_manager
22
23
24 1
def convert_entries(entries):
25
    """ Transform entries dictionary in a plain dictionary suitable for
26
        matching
27
28
    :param entries: dict
29
    :return: plain dict
30
    """
31 1
    new_entries = {}
32 1
    for entry in entries['trace'].values():
33 1
        for field, value in entry.items():
34 1
            new_entries[field] = value
35 1
    if 'dl_vlan' in new_entries:
36 1
        new_entries['dl_vlan'] = [new_entries['dl_vlan']]
37 1
    return new_entries
38
39
40 1
def convert_list_entries(entries):
41
    """ Transform a list of entries dictionary in a list
42
    of plain dictionary suitable for matching
43
    :param entries: list(dict)
44
    :return: list(plain dict)
45
    """
46 1
    return [convert_entries(entry) for entry in entries]
47
48
49 1
def find_endpoint(switch, port):
50
    """ Find where switch/port is connected. If it is another switch,
51
    returns the interface it is connected to, otherwise returns None """
52
53 1
    interface = switch.get_interface_by_port_no(port)
54 1
    if interface.link:
55 1
        if interface == interface.link.endpoint_a:
56 1
            return interface.link.endpoint_b
57 1
        return interface.link.endpoint_a
58 1
    return None
59
60
61 1
def _prepare_json(trace_result):
62
    """Auxiliar function to return the json for REST call."""
63 1
    result = []
64 1
    for trace_step in trace_result:
65 1
        result.append(trace_step['in'])
66 1
    if result:
67 1
        result[-1]["out"] = trace_result[-1].get("out")
68 1
    return result
69
70
71 1
def prepare_json(trace_result):
72
    """Prepare return json for REST call."""
73 1
    result = []
74 1
    if trace_result and isinstance(trace_result[0], list):
75 1
        for trace in trace_result:
76 1
            result.append(_prepare_json(trace))
77
    else:
78 1
        result = _prepare_json(trace_result)
79 1
    return {'result': result}
80
81
82 1
def format_result(trace):
83
    """Format the result for automate circuit finding"""
84 1
    result = []
85 1
    for step in trace:
86 1
        new_result = {'dpid': step['in']['dpid'],
87
                      'in_port': step['in']['port']}
88 1
        if 'out' in step:
89 1
            new_result.update({'out_port': step['out']['port']})
90 1
            if 'vlan' in step['out']:
91 1
                new_result.update({'out_vlan': step['out']['vlan']})
92 1
        if 'vlan' in step['in']:
93 1
            new_result.update({'in_vlan': step['in']['vlan']})
94 1
        result.append(new_result)
95 1
    return result
96
97
98 1
def clean_circuits(circuits, controller):
99
    """Remove sub-circuits."""
100 1
    cleaned_circuits = []
101 1
    event = KytosEvent(name='amlight/kytos_courier.slack_send')
102 1
    content = {
103
        'channel': settings.SLACK_CHANNEL,
104
        'source': 'amlight/sdntrace_cp'
105
    }
106 1
    for circuit in circuits:
107 1
        sub = False
108 1
        for other in circuits:
109 1
            if circuit['circuit'] == other['circuit']:
110 1
                continue
111 1
            sub = True
112 1
            for step in circuit['circuit']:
113 1
                if step not in other['circuit']:
114 1
                    sub = False
115 1
                    break
116 1
            if sub:
117 1
                break
118 1
        if not sub:
119 1
            cleaned_circuits.append(circuit)
120
121 1
    for circuit in cleaned_circuits:
122 1
        has_return = False
123 1
        for other in cleaned_circuits:
124 1
            if _compare_endpoints(circuit['circuit'][0],
125
                                  other['circuit'][-1]) \
126
                    and _compare_endpoints(circuit['circuit'][-1],
127
                                           other['circuit'][0]):
128
                has_return = True
129 1
        if not has_return:
130 1
            content['m_body'] = f"Circuit {circuit['circuit']} has no way back"
131 1
            event.content['message'] = content
132 1
            controller.buffers.app.put(event)
133 1
    return cleaned_circuits
134
135
136
# pylint: disable=too-many-return-statements
137 1
def _compare_endpoints(endpoint1, endpoint2):
138 1
    if endpoint1['dpid'] != endpoint2['dpid']:
139 1
        return False
140 1
    if (
141
        'in_port' not in endpoint1
142
        or 'out_port' not in endpoint2
143
        or endpoint1['in_port'] != endpoint2['out_port']
144
    ):
145 1
        return False
146 1
    if 'in_vlan' in endpoint1 and 'out_vlan' in endpoint2:
147 1
        if endpoint1['in_vlan'] != endpoint2['out_vlan']:
148 1
            return False
149 1
    elif 'in_vlan' in endpoint1 or 'out_vlan' in endpoint2:
150 1
        return False
151 1
    if 'out_vlan' in endpoint1 and 'in_vlan' in endpoint2:
152 1
        if endpoint1['out_vlan'] != endpoint2['in_vlan']:
153 1
            return False
154 1
    elif 'out_vlan' in endpoint1 or 'in_vlan' in endpoint2:
155 1
        return False
156
    return True
157