Test Failed
Pull Request — master (#47)
by
unknown
06:35
created

build.utils.prepare_list_json()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
ccs 4
cts 4
cp 1
crap 2
1
"""Utility functions to be used in this Napp"""
2
3 1
from kytos.core import KytosEvent
4 1
from napps.amlight.sdntrace_cp import settings
5
6 1
# Field-name translation table used by convert_entries: a field whose name is
# a key here is stored under the mapped name; any other field keeps its name.
# NOTE(review): the keys look like OpenFlow 1.0-style match names - confirm.
TRANSLATE_NAMES = {
    'dl_src': 'eth_src',
    'dl_dst': 'eth_dst',
    'dl_type': 'eth_type',
    'dl_vlan': 'vlan_vid',
    'nw_src': 'ip4_src',
    'nw_dst': 'ip4_dst',
    'nw_tos': 'ip_tos',
    'nw_proto': 'ip_proto',
}
16
17
18 1
def convert_entries(entries):
    """Transform an entries dictionary into a plain dictionary suitable for
    matching.

    Every field of every entry found under ``entries['trace']`` is copied
    into one flat dictionary. Field names that are keys of
    ``TRANSLATE_NAMES`` are stored under their translated name; all other
    fields keep their name. When the same (translated) name occurs more than
    once, the value seen last wins. If a ``vlan_vid`` field ends up in the
    result, its value is wrapped in a single-element list.

    :param entries: dict with a ``'trace'`` key holding a dict of dicts
    :return: plain dict
    :raises KeyError: if ``entries`` has no ``'trace'`` key
    """
    new_entries = {}
    for entry in entries['trace'].values():
        for field, value in entry.items():
            # Fall back to the original name when no translation exists.
            new_entries[TRANSLATE_NAMES.get(field, field)] = value
    if 'vlan_vid' in new_entries:
        new_entries['vlan_vid'] = [new_entries['vlan_vid']]
    return new_entries
35
36
37 1
def convert_list_entries(entries):
    """Transform a list of entries dictionaries into a list of plain
    dictionaries suitable for matching.

    Each element is converted independently with ``convert_entries`` and the
    results are returned in the same order.

    :param entries: list(dict)
    :return: list(plain dict)
    """
    return [convert_entries(entry) for entry in entries]
44 1
45 1
46 1
def find_endpoint(switch, port):
    """Find where switch/port is connected.

    :param switch: object exposing ``get_interface_by_port_no(port)``
    :param port: port number looked up on ``switch``
    :return: the interface at the opposite end of the link attached to the
        port, or None when the port has no interface or the interface has
        no link
    """
    interface = switch.get_interface_by_port_no(port)
    # NOTE(review): the lookup presumably returns None for an unknown port
    # (confirm against kytos Switch); treat that like an unlinked port
    # instead of failing on the attribute access below.
    if interface is None or not interface.link:
        return None
    link = interface.link
    # The link stores both ends; return whichever one is not this interface.
    if interface == link.endpoint_a:
        return link.endpoint_b
    return link.endpoint_a
56
57 1
58
def prepare_list_json(trace_result):
    """Prepare return list of json for REST call.

    :param trace_result: iterable of trace steps, each a mapping with an
        ``'in'`` key
    :return: list holding the ``'in'`` value of every step, in order
    :raises KeyError: if a step has no ``'in'`` key
    """
    return [trace_step['in'] for trace_step in trace_result]
64 1
65 1
66 1
def prepare_json(trace_result):
    """Prepare return json for REST call.

    :param trace_result: iterable of trace steps accepted by
        ``prepare_list_json``
    :return: dict with a single ``'result'`` key holding the list built by
        ``prepare_list_json``
    """
    return {'result': prepare_list_json(trace_result)}
69 1
70 1
71
def format_result(trace):
    """Format the result for automate circuit finding.

    Each step of ``trace`` becomes one flat dict with ``dpid`` and
    ``in_port`` taken from the step's ``'in'`` part. When the step has an
    ``'out'`` part, ``out_port`` (and ``out_vlan``, if the out part carries a
    ``'vlan'``) is added; ``in_vlan`` is added when the in part carries a
    ``'vlan'``.

    :param trace: iterable of steps; each step is a dict with an ``'in'``
        dict (keys ``'dpid'``, ``'port'``, optional ``'vlan'``) and an
        optional ``'out'`` dict (key ``'port'``, optional ``'vlan'``)
    :return: list of flat dicts, one per step, in the same order
    """
    result = []
    for step in trace:
        step_in = step['in']
        new_result = {'dpid': step_in['dpid'], 'in_port': step_in['port']}
        if 'out' in step:
            step_out = step['out']
            new_result['out_port'] = step_out['port']
            if 'vlan' in step_out:
                new_result['out_vlan'] = step_out['vlan']
        if 'vlan' in step_in:
            new_result['in_vlan'] = step_in['vlan']
        result.append(new_result)
    return result
85 1
86 1
87 1
def clean_circuits(circuits, controller):
    """Remove sub-circuits and report circuits without a return path.

    A circuit is dropped when some other circuit with a different step list
    contains every one of its steps. For each remaining circuit, the other
    remaining circuits are searched for one running in the opposite
    direction (first step against the other's last step and last step
    against the other's first step, as judged by ``_compare_endpoints``).
    When none is found, a ``amlight/kytos_courier.slack_send`` event is put
    on ``controller.buffers.app``.

    :param circuits: list of dicts, each with a ``'circuit'`` key holding
        the list of steps
    :param controller: object exposing ``buffers.app.put(event)``
    :return: list of the circuits that are not sub-circuits of another one
    """
    cleaned_circuits = []
    for circuit in circuits:
        steps = circuit['circuit']
        # Identical step lists are not considered sub-circuits of each other.
        is_sub = any(
            steps != other['circuit']
            and all(step in other['circuit'] for step in steps)
            for other in circuits
        )
        if not is_sub:
            cleaned_circuits.append(circuit)

    for circuit in cleaned_circuits:
        first, last = circuit['circuit'][0], circuit['circuit'][-1]
        has_return = any(
            _compare_endpoints(first, other['circuit'][-1])
            and _compare_endpoints(last, other['circuit'][0])
            for other in cleaned_circuits
        )
        if not has_return:
            # Build a fresh event and message for every notification: reusing
            # one event/dict would make all queued events share the text of
            # the last circuit reported.
            event = KytosEvent(name='amlight/kytos_courier.slack_send')
            event.content['message'] = {
                'channel': settings.SLACK_CHANNEL,
                'source': 'amlight/sdntrace_cp',
                'm_body': f"Circuit {circuit['circuit']} has no way back",
            }
            controller.buffers.app.put(event)
    return cleaned_circuits
123 1
124 1
125 1
# pylint: disable=too-many-return-statements
126 1
def _compare_endpoints(endpoint1, endpoint2):
127 1
    if endpoint1['dpid'] != endpoint2['dpid']:
128 1
        return False
129 1
    if (
130 1
        'in_port' not in endpoint1
131 1
        or 'out_port' not in endpoint2
132
        or endpoint1['in_port'] != endpoint2['out_port']
133
    ):
134
        return False
135
    if 'in_vlan' in endpoint1 and 'out_vlan' in endpoint2:
136
        if endpoint1['in_vlan'] != endpoint2['out_vlan']:
137
            return False
138
    elif 'in_vlan' in endpoint1 or 'out_vlan' in endpoint2:
139
        return False
140
    if 'out_vlan' in endpoint1 and 'in_vlan' in endpoint2:
141
        if endpoint1['out_vlan'] != endpoint2['in_vlan']:
142
            return False
143
    elif 'out_vlan' in endpoint1 or 'in_vlan' in endpoint2:
144
        return False
145
    return True
146