Passed
Push — master ( e3f2ff...728489 )
by Vinicius
02:32 queued 14s
created

build.utils.map_evc_event_content()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 2
dl 0
loc 9
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Utility functions."""
2 1
from kytos.core.common import EntityStatus
3 1
from kytos.core.events import KytosEvent
4 1
from kytos.core.interface import UNI
5 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch
6
7
8 1
def map_evc_event_content(evc, **kwargs):
    """Build the content dict used when publishing an event about an EVC.

    The extra keyword arguments are placed first and the EVC fields are
    merged on top of them, so an EVC field wins whenever a keyword
    argument uses the same key.
    """
    evc_fields = {
        "evc_id": evc.id,
        "name": evc.name,
        "metadata": evc.metadata,
        "active": evc._active,
        "enabled": evc._enabled,
        "uni_a": evc.uni_a.as_dict(),
        "uni_z": evc.uni_z.as_dict(),
    }
    return {**kwargs, **evc_fields}
17
18
19 1
def emit_event(controller, name, context="kytos/mef_eline", content=None):
    """Publish a KytosEvent named ``{context}.{name}`` on the app buffer.

    The event is built with the given ``content`` and handed to
    ``controller.buffers.app.put``.
    """
    controller.buffers.app.put(
        KytosEvent(name=f"{context}.{name}", content=content)
    )
24
25
26 1
async def aemit_event(controller, name, content):
    """Asynchronously publish a KytosEvent on the app buffer.

    ``name`` is used as the event name exactly as given; the event is
    awaited into ``controller.buffers.app.aput``.
    """
    await controller.buffers.app.aput(
        KytosEvent(name=name, content=content)
    )
30
31
32 1
def notify_link_available_tags(controller, link, src_func=None):
    """Emit a "link_available_tags" event carrying the link and src_func."""
    payload = {"link": link, "src_func": src_func}
    emit_event(controller, "link_available_tags", content=payload)
38
39
40 1
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    The switch dpid and the port number must always match. The vlan is
    compared as well, but only when a truthy ``vlan`` is given and the
    trace step carries a "vlan" key.
    """
    same_location = (
        endpoint.switch.dpid == trace["dpid"]
        and endpoint.port_number == trace["port"]
    )
    if not (vlan and "vlan" in trace):
        return same_location
    return same_location and vlan == trace["vlan"]
52
53
54 1
def map_dl_vlan(value):
    """Normalize a dl_vlan value.

    Mapping rules:
    - "untagged" or 0 -> None
    - "any" or "4096/4096" -> 1
    - any other int -> returned unchanged
    - "num/mask" string -> num & mask, restricted to the low 12 bits
    """
    if value in {"untagged", 0}:
        return None
    if value in {"any", "4096/4096"}:
        return 1
    if isinstance(value, int):
        return value
    number, mask = (int(part) for part in value.split("/"))
    return number & mask & 4095
68
69
70 1
def compare_uni_out_trace(uni, trace):
    """Check whether the trace output step matches the UNI attributes.

    Returns True when the trace has no "out" entry, False when "out" is
    not a dict, and otherwise whether both the port and the vlan of the
    output step equal the UNI's port number and mapped user tag.
    """
    # Traces from old sdntrace-cp versions carry no "out"; accept them.
    if "out" not in trace:
        return True
    out_step = trace["out"]
    if not isinstance(out_step, dict):
        return False
    expected_vlan = None
    if uni.user_tag:
        expected_vlan = map_dl_vlan(uni.user_tag.value)
    return (
        uni.interface.port_number == out_step.get("port")
        and expected_vlan == out_step.get("vlan")
    )
82
83
84 1
def max_power2_divisor(number: int, limit: int = 4096) -> int:
    """Return the largest divisor of number reachable by halving limit.

    Starting from ``limit``, the candidate is halved until it divides
    ``number`` evenly; with the default limit this yields the largest
    power of 2, capped at 4096, that divides ``number``.
    """
    divisor = limit
    while number % divisor > 0:
        divisor //= 2
    return divisor
89
90
91 1
def get_vlan_tags_and_masks(start: int, end: int) -> list[tuple[int, int]]:
    """Cover the inclusive range [start, end] with vlan/mask pairs.

    Each pair is ``(tag, 4096 - block_size)`` where ``block_size`` is the
    largest power-of-2 block that is aligned on the current tag and does
    not run past ``end``.
    """
    upper = end + 1
    pairs = []
    current = start
    while current < upper:
        block = max_power2_divisor(current)
        remaining = upper - current
        # Shrink the aligned block until it fits in what is left to cover.
        while block > remaining:
            block //= 2
        pairs.append((current, 4096 - block))
        current += block
    return pairs
102
103
104 1
def check_disabled_component(uni_a: UNI, uni_z: UNI):
    """Raise DisabledSwitch if a shared switch or a UNI interface is disabled.

    Nothing is checked when the two UNIs sit on different switches. When
    they share a switch, the switch status is checked first, then the
    interface of uni_a, then the interface of uni_z.
    """
    switch = uni_a.interface.switch
    if switch != uni_z.interface.switch:
        return
    if switch.status == EntityStatus.DISABLED:
        raise DisabledSwitch(f"Switch {switch.dpid} is disabled")
    for interface in (uni_a.interface, uni_z.interface):
        if interface.status == EntityStatus.DISABLED:
            raise DisabledSwitch(f"Interface {interface.id} is disabled")
117