Code

< 40 %
40-60 %
> 60 %
1
"""Utility functions to be used in this Napp"""
2
# pylint: disable=consider-using-join
3 1
import ipaddress
4
5 1
import httpx
6 1
from kytos.core.retry import before_sleep
7 1
from napps.amlight.sdntrace_cp import settings
8 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
9
                      wait_random)
10
11
12 1
@retry(
    stop=stop_after_attempt(3),
    wait=wait_random(min=0.1, max=0.2),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((httpx.RequestError, ConnectionError)))
def get_stored_flows(dpids: list = None, state: str = "installed"):
    """Fetch the stored flows from the flow_manager NApp REST API.

    The request goes to ``<settings.FLOW_MANAGER_URL>/stored_flows``. One
    ``dpid=<dpid>`` query item is appended per element of ``dpids`` and a
    ``state=<state>`` item is appended when ``state`` is truthy.

    The call is retried (at most 3 attempts, random 0.1-0.2 wait between
    them) when ``httpx.RequestError`` or ``ConnectionError`` is raised.

    :param dpids: iterable of dpids used as filter; falsy means no filter
    :param state: flow state used as filter; falsy means no filter
    :return: the decoded JSON body of the response
    """
    url = f'{settings.FLOW_MANAGER_URL}/stored_flows'
    if dpids:
        # The first query item is introduced by '/?', the rest by '&'.
        url += '/?' + '&'.join(f'dpid={dpid}' for dpid in dpids)
    if state:
        separator = '&' if dpids else '/?'
        url += separator + f'state={state}'
    response = httpx.get(url, timeout=20)
    return response.json()
31
32
33 1
def convert_entries(entries):
    """Flatten the ``trace`` section of entries into a single dictionary
    suitable for matching.

    Every value under ``entries['trace']`` is a dictionary; their items are
    merged in iteration order, so a later group overrides an earlier one
    when keys clash. When a ``dl_vlan`` key is present in the merged
    result, its value is wrapped in a one-element list.

    :param entries: dict with a ``trace`` key holding a dict of dicts
    :return: plain dict
    """
    flat = {}
    for group in entries['trace'].values():
        flat.update(group.items())
    if 'dl_vlan' in flat:
        flat['dl_vlan'] = [flat['dl_vlan']]
    return flat
47
48
49 1
def convert_list_entries(entries):
    """Convert a list of entries dictionaries into a list of plain
    dictionaries suitable for matching.

    Each element goes through ``convert_entries``; elements whose
    conversion yields an empty (falsy) result are left out.

    :param entries: list(dict)
    :return: list(plain dict)
    """
    converted = (convert_entries(entry) for entry in entries)
    return [plain for plain in converted if plain]
61
62
63 1
def find_endpoint(switch, port):
    """Find what is connected to switch/port.

    :param switch: object providing ``get_interface_by_port_no``
    :param port: port number looked up on the switch
    :return: ``None`` when the switch has no interface for the port;
        ``{'endpoint': None}`` when the interface has no link; otherwise
        ``{'endpoint': <interface at the other end of the link>}``
    """
    local = switch.get_interface_by_port_no(port)
    if not local:
        return None

    link = local.link
    if not link:
        return {'endpoint': None}

    # The far end is whichever link endpoint is not the local interface.
    remote = link.endpoint_b if local == link.endpoint_a else link.endpoint_a
    return {'endpoint': remote}
75
76
77 1
def _prepare_json(trace_result):
    """Auxiliary function to build the JSON list for a single trace.

    Collects the ``in`` item of every step. When there is at least one
    step, the ``out`` item of the last step (``None`` if absent) is stored
    under the ``out`` key of the last collected item; that item is the same
    object as the step's ``in`` value, so it is modified in place.

    :param trace_result: list of trace steps, each a dict with an ``in`` key
    :return: list with the ``in`` items
    """
    steps = [step['in'] for step in trace_result]
    if steps:
        steps[-1]["out"] = trace_result[-1].get("out")
    return steps
