Test Failed
Pull Request — master (#96)
by
unknown
02:47
created

build.utils.match_field_ip()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
ccs 0
cts 0
cp 0
rs 10
c 0
b 0
f 0
cc 1
nop 2
crap 2
1
"""Utility functions to be used in this Napp"""
2
3 1
import ipaddress
4 1
5 1
import requests
6 1
from kytos.core.retry import before_sleep
7 1
from napps.amlight.sdntrace_cp import settings
8
from requests.exceptions import Timeout
9
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
10
                      wait_random)
11 1
12
13
@retry(
14
    stop=stop_after_attempt(3),
15
    wait=wait_random(min=0.1, max=0.2),
16 1
    before_sleep=before_sleep,
17
    retry=retry_if_exception_type((Timeout, ConnectionError)))
18 1
def get_stored_flows(dpids: list = None, state: str = "installed"):
19 1
    """Get stored flows from flow_manager napps."""
20
    api_url = f'{settings.FLOW_MANAGER_URL}/stored_flows'
21
    if dpids:
22
        str_dpids = ''
23
        for dpid in dpids:
24 1
            str_dpids += f'&dpid={dpid}'
25 1
        api_url += '/?'+str_dpids[1:]
26 1
    if state:
27 1
        char = '&' if dpids else '/?'
28 1
        api_url += char+f'state={state}'
29 1
    result = requests.get(api_url, timeout=20)
30
    flows_from_manager = result.json()
31
    return flows_from_manager
32 1
33
34
def convert_entries(entries):
35
    """ Transform entries dictionary in a plain dictionary suitable for
36
        matching
37
38
    :param entries: dict
39 1
    :return: plain dict
40 1
    """
41 1
    new_entries = {}
42 1
    for entry in entries['trace'].values():
43 1
        for field, value in entry.items():
44 1
            new_entries[field] = value
45 1
    if 'dl_vlan' in new_entries:
46
        new_entries['dl_vlan'] = [new_entries['dl_vlan']]
47
    return new_entries
48 1
49
50
def convert_list_entries(entries):
51
    """ Transform a list of entries dictionary in a list
52
    of plain dictionary suitable for matching
53
    :param entries: list(dict)
54 1
    :return: list(plain dict)
55 1
    """
56 1
    new_entries = []
57 1
    for entry in entries:
58 1
        new_entry = convert_entries(entry)
59 1
        if new_entry:
60
            new_entries.append(new_entry)
61
    return new_entries
62 1
63
64
def find_endpoint(switch, port):
65
    """ Find where switch/port is connected. If it is another switch,
66 1
    returns the interface it is connected to, otherwise returns None """
67 1
68
    interface = switch.get_interface_by_port_no(port)
69 1
    if not interface:
70 1
        return None
71 1
    if interface and interface.link:
72 1
        if interface == interface.link.endpoint_a:
73 1
            return {'endpoint': interface.link.endpoint_b}
74
        return {'endpoint': interface.link.endpoint_a}
75
    return {'endpoint': None}
76 1
77
78 1
def _prepare_json(trace_result):
79 1
    """Auxiliar function to return the json for REST call."""
80 1
    result = []
81 1
    for trace_step in trace_result:
82 1
        result.append(trace_step['in'])
83 1
    if result:
84
        result[-1]["out"] = trace_result[-1].get("out")
85
    return result
86 1
87
88 1
def prepare_json(trace_result):
89 1
    """Prepare return json for REST call."""
90 1
    result = []
91 1
    if trace_result and isinstance(trace_result[0], list):
92
        for trace in trace_result:
93 1
            result.append(_prepare_json(trace))
94 1
    else:
95
        result = _prepare_json(trace_result)
96
    return {'result': result}
97
98 1
99 1
# pylint: disable=too-many-return-statements
100 1
def _compare_endpoints(endpoint1, endpoint2):
101 1
    if endpoint1['dpid'] != endpoint2['dpid']:
102
        return False
103
    if (
104
        'in_port' not in endpoint1
105
        or 'out_port' not in endpoint2
106 1
        or endpoint1['in_port'] != endpoint2['out_port']
107 1
    ):
108 1
        return False
109 1
    if 'in_vlan' in endpoint1 and 'out_vlan' in endpoint2:
110 1
        if endpoint1['in_vlan'] != endpoint2['out_vlan']:
111 1
            return False
112 1
    elif 'in_vlan' in endpoint1 or 'out_vlan' in endpoint2:
113 1
        return False
114 1
    if 'out_vlan' in endpoint1 and 'in_vlan' in endpoint2:
115 1
        if endpoint1['out_vlan'] != endpoint2['in_vlan']:
116 1
            return False
117 1
    elif 'out_vlan' in endpoint1 or 'in_vlan' in endpoint2:
118
        return False
119
    return True
120 1
121
122 1
def convert_vlan(value):
123 1
    """Auxiliar function to calculate dl_vlan"""
124 1
    if isinstance(value, int):
125 1
        return value, 4095
126
    value, mask = map(int, value.split('/'))
127
    return value, mask
128 1
129
130
def match_field_dl_vlan(value, field_flow):
131
    """ Verify match in dl_vlan.
132 1
    value only takes an int in range [1,4095].
133 1
    0 is not allowed for value. """
134 1
    if not value:
135 1
        return field_flow == 0
136
    value = value[-1]
137
    value_flow, mask_flow = convert_vlan(field_flow)
138
    return value & (mask_flow & 4095) == value_flow & (mask_flow & 4095)
139
140
141
def match_field_ip(field, field_flow):
142
    "Verify match in ip fields"
143
    packet_ip = int(ipaddress.ip_address(field))
144
    return packet_ip & field_flow.netmask == field_flow.address
145