Passed
Pull Request — master (#95)
by
unknown
02:56
created

build.utils._prepare_json()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
ccs 7
cts 7
cp 1
crap 3
1
"""Utility functions to be used in this Napp"""
2
3 1
import requests
4 1
from kytos.core import log
5 1
from napps.amlight.sdntrace_cp import settings
6 1
from tenacity import RetryError, retry, stop_after_delay
7
8
9 1
@retry(stop=stop_after_delay(30))
10 1
def get_stored_flows(dpids: list = None, state: str = "installed"):
11
    """Get stored flows from flow_manager napps."""
12 1
    api_url = f'{settings.FLOW_MANAGER_URL}/stored_flows'
13 1
    if dpids:
14
        str_dpids = ''
15
        for dpid in dpids:
16
            str_dpids += f'&dpid={dpid}'
17
        api_url += '/?'+str_dpids[1:]
18 1
    if state:
19 1
        char = '&' if dpids else '/?'
20 1
        api_url += char+f'state={state}'
21 1
    try:
22 1
        result = requests.get(api_url)
23
    except RetryError as exception:
24
        log.error(f"Failed to get stored flows: {exception}")
25
        raise
26 1
    flows_from_manager = result.json()
27 1
    return flows_from_manager
28
29
30 1
def convert_entries(entries):
31
    """ Transform entries dictionary in a plain dictionary suitable for
32
        matching
33
34
    :param entries: dict
35
    :return: plain dict
36
    """
37 1
    new_entries = {}
38 1
    for entry in entries['trace'].values():
39 1
        for field, value in entry.items():
40 1
            new_entries[field] = value
41 1
    if 'dl_vlan' in new_entries:
42 1
        new_entries['dl_vlan'] = [new_entries['dl_vlan']]
43 1
    return new_entries
44
45
46 1
def convert_list_entries(entries):
47
    """ Transform a list of entries dictionary in a list
48
    of plain dictionary suitable for matching
49
    :param entries: list(dict)
50
    :return: list(plain dict)
51
    """
52 1
    new_entries = []
53 1
    for entry in entries:
54 1
        new_entry = convert_entries(entry)
55 1
        if new_entry:
56 1
            new_entries.append(new_entry)
57 1
    return new_entries
58
59
60 1
def find_endpoint(switch, port):
61
    """ Find where switch/port is connected. If it is another switch,
62
    returns the interface it is connected to, otherwise returns None """
63
64 1
    interface = switch.get_interface_by_port_no(port)
65 1
    if not interface:
66
        return None
67 1
    if interface and interface.link:
68 1
        if interface == interface.link.endpoint_a:
69 1
            return {'endpoint': interface.link.endpoint_b}
70 1
        return {'endpoint': interface.link.endpoint_a}
71 1
    return {'endpoint': None}
72
73
74 1
def _prepare_json(trace_result):
75
    """Auxiliar function to return the json for REST call."""
76 1
    result = []
77 1
    for trace_step in trace_result:
78 1
        result.append(trace_step['in'])
79 1
    if result:
80 1
        result[-1]["out"] = trace_result[-1].get("out")
81 1
    return result
82
83
84 1
def prepare_json(trace_result):
85
    """Prepare return json for REST call."""
86 1
    result = []
87 1
    if trace_result and isinstance(trace_result[0], list):
88 1
        for trace in trace_result:
89 1
            result.append(_prepare_json(trace))
90
    else:
91 1
        result = _prepare_json(trace_result)
92 1
    return {'result': result}
93
94
95
# pylint: disable=too-many-return-statements
96 1
def _compare_endpoints(endpoint1, endpoint2):
97 1
    if endpoint1['dpid'] != endpoint2['dpid']:
98 1
        return False
99 1
    if (
100
        'in_port' not in endpoint1
101
        or 'out_port' not in endpoint2
102
        or endpoint1['in_port'] != endpoint2['out_port']
103
    ):
104 1
        return False
105 1
    if 'in_vlan' in endpoint1 and 'out_vlan' in endpoint2:
106 1
        if endpoint1['in_vlan'] != endpoint2['out_vlan']:
107 1
            return False
108 1
    elif 'in_vlan' in endpoint1 or 'out_vlan' in endpoint2:
109 1
        return False
110 1
    if 'out_vlan' in endpoint1 and 'in_vlan' in endpoint2:
111 1
        if endpoint1['out_vlan'] != endpoint2['in_vlan']:
112 1
            return False
113 1
    elif 'out_vlan' in endpoint1 or 'in_vlan' in endpoint2:
114 1
        return False
115 1
    return True
116
117
118 1
def convert_vlan(value):
119
    """Auxiliar function to calculate dl_vlan"""
120 1
    if isinstance(value, int):
121 1
        return value, 4095
122 1
    value, mask = map(int, value.split('/'))
123 1
    return value, mask
124
125
126 1
def match_field_dl_vlan(value, field_flow):
127
    """ Verify match in dl_vlan.
128
    value only takes an int in range [1,4095].
129
    0 is not allowed for value. """
130 1
    if not value:
131 1
        return field_flow == 0
132 1
    value_flow, mask_flow = convert_vlan(field_flow)
133
    return value & (mask_flow & 4095) == value_flow & (mask_flow & 4095)
134