85
86
87 1
def prepare_json(trace_result):
    """Prepare the JSON returned by the REST call.

    When the first element of ``trace_result`` is a list, the argument is
    handled as a list of traces and each one is converted separately;
    otherwise (including an empty argument) it is converted as one trace.

    :param trace_result: a trace (list of steps) or a list of traces
    :return: dict with the converted data under the ``result`` key
    """
    if trace_result and isinstance(trace_result[0], list):
        payload = [_prepare_json(trace) for trace in trace_result]
    else:
        payload = _prepare_json(trace_result)
    return {'result': payload}
96
97
98
# pylint: disable=too-many-return-statements
99 1
def _compare_endpoints(endpoint1, endpoint2):
    """Tell whether endpoint1 and endpoint2 mirror each other.

    The result is True only when all of the following hold:

    - both have the same ``dpid``;
    - ``endpoint1['in_port']`` and ``endpoint2['out_port']`` both exist
      and are equal;
    - ``in_vlan`` of endpoint1 and ``out_vlan`` of endpoint2 are either
      both absent, or both present and equal;
    - ``out_vlan`` of endpoint1 and ``in_vlan`` of endpoint2 are either
      both absent, or both present and equal.

    :param endpoint1: dict with a ``dpid`` key
    :param endpoint2: dict with a ``dpid`` key
    :return: bool
    """
    if endpoint1['dpid'] != endpoint2['dpid']:
        return False

    ports_known = 'in_port' in endpoint1 and 'out_port' in endpoint2
    if not ports_known or endpoint1['in_port'] != endpoint2['out_port']:
        return False

    # VLAN keys are compared crosswise: in <-> out and out <-> in.
    for key1, key2 in (('in_vlan', 'out_vlan'), ('out_vlan', 'in_vlan')):
        in_first = key1 in endpoint1
        in_second = key2 in endpoint2
        if in_first != in_second:
            # Present on one side only.
            return False
        if in_first and endpoint1[key1] != endpoint2[key2]:
            return False
    return True
119
120
121 1
def convert_vlan(value):
    """Auxiliary function to split a flow dl_vlan into value and mask.

    :param value: an int, or a string in the form ``"<vlan>/<mask>"``;
        a string without ``/`` is treated as a bare VLAN value
    :return: tuple ``(vlan, mask)`` of ints; the mask is 4095 when the
        input carries none (int input or string without ``/``)
    :raises ValueError: if a part of the string is not a valid integer
    """
    if isinstance(value, int):
        return value, 4095
    # partition() tolerates a missing "/", where unpacking split() raised.
    vlan, separator, mask = value.partition('/')
    return int(vlan), (int(mask) if separator else 4095)
127
128
129 1
def match_field_dl_vlan(value, field_flow):
    """ Verify match in dl_vlan.

    A falsy value matches only a flow whose dl_vlan equals 0. Otherwise
    the last element of value is compared with the flow dl_vlan after
    both are masked with the flow mask limited to 12 bits (``& 4095``).

    The element is expected to be an int in range [1,4095];
    0 is not allowed for it.

    :param value: sequence whose last element is the VLAN to check
    :param field_flow: flow dl_vlan, as accepted by ``convert_vlan``
    :return: bool
    """
    if not value:
        return field_flow == 0
    vlan = value[-1]
    flow_vlan, flow_mask = convert_vlan(field_flow)
    effective_mask = flow_mask & 4095
    return (vlan & effective_mask) == (flow_vlan & effective_mask)
138
139
140 1
def match_field_ip(field, field_flow):
    """Verify match in ip fields.

    :param field: IP address to check
    :param field_flow: address or network of the flow; host bits are
        allowed since the network is parsed with ``strict=False``
    :return: True when the address belongs to the flow network
    :raises ValueError: if either argument cannot be parsed
    """
    address = ipaddress.ip_address(field)
    return address in ipaddress.ip_network(field_flow, strict=False)
145