Issues (28)

utils.py (1 issue)

Severity
1
"""Utility functions."""
2 1
from typing import Union
3
4 1
import httpx
5 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
6
                      wait_combine, wait_fixed, wait_random)
7
8 1
from kytos.core.common import EntityStatus
9 1
from kytos.core.events import KytosEvent
10 1
from kytos.core.interface import UNI, Interface, TAGRange
11 1
from kytos.core.retry import before_sleep
12 1
from napps.kytos.mef_eline import settings
13 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, FlowModException
14
15
16 1
def map_evc_event_content(evc, **kwargs) -> dict:
17
    """Returns a set of values from evc to be used for content"""
18 1
    return kwargs | {"evc_id": evc.id,
19
                     "id": evc.id,
20
                     "name": evc.name,
21
                     "metadata": evc.metadata,
22
                     "active": evc._active,
23
                     "enabled": evc._enabled,
24
                     "uni_a": evc.uni_a.as_dict(),
25
                     "uni_z": evc.uni_z.as_dict()}
26
27
28 1
def emit_event(controller, name, context="kytos/mef_eline", content=None,
29
               timeout=None):
30
    """Send an event when something happens with an EVC."""
31 1
    event_name = f"{context}.{name}"
32 1
    event = KytosEvent(name=event_name, content=content)
33 1
    controller.buffers.app.put(event, timeout=timeout)
34
35
36 1
def merge_flow_dicts(
37
    dst: dict[str, list], *srcs: dict[str, list]
38
) -> dict[str, list]:
39
    """Merge srcs dict flows into dst."""
40 1
    for src in srcs:
41 1
        for k, v in src.items():
42 1
            if k not in dst:
43 1
                dst[k] = v
44
            else:
45 1
                dst[k].extend(v)
46 1
    return dst
47
48
49 1
async def aemit_event(controller, name, content):
50
    """Send an asynchronous event"""
51 1
    event = KytosEvent(name=name, content=content)
52 1
    await controller.buffers.app.aput(event)
53
54
55 1
def compare_endpoint_trace(endpoint, vlan, trace):
56
    """Compare and endpoint with a trace step."""
57 1
    if vlan and "vlan" in trace:
58 1
        return (
59
            endpoint.switch.dpid == trace["dpid"]
60
            and endpoint.port_number == trace["port"]
61
            and vlan == trace["vlan"]
62
        )
63 1
    return (
64
        endpoint.switch.dpid == trace["dpid"]
65
        and endpoint.port_number == trace["port"]
66
    )
67
68
69 1
def map_dl_vlan(value: Union[str, int]) -> bool:
70
    """Map dl_vlan value with the following criteria:
71
    dl_vlan = untagged or 0 -> None
72
    dl_vlan = any or "4096/4096" -> 1
73
    dl_vlan = "num1/num2" -> int in [1, 4095]"""
74 1
    special_untagged = {"untagged", 0}
75 1
    if value in special_untagged:
76 1
        return None
77 1
    special_any = {"any": 1, "4096/4096": 1}
78 1
    value = special_any.get(value, value)
79 1
    if isinstance(value, int):
80 1
        return value
81 1
    value, mask = map(int, value.split('/'))
82 1
    return value & (mask & 4095)
83
84
85 1
def compare_uni_out_trace(
86
    tag_value: Union[None, int, str],
87
    interface: Interface,
88
    trace: dict
89
) -> bool:
90
    """Check if the trace last step (output) matches the UNI attributes."""
91
    # keep compatibility for old versions of sdntrace-cp
92 1
    if "out" not in trace:
93 1
        return True
94 1
    if not isinstance(trace["out"], dict):
95 1
        return False
96 1
    uni_vlan = map_dl_vlan(tag_value) if tag_value else None
97 1
    return (
98
        interface.port_number == trace["out"].get("port")
99
        and uni_vlan == trace["out"].get("vlan")
100
    )
101
102
103 1
def max_power2_divisor(number: int, limit: int = 4096) -> int:
104
    """Get the max power of 2 that is divisor of number"""
105 1
    while number % limit > 0:
106 1
        limit //= 2
107 1
    return limit
108
109
110 1
def get_vlan_tags_and_masks(tag_ranges: list[list[int]]) -> list[int, str]:
111
    """Get a list of vlan/mask pairs for a given list of ranges."""
112 1
    masks_list = []
113 1
    for start, end in tag_ranges:
114 1
        limit = end + 1
115 1
        while start < limit:
116 1
            divisor = max_power2_divisor(start)
117 1
            while divisor > limit - start:
118 1
                divisor //= 2
119 1
            mask = 4096 - divisor
120 1
            if mask == 4095:
121 1
                masks_list.append(start)
122
            else:
123 1
                masks_list.append(f"{start}/{mask}")
124 1
            start += divisor
125 1
    return masks_list
126
127
128 1
def check_disabled_component(uni_a: UNI, uni_z: UNI):
129
    """Check if a switch or an interface is disabled"""
130 1
    if uni_a.interface.switch != uni_z.interface.switch:
131 1
        return
132 1
    if uni_a.interface.switch.status == EntityStatus.DISABLED:
133 1
        dpid = uni_a.interface.switch.dpid
134 1
        raise DisabledSwitch(f"Switch {dpid} is disabled")
135 1
    if uni_a.interface.status == EntityStatus.DISABLED:
136 1
        id_ = uni_a.interface.id
137 1
        raise DisabledSwitch(f"Interface {id_} is disabled")
138 1
    if uni_z.interface.status == EntityStatus.DISABLED:
139 1
        id_ = uni_z.interface.id
140 1
        raise DisabledSwitch(f"Interface {id_} is disabled")
141
142
143 1
def make_uni_list(list_circuits: list) -> list:
144
    """Make uni list to be sent to sdntrace"""
145 1
    uni_list = []
146 1
    for circuit in list_circuits:
147 1
        if isinstance(circuit.uni_a.user_tag, TAGRange):
148
            # TAGRange value from uni_a and uni_z are currently mirrored
149 1
            mask_list = (circuit.uni_a.user_tag.mask_list or
150
                         circuit.uni_z.user_tag.mask_list)
151 1
            for mask in mask_list:
152 1
                uni_list.append((circuit.uni_a.interface, mask))
153 1
                uni_list.append((circuit.uni_z.interface, mask))
154
        else:
155 1
            tag_a = None
156 1
            if circuit.uni_a.user_tag:
157 1
                tag_a = circuit.uni_a.user_tag.value
158 1
            uni_list.append(
159
                (circuit.uni_a.interface, tag_a)
160
            )
161 1
            tag_z = None
162 1
            if circuit.uni_z.user_tag:
163 1
                tag_z = circuit.uni_z.user_tag.value
164 1
            uni_list.append(
165
                (circuit.uni_z.interface, tag_z)
166
            )
167 1
    return uni_list
168
169
170 1
def send_flow_mods_event(
171
    controller, flow_dict: dict[str, list], action: str, force=True
172
):
173
    """Send flow mods to be deleted or install to flow_manager
174
     through an event"""
175
    for dpid, flows in flow_dict.items():
176
        emit_event(
177
            controller,
178
            context="kytos.flow_manager",
179
            name=f"flows.{action}",
180
            content={
181
                "dpid": dpid,
182
                "flow_dict": {"flows": flows},
183
                "force": force,
184
            },
185
        )
186
187
188 1
@retry(
189
    stop=stop_after_attempt(3),
190
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
191
    retry=retry_if_exception_type(FlowModException),
192
    before_sleep=before_sleep,
193
    reraise=True,
194
)
195 1
def send_flow_mods_http(
196
    flow_dict: dict[str, list],
197
    action: str, force=True
198
):
199
    """
200
    Send a flow_mod list to a specific switch.
201
202
    Args:
203
        dpid(str): The target of flows (i.e. Switch.id).
204
        flow_mods(dict): Python dictionary with flow_mods.
205
        command(str): By default is 'flows'. To remove a flow is 'remove'.
206
        force(bool): True to send via consistency check in case of errors.
207
        by_switch(bool): True to send to 'flows_by_switch' request instead.
208
    """
209
    endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
210
211
    formatted_dict = {
212
        dpid: {"flows": flows}
213
        for (dpid, flows) in flow_dict.items()
214
    }
215
216
    try:
217
        if action == "install":
218
            res = httpx.post(endpoint, json=formatted_dict, timeout=30)
219
        elif action == "delete":
220
            res = httpx.request(
221
                "DELETE", endpoint, json=formatted_dict, timeout=30
222
            )
223
    except httpx.RequestError as err:
224
        raise FlowModException(str(err)) from err
225
    if res.is_server_error or res.status_code >= 400:
0 ignored issues
show
The variable res does not seem to be defined for all execution paths.
Loading history...
226
        raise FlowModException(res.text)
227
228
229 1
def prepare_delete_flow(evc_flows: dict[str, list[dict]]):
230
    """Create flow mods suited for flow deletion."""
231 1
    dpid_flows: dict[str, list[dict]] = {}
232
233 1
    if not evc_flows:
234
        return dpid_flows
235
236 1
    for dpid, flows in evc_flows.items():
237 1
        dpid_flows.setdefault(dpid, [])
238 1
        for flow in flows:
239 1
            dpid_flows[dpid].append({
240
                "cookie": flow["cookie"],
241
                "match": flow["match"],
242
                "owner": "mef_eline",
243
                "cookie_mask": int(0xffffffffffffffff)
244
            })
245 1
    return dpid_flows
246
247
248 1
def _does_uni_affect_evc(evc, interface: Interface, link_event: str) -> bool:
249
    """Check if an interface flap is affecting an EVC UNI."""
250 1
    interface_a = evc.uni_a.interface
251 1
    interface_z = evc.uni_z.interface
252 1
    interface_affected = interface in (interface_a, interface_z)
253 1
    interface_down = (
254
        interface_a.status != EntityStatus.UP
255
        or interface_z.status != EntityStatus.UP
256
    )
257 1
    if link_event == "up":
258 1
        return (not evc.is_active() and interface_affected
259
                and not interface_down)
260 1
    if link_event == "down":
261 1
        return evc.is_active() and interface_affected and interface_down
262
    return False
263