Passed
Pull Request — master (#321)
by
unknown
03:36
created

build.utils   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 99
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 63
dl 0
loc 99
rs 10
c 0
b 0
f 0
ccs 47
cts 47
cp 1
wmc 19

9 Functions

Rating   Name   Duplication   Size   Complexity  
A emit_event() 0 5 1
A map_evc_event_content() 0 9 1
A compare_endpoint_trace() 0 11 3
A max_power2_divisor() 0 5 2
A map_dl_vlan() 0 14 3
A notify_link_available_tags() 0 5 1
A compare_uni_out_trace() 0 11 4
A aemit_event() 0 4 1
A get_vlan_tags_and_masks() 0 11 3
1
"""Utility functions."""
2 1
from kytos.core.events import KytosEvent
3
4
5 1
def map_evc_event_content(evc, **kwargs):
    """Build an event content dict from an EVC plus extra keyword values.

    The extra keyword arguments come first in the result; the fields read
    from ``evc`` are applied afterwards, so they win on any key collision.
    """
    evc_fields = {
        "evc_id": evc.id,
        "name": evc.name,
        "metadata": evc.metadata,
        "active": evc._active,
        "enabled": evc._enabled,
        "uni_a": evc.uni_a.as_dict(),
        "uni_z": evc.uni_z.as_dict(),
    }
    # Copy so the caller-supplied keywords are never mutated in place.
    content = dict(kwargs)
    content.update(evc_fields)
    return content
14
15
16 1
def emit_event(controller, name, context="kytos/mef_eline", content=None):
    """Create a KytosEvent named ``<context>.<name>`` and put it on the app buffer.

    Args:
        controller: object exposing ``buffers.app.put``.
        name: event suffix appended to ``context``.
        context: event name prefix.
        content: payload forwarded to the event unchanged.
    """
    qualified_name = f"{context}.{name}"
    kytos_event = KytosEvent(name=qualified_name, content=content)
    app_buffer = controller.buffers.app
    app_buffer.put(kytos_event)
21
22
23 1
def aemit_event(controller, name, content):
    """Create a KytosEvent and submit it through the app buffer's ``aput``.

    Unlike ``emit_event`` no context prefix is added: ``name`` is used as
    the full event name.

    Args:
        controller: object exposing ``buffers.app.aput``.
        name: complete event name.
        content: payload forwarded to the event unchanged.

    Returns:
        Whatever ``aput`` returns, so the caller can ``await`` it.
    """
    event = KytosEvent(name=name, content=content)
    # NOTE(review): ``aput`` is presumably a coroutine function (kytos
    # KytosEventBuffer.aput) - confirm. If so, dropping its result would
    # leave the coroutine un-awaited and the event never queued, so the
    # result is handed back to the caller to await.
    return controller.buffers.app.aput(event)
27
28
29 1
def notify_link_available_tags(controller, link, src_func=None):
    """Emit a ``link_available_tags`` event carrying the link and its source.

    The event goes through ``emit_event`` with its default context; the
    content holds ``link`` and ``src_func`` exactly as received.
    """
    payload = {"link": link, "src_func": src_func}
    emit_event(controller, "link_available_tags", content=payload)
35
36
37 1
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    The switch dpid and the port number must always match the step. The
    vlan is compared as well, but only when a truthy ``vlan`` is given and
    the step actually carries a ``"vlan"`` entry.
    """
    check_vlan = vlan and "vlan" in trace
    same_switch = endpoint.switch.dpid == trace["dpid"]
    if not check_vlan:
        return same_switch and endpoint.port_number == trace["port"]
    return (
        same_switch
        and endpoint.port_number == trace["port"]
        and vlan == trace["vlan"]
    )
49
50
51 1
def map_dl_vlan(value):
    """Translate a dl_vlan value into the vlan used for comparisons.

    Mapping rules:
        "untagged" or 0        -> None
        "any" or "4096/4096"   -> 1
        any other int          -> returned unchanged
        "tag/mask" string      -> tag & mask, limited to the low 12 bits
    """
    if value in {"untagged", 0}:
        return None
    if value in {"any", "4096/4096"}:
        return 1
    if isinstance(value, int):
        return value
    # Remaining case: a "tag/mask" string with both parts as decimal ints.
    tag, mask = (int(part) for part in value.split("/"))
    return tag & mask & 4095
65
66
67 1
def compare_uni_out_trace(uni, trace):
    """Tell whether the trace's ``"out"`` step agrees with the UNI.

    A trace without an ``"out"`` entry is accepted, which keeps
    compatibility with old versions of sdntrace-cp. An ``"out"`` entry that
    is not a dict is rejected. Otherwise both the port number and the vlan
    (mapped through ``map_dl_vlan``, or None without a user tag) must match.
    """
    if "out" not in trace:
        return True
    out_step = trace["out"]
    if not isinstance(out_step, dict):
        return False
    expected_vlan = None
    if uni.user_tag:
        expected_vlan = map_dl_vlan(uni.user_tag.value)
    return (
        uni.interface.port_number == out_step.get("port")
        and expected_vlan == out_step.get("vlan")
    )
79
80
81 1
def max_power2_divisor(number: int, limit: int = 4096) -> int:
    """Return the largest divisor of ``number`` found by halving ``limit``.

    The search starts at ``limit`` and keeps halving the candidate while it
    leaves a positive remainder. With the default power-of-two ``limit``
    the result is the largest power of 2, capped at ``limit``, that divides
    ``number``.
    """
    candidate = limit
    while number % candidate > 0:
        candidate = candidate // 2
    return candidate
86
87
88 1
def get_vlan_tags_and_masks(start: int, end: int) -> list[tuple[int, int]]:
    """Cover the inclusive range ``start..end`` with (vlan, mask) pairs.

    At each step the block size is the power-of-two divisor of the current
    position given by ``max_power2_divisor``, halved until it no longer
    overshoots the end of the range. Each block is recorded as
    ``(position, 4096 - block_size)``.
    """
    stop = end + 1
    pairs = []
    position = start
    while position < stop:
        remaining = stop - position
        block_size = max_power2_divisor(position)
        # Shrink the block so it stays inside the requested range.
        while block_size > remaining:
            block_size //= 2
        pairs.append((position, 4096 - block_size))
        position += block_size
    return pairs
99