build.utils.send_flow_mods_http()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 39
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 27.8133

Importance

Changes 0
Metric Value
cc 6
eloc 23
nop 3
dl 0
loc 39
ccs 2
cts 13
cp 0.1538
crap 27.8133
rs 8.3946
c 0
b 0
f 0
1
"""Utility functions."""
2 1
from typing import Union
3
4 1
import httpx
5 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
6
                      wait_combine, wait_fixed, wait_random)
7
8 1
from kytos.core.common import EntityStatus
9 1
from kytos.core.events import KytosEvent
10 1
from kytos.core.interface import Interface, TAGRange
11 1
from kytos.core.retry import before_sleep
12 1
from napps.kytos.mef_eline import settings
13 1
from napps.kytos.mef_eline.exceptions import FlowModException
14
15
16 1
def map_evc_event_content(evc, **kwargs) -> dict:
    """Build an event content dict from an EVC.

    Extra keyword arguments are copied first; the EVC-derived keys
    are applied afterwards, so they win on a key clash.
    """
    content = dict(kwargs)
    content.update({
        "evc_id": evc.id,
        "id": evc.id,
        "name": evc.name,
        "metadata": evc.metadata,
        "active": evc._active,
        "enabled": evc._enabled,
        "uni_a": evc.uni_a.as_dict(),
        "uni_z": evc.uni_z.as_dict(),
    })
    return content
26
27
28 1
def emit_event(controller, name, context="kytos/mef_eline", content=None,
               timeout=None):
    """Queue a KytosEvent named "<context>.<name>" on the app buffer.

    The timeout is forwarded unchanged to the buffer's put call.
    """
    queued = KytosEvent(name=f"{context}.{name}", content=content)
    controller.buffers.app.put(queued, timeout=timeout)
34
35
36 1
def merge_flow_dicts(
    dst: dict[str, list], *srcs: dict[str, list]
) -> dict[str, list]:
    """Merge srcs dict flows into dst.

    For every key of every source, its list items are appended to the
    list stored under the same key in dst (created when missing).
    dst is modified in place and also returned.

    The source lists are only read: a key that is new to dst gets a
    fresh list instead of an alias of the source list, so a later merge
    into that key cannot silently grow a list owned by a source dict.
    """
    for src in srcs:
        for key, flows in src.items():
            dst.setdefault(key, []).extend(flows)
    return dst
47
48
49 1
async def aemit_event(controller, name, content):
    """Asynchronously queue a KytosEvent on the app buffer.

    Here the event name is used exactly as given.
    """
    queued = KytosEvent(content=content, name=name)
    await controller.buffers.app.aput(queued)
53
54
55 1
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    The switch dpid and port number must always match. The vlan is
    also compared, but only when a truthy vlan is given and the trace
    step carries a "vlan" key.
    """
    check_vlan = vlan and "vlan" in trace
    same_hop = (
        endpoint.switch.dpid == trace["dpid"]
        and endpoint.port_number == trace["port"]
    )
    if check_vlan:
        return same_hop and vlan == trace["vlan"]
    return same_hop
67
68
69 1
def map_dl_vlan(value: Union[str, int]) -> Union[int, None]:
    """Map a dl_vlan value to the vlan number it stands for.

    Mapping criteria:
    dl_vlan = "untagged" or 0 -> None
    dl_vlan = "any" or "4096/4096" -> 1
    dl_vlan = int -> returned unchanged
    dl_vlan = "num1/num2" -> num1 & (num2 & 4095)

    Any other string must have the "num1/num2" form; otherwise the
    split/int conversion raises ValueError.
    """
    special_untagged = {"untagged", 0}
    if value in special_untagged:
        return None
    # "4096/4096" is special-cased: the generic mask arithmetic below
    # would turn it into 0.
    special_any = {"any": 1, "4096/4096": 1}
    value = special_any.get(value, value)
    if isinstance(value, int):
        return value
    value, mask = map(int, value.split('/'))
    # Only the low 12 bits of the mask are applied.
    return value & (mask & 4095)
83
84
85 1
def compare_uni_out_trace(
    tag_value: Union[None, int, str],
    interface: Interface,
    trace: dict
) -> bool:
    """Tell whether the trace output step matches the UNI port and vlan.

    A falsy tag_value is compared as vlan None; any other value is
    first converted with map_dl_vlan.
    """
    # Old versions of sdntrace-cp do not report "out": accept those.
    if "out" not in trace:
        return True
    last_hop = trace["out"]
    if not isinstance(last_hop, dict):
        return False
    expected_vlan = None
    if tag_value:
        expected_vlan = map_dl_vlan(tag_value)
    port_matches = interface.port_number == last_hop.get("port")
    return port_matches and expected_vlan == last_hop.get("vlan")
101
102
103 1
def max_power2_divisor(number: int, limit: int = 4096) -> int:
    """Return the largest divisor of number found by halving limit.

    Starting at limit, the candidate is floor-halved until it divides
    number with no positive remainder. With the default limit this is
    the highest power of 2, capped at 4096, that divides number.
    """
    candidate = limit
    while True:
        if not number % candidate > 0:
            return candidate
        candidate = candidate // 2
108
109
110 1
def get_vlan_tags_and_masks(
    tag_ranges: list[list[int]]
) -> list[Union[int, str]]:
    """Get a list of vlan/mask entries for a given list of ranges.

    Each inclusive [start, end] range is split into aligned chunks
    whose size is a power of 2. A chunk of size 1 is emitted as the
    plain int vlan; a larger chunk is emitted as the string
    "vlan/mask", where mask is 4096 minus the chunk size.

    Example: [[1, 3]] -> [1, "2/4094"]
    """
    masks_list = []
    for start, end in tag_ranges:
        limit = end + 1  # exclusive upper bound of the range
        while start < limit:
            # Largest chunk allowed by the alignment of start ...
            divisor = max_power2_divisor(start)
            # ... shrunk until it no longer overshoots the range end.
            while divisor > limit - start:
                divisor //= 2
            mask = 4096 - divisor
            if mask == 4095:
                # Chunk of size 1: a single vlan needs no mask.
                masks_list.append(start)
            else:
                masks_list.append(f"{start}/{mask}")
            start += divisor
    return masks_list
126
127
128 1
def make_uni_list(list_circuits: list) -> list:
    """Build the (interface, tag) pairs to be sent to sdntrace.

    A circuit whose uni_a tag is a TAGRange yields one pair per UNI for
    every mask of the range. Any other circuit yields one pair per UNI
    holding the tag value, or None when that UNI has no user_tag.
    """
    pairs = []
    for evc in list_circuits:
        if isinstance(evc.uni_a.user_tag, TAGRange):
            # TAGRange value from uni_a and uni_z are currently mirrored
            masks = (evc.uni_a.user_tag.mask_list or
                     evc.uni_z.user_tag.mask_list)
            for mask in masks:
                pairs.append((evc.uni_a.interface, mask))
                pairs.append((evc.uni_z.interface, mask))
            continue

        tag_a = evc.uni_a.user_tag.value if evc.uni_a.user_tag else None
        pairs.append((evc.uni_a.interface, tag_a))
        tag_z = evc.uni_z.user_tag.value if evc.uni_z.user_tag else None
        pairs.append((evc.uni_z.interface, tag_z))
    return pairs
153
154
155 1
def send_flow_mods_event(
    controller, flow_dict: dict[str, list], action: str, force=True
):
    """Ask flow_manager, through events, to apply flow mods.

    One "kytos.flow_manager.flows.<action>" event is emitted per dpid
    found in flow_dict, carrying the flows of that switch.
    """
    event_name = f"flows.{action}"
    for dpid, flows in flow_dict.items():
        payload = {
            "dpid": dpid,
            "flow_dict": {"flows": flows},
            "force": force,
        }
        emit_event(
            controller,
            event_name,
            context="kytos.flow_manager",
            content=payload,
        )
171
172
173 1
@retry(
    stop=stop_after_attempt(3),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    retry=retry_if_exception_type(FlowModException),
    before_sleep=before_sleep,
    reraise=True,
)
def send_flow_mods_http(
    flow_dict: dict[str, list],
    action: str, force=True
):
    """
    Send flow mods, grouped by switch, to flow_manager over HTTP.

    The request goes to the 'flows_by_switch' endpoint under
    settings.MANAGER_URL with a body of the form
    {dpid: {"flows": [...]}}.

    Args:
        flow_dict(dict): Maps each dpid to the list of flows for it.
        action(str): 'install' sends a POST, 'delete' sends a DELETE.
        force(bool): Sent as the 'force' query parameter. True to send
            via consistency check in case of errors.

    Raises:
        ValueError: If action is neither 'install' nor 'delete'. It is
            not retried.
        FlowModException: If the request fails or the response status
            is 400 or above. Only this exception is retried (up to 3
            attempts in total), then re-raised.
    """
    # Reject unknown actions before doing any work; otherwise no request
    # would be made and the response check below would hit an unbound
    # variable.
    if action not in ("install", "delete"):
        raise ValueError(
            f"Unsupported action '{action}', expected 'install' or 'delete'"
        )

    endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"

    formatted_dict = {
        dpid: {"flows": flows}
        for (dpid, flows) in flow_dict.items()
    }

    try:
        if action == "install":
            res = httpx.post(endpoint, json=formatted_dict, timeout=30)
        else:
            res = httpx.request(
                "DELETE", endpoint, json=formatted_dict, timeout=30
            )
    except httpx.RequestError as err:
        # Converted so that the retry policy also covers request failures.
        raise FlowModException(str(err)) from err
    if res.is_server_error or res.status_code >= 400:
        raise FlowModException(res.text)
212
213
214 1
def prepare_delete_flow(evc_flows: dict[str, list[dict]]):
    """Build, per dpid, the flow mods used to delete the given flows.

    Only the cookie and match of each flow are kept; the owner is set
    to "mef_eline" and the cookie mask to all 64 bits set. An empty or
    None input gives an empty dict.
    """
    if not evc_flows:
        return {}

    full_cookie_mask = int(0xffffffffffffffff)
    return {
        dpid: [
            {
                "cookie": flow["cookie"],
                "match": flow["match"],
                "owner": "mef_eline",
                "cookie_mask": full_cookie_mask,
            }
            for flow in flows
        ]
        for dpid, flows in evc_flows.items()
    }
231
232
233 1
def _does_uni_affect_evc(evc, interface: Interface, link_event: str) -> bool:
    """Tell whether an interface flap on a UNI affects the EVC.

    An "up" event counts when the EVC is inactive, the interface is one
    of its UNIs and both UNI interfaces are UP. A "down" event counts
    when the EVC is active, the interface is one of its UNIs and at
    least one UNI interface is not UP. Any other event gives False.
    """
    uni_a_intf = evc.uni_a.interface
    uni_z_intf = evc.uni_z.interface
    is_uni_interface = interface in (uni_a_intf, uni_z_intf)
    any_uni_down = (
        uni_a_intf.status != EntityStatus.UP
        or uni_z_intf.status != EntityStatus.UP
    )
    if link_event not in ("up", "down"):
        return False
    evc_active = evc.is_active()
    if link_event == "up":
        return not evc_active and is_uni_interface and not any_uni_down
    return evc_active and is_uni_interface and any_uni_down
248