Passed
Pull Request — master (#407)
by Vinicius
07:56
created

build.utils.compare_endpoint_trace()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 11
ccs 4
cts 4
cp 1
rs 9.95
c 0
b 0
f 0
cc 3
nop 3
crap 3
1
"""Utility functions."""
2 1
from typing import Union
3
4 1
from kytos.core.common import EntityStatus
5 1
from kytos.core.events import KytosEvent
6 1
from kytos.core.interface import UNI, Interface, TAGRange
7 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch
8
9
10 1
def map_evc_event_content(evc, **kwargs):
    """Return the event content dict for an EVC.

    The result contains every keyword argument given in ``kwargs`` plus the
    EVC's ``evc_id``, ``name``, ``metadata``, ``active``, ``enabled``,
    ``uni_a`` and ``uni_z`` entries. On a key collision the value taken from
    ``evc`` wins, because the EVC mapping is the right operand of the merge.
    """
    evc_content = {
        "evc_id": evc.id,
        "name": evc.name,
        "metadata": evc.metadata,
        "active": evc._active,
        "enabled": evc._enabled,
        "uni_a": evc.uni_a.as_dict(),
        "uni_z": evc.uni_z.as_dict(),
    }
    return kwargs | evc_content
19
20
21 1
def emit_event(controller, name, context="kytos/mef_eline", content=None):
    """Send an event when something happens with an EVC.

    The event name is ``"<context>.<name>"``. The resulting ``KytosEvent`` is
    put on ``controller.buffers.app``.
    """
    event = KytosEvent(name=f"{context}.{name}", content=content)
    controller.buffers.app.put(event)
26
27
28 1
async def aemit_event(controller, name, content):
    """Send an event asynchronously.

    Unlike ``emit_event``, ``name`` is used as the full event name as given
    (no context prefix is added). The event is awaited onto
    ``controller.buffers.app`` via ``aput``.
    """
    event = KytosEvent(name=name, content=content)
    await controller.buffers.app.aput(event)
32
33
34 1
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    The endpoint matches when its switch dpid and port number equal the
    ``"dpid"`` and ``"port"`` entries of ``trace``. The VLAN is compared as
    well only when ``vlan`` is truthy and the trace step has a ``"vlan"``
    entry; otherwise the VLAN is ignored.
    """
    same_endpoint = (
        endpoint.switch.dpid == trace["dpid"]
        and endpoint.port_number == trace["port"]
    )
    if vlan and "vlan" in trace:
        return same_endpoint and vlan == trace["vlan"]
    return same_endpoint
46
47
48 1
def map_dl_vlan(value: Union[str, int]) -> Union[int, None]:
    """Map a dl_vlan value with the following criteria:

    dl_vlan = "untagged" or 0 -> None
    dl_vlan = "any" or "4096/4096" -> 1
    dl_vlan = int -> the same int, unchanged
    dl_vlan = "num1/num2" -> num1 & (num2 & 4095)
    """
    if value in {"untagged", 0}:
        return None
    value = {"any": 1, "4096/4096": 1}.get(value, value)
    if isinstance(value, int):
        return value
    vlan, mask = map(int, value.split("/"))
    # Only the low 12 bits of the mask (4095 == 0xFFF) are applied.
    return vlan & (mask & 4095)
62
63
64 1
def compare_uni_out_trace(
    tag_value: Union[None, int, str],
    interface: Interface,
    trace: dict
) -> bool:
    """Check if the trace last step (output) matches the UNI attributes.

    Returns True when ``trace`` has no ``"out"`` entry, False when ``"out"``
    is present but is not a dict. Otherwise the interface port number and the
    mapped UNI VLAN (None when ``tag_value`` is falsy) must both equal the
    ``"port"`` and ``"vlan"`` entries of ``trace["out"]``.
    """
    # keep compatibility for old versions of sdntrace-cp
    if "out" not in trace:
        return True
    out = trace["out"]
    if not isinstance(out, dict):
        return False
    uni_vlan = map_dl_vlan(tag_value) if tag_value else None
    return (
        interface.port_number == out.get("port")
        and uni_vlan == out.get("vlan")
    )
80
81
82 1
def max_power2_divisor(number: int, limit: int = 4096) -> int:
    """Get the max power of 2 that is divisor of number.

    The search starts at ``limit`` and halves it until it divides ``number``,
    so the result never exceeds ``limit``. ``number == 0`` returns ``limit``
    unchanged, since 0 is divisible by any non-zero value.

    NOTE(review): the result is a power of 2 only if ``limit`` is one —
    callers are assumed to pass a power of 2; confirm.
    """
    while number % limit > 0:
        limit //= 2
    return limit
87
88
89 1
def get_vlan_tags_and_masks(
    tag_ranges: list[list[int]]
) -> list[Union[int, str]]:
    """Get a list of vlan/mask pairs for a given list of ranges.

    Each ``[start, end]`` range (inclusive) is split into aligned
    power-of-2 sized chunks. A chunk covering a single VLAN is emitted as the
    plain int; larger chunks are emitted as the string ``"<vlan>/<mask>"``.
    For example ``[[1, 10]]`` yields
    ``[1, "2/4094", "4/4092", "8/4094", 10]``.
    """
    masks_list = []
    for start, end in tag_ranges:
        limit = end + 1
        while start < limit:
            # Largest chunk aligned at `start`...
            divisor = max_power2_divisor(start)
            # ...shrunk until it no longer runs past the end of the range.
            while divisor > limit - start:
                divisor //= 2
            mask = 4096 - divisor
            if mask == 4095:
                # Chunk of size 1: a single exact VLAN.
                masks_list.append(start)
            else:
                masks_list.append(f"{start}/{mask}")
            start += divisor
    return masks_list
105
106
107 1
def check_disabled_component(uni_a: UNI, uni_z: UNI):
    """Check if a switch or an interface is disabled.

    The check is only performed when both UNIs are on the same switch; when
    the switches differ the function returns without checking anything.

    Raises:
        DisabledSwitch: if the shared switch is disabled, or if the interface
            of ``uni_a`` or ``uni_z`` is disabled (the same exception type is
            used for interfaces).
    """
    switch = uni_a.interface.switch
    if switch != uni_z.interface.switch:
        return
    if switch.status == EntityStatus.DISABLED:
        raise DisabledSwitch(f"Switch {switch.dpid} is disabled")
    for uni in (uni_a, uni_z):
        if uni.interface.status == EntityStatus.DISABLED:
            raise DisabledSwitch(f"Interface {uni.interface.id} is disabled")
120
121
122 1
def make_uni_list(list_circuits: list) -> list:
    """Make uni list to be sent to sdntrace.

    Returns a flat list of ``(interface, tag)`` tuples. A circuit whose
    ``uni_a.user_tag`` is a ``TAGRange`` contributes one uni_a/uni_z pair per
    mask in its mask list. Any other circuit contributes a single
    uni_a/uni_z pair, with each tag being ``user_tag.value`` or None when the
    UNI has no user tag.
    """
    uni_list = []
    for circuit in list_circuits:
        uni_a, uni_z = circuit.uni_a, circuit.uni_z
        if isinstance(uni_a.user_tag, TAGRange):
            # TAGRange value from uni_a and uni_z are currently mirrored
            mask_list = uni_a.user_tag.mask_list or uni_z.user_tag.mask_list
            for mask in mask_list:
                uni_list.append((uni_a.interface, mask))
                uni_list.append((uni_z.interface, mask))
            continue
        tag_a = uni_a.user_tag.value if uni_a.user_tag else None
        uni_list.append((uni_a.interface, tag_a))
        tag_z = uni_z.user_tag.value if uni_z.user_tag else None
        uni_list.append((uni_z.interface, tag_z))
    return uni_list
147