Passed
Pull Request — master (#47)
by
unknown
03:09
created

build.utils.find_endpoint()   A

Complexity

Conditions 3

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 2
dl 0
loc 10
rs 10
c 0
b 0
f 0
ccs 7
cts 7
cp 1
crap 3
1
"""Utility functions to be used in this Napp"""
2
3 1
from kytos.core import KytosEvent
4 1
from napps.amlight.sdntrace_cp import settings
5
6
7 1
def convert_entries(entries):
    """Flatten a trace entries dictionary into a plain dict for matching.

    Every mapping stored under ``entries['trace']`` is merged into a single
    dictionary; when the same field appears in more than one mapping, the
    one visited last wins. If the merged result contains ``dl_vlan``, its
    value is wrapped in a one-element list.

    :param entries: dict with a ``'trace'`` key holding a dict of dicts
    :return: plain dict
    :raises KeyError: if ``entries`` has no ``'trace'`` key
    """
    new_entries = {}
    for entry in entries['trace'].values():
        new_entries.update(entry)
    if 'dl_vlan' in new_entries:
        # The value is wrapped in a list, as the original code did.
        new_entries['dl_vlan'] = [new_entries['dl_vlan']]
    return new_entries
21
22
23 1
def convert_list_entries(entries):
    """Transform a list of entries dictionaries into a list of plain dicts.

    Each element is passed through ``convert_entries`` and the results are
    returned in the same order.

    :param entries: list(dict)
    :return: list(plain dict)
    """
    return [convert_entries(entry) for entry in entries]
30
31
32 1
def find_endpoint(switch, port):
    """Find where switch/port is connected.

    :param switch: object exposing ``get_interface_by_port_no(port)``
    :param port: port number to look up on ``switch``
    :return: the interface at the other end of the link attached to
        ``port``, or None when the port has no interface or no link
    """
    interface = switch.get_interface_by_port_no(port)
    # NOTE(review): the lookup is expected to return None for an unknown
    # port (confirm against the Switch API); guard so that case yields None
    # instead of an AttributeError on ``.link``.
    if interface is None or not interface.link:
        return None
    link = interface.link
    # Return whichever endpoint of the link is not this interface.
    if interface == link.endpoint_a:
        return link.endpoint_b
    return link.endpoint_a
42
43
44 1
def prepare_list_json(trace_result):
    """Prepare return list of json for REST call.

    :param trace_result: iterable of trace steps, each a dict with an
        ``'in'`` key
    :return: list with the ``'in'`` value of every step, in order
    :raises KeyError: if a step has no ``'in'`` key
    """
    return [trace_step['in'] for trace_step in trace_result]
50
51
52 1
def prepare_json(trace_result):
    """Prepare return json for REST call.

    :param trace_result: iterable of trace steps, as accepted by
        ``prepare_list_json``
    :return: dict with a single ``'result'`` key holding the prepared list
    """
    return {'result': prepare_list_json(trace_result)}
55
56
57 1
def format_result(trace):
    """Format the result for automate circuit finding.

    For every step, builds a dict with ``dpid`` and ``in_port`` taken from
    ``step['in']``. ``out_port`` is added when the step has an ``'out'``
    entry, and ``out_vlan`` / ``in_vlan`` are added when the respective
    entry carries a ``'vlan'`` key.

    :param trace: iterable of step dicts with an ``'in'`` entry and an
        optional ``'out'`` entry
    :return: list of flat dicts, one per step, in the same order
    """
    result = []
    for step in trace:
        step_in = step['in']
        new_result = {'dpid': step_in['dpid'],
                      'in_port': step_in['port']}
        if 'out' in step:
            step_out = step['out']
            new_result['out_port'] = step_out['port']
            if 'vlan' in step_out:
                new_result['out_vlan'] = step_out['vlan']
        if 'vlan' in step_in:
            new_result['in_vlan'] = step_in['vlan']
        result.append(new_result)
    return result
71
72
73 1
def clean_circuits(circuits, controller):
    """Remove sub-circuits and report circuits that have no return path.

    A circuit is dropped when some other circuit with a different step list
    contains every one of its steps. For each remaining circuit, a matching
    return circuit is searched among the remaining ones (first step against
    the other's last step and last step against the other's first step, via
    ``_compare_endpoints``). When none is found, a
    ``amlight/kytos_courier.slack_send`` event is put on
    ``controller.buffers.app``.

    :param circuits: list of dicts, each with a ``'circuit'`` list of steps
    :param controller: object exposing ``buffers.app.put(event)``
    :return: list of the circuits that are not sub-circuits of another one
    """
    cleaned_circuits = []
    for circuit in circuits:
        steps = circuit['circuit']
        # Circuits with identical step lists are never treated as
        # sub-circuits of each other, so duplicates are all kept.
        is_sub = any(
            steps != other['circuit']
            and all(step in other['circuit'] for step in steps)
            for other in circuits
        )
        if not is_sub:
            cleaned_circuits.append(circuit)

    for circuit in cleaned_circuits:
        first, last = circuit['circuit'][0], circuit['circuit'][-1]
        has_return = any(
            _compare_endpoints(first, other['circuit'][-1])
            and _compare_endpoints(last, other['circuit'][0])
            for other in cleaned_circuits
        )
        if not has_return:
            # Build a fresh event and message per circuit: reusing one
            # event/dict would make every queued event share (and later
            # overwrite) the same message body.
            event = KytosEvent(name='amlight/kytos_courier.slack_send')
            event.content['message'] = {
                'channel': settings.SLACK_CHANNEL,
                'source': 'amlight/sdntrace_cp',
                'm_body': f"Circuit {circuit['circuit']} has no way back",
            }
            controller.buffers.app.put(event)
    return cleaned_circuits
109
110
111
# pylint: disable=too-many-return-statements
112 1
def _compare_endpoints(endpoint1, endpoint2):
113 1
    if endpoint1['dpid'] != endpoint2['dpid']:
114 1
        return False
115 1
    if (
116
        'in_port' not in endpoint1
117
        or 'out_port' not in endpoint2
118
        or endpoint1['in_port'] != endpoint2['out_port']
119
    ):
120 1
        return False
121 1
    if 'in_vlan' in endpoint1 and 'out_vlan' in endpoint2:
122 1
        if endpoint1['in_vlan'] != endpoint2['out_vlan']:
123 1
            return False
124 1
    elif 'in_vlan' in endpoint1 or 'out_vlan' in endpoint2:
125 1
        return False
126 1
    if 'out_vlan' in endpoint1 and 'in_vlan' in endpoint2:
127 1
        if endpoint1['out_vlan'] != endpoint2['in_vlan']:
128 1
            return False
129 1
    elif 'out_vlan' in endpoint1 or 'in_vlan' in endpoint2:
130 1
        return False
131
    return True
132