1
|
|
|
"""Utility functions.""" |
2
|
1 |
|
from kytos.core.events import KytosEvent |
3
|
|
|
|
4
|
|
|
|
5
|
1 |
|
def map_evc_event_content(evc, **kwargs):
    """Build the content dict for an event about ``evc``.

    The extra keyword arguments are merged with a snapshot of the EVC
    (id, name, metadata, active/enabled flags and both UNIs as dicts).
    When a keyword argument uses the same key as one of the EVC fields,
    the EVC value takes precedence.
    """
    evc_fields = {
        "evc_id": evc.id,
        "name": evc.name,
        "metadata": evc.metadata,
        "active": evc._active,
        "enabled": evc._enabled,
        "uni_a": evc.uni_a.as_dict(),
        "uni_z": evc.uni_z.as_dict(),
    }
    # Right-hand operand wins on duplicated keys; a new dict is returned.
    return kwargs | evc_fields
14
|
|
|
|
15
|
|
|
|
16
|
1 |
|
def emit_event(controller, name, context="kytos/mef_eline", content=None):
    """Put a KytosEvent named ``<context>.<name>`` on the app buffer.

    The event carries ``content`` as its payload and is queued through
    ``controller.buffers.app.put``.
    """
    kytos_event = KytosEvent(name=f"{context}.{name}", content=content)
    controller.buffers.app.put(kytos_event)
21
|
|
|
|
22
|
|
|
|
23
|
1 |
|
def notify_link_available_tags(controller, link, src_func=None):
    """Emit the ``link_available_tags`` event for ``link``.

    The event content holds the link itself and ``src_func``, the
    optional value the caller passes to identify the notification source.
    """
    payload = {"link": link, "src_func": src_func}
    emit_event(controller, "link_available_tags", content=payload)
29
|
|
|
|
30
|
|
|
|
31
|
1 |
|
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    The switch dpid and the port number must always match. The vlan is
    only compared when a truthy ``vlan`` is given and the trace step
    has a ``"vlan"`` entry.
    """
    matches = (
        endpoint.switch.dpid == trace["dpid"]
        and endpoint.port_number == trace["port"]
    )
    if vlan and "vlan" in trace:
        matches = matches and vlan == trace["vlan"]
    return matches
43
|
|
|
|
44
|
|
|
|
45
|
1 |
|
def map_dl_vlan(value):
    """Map a dl_vlan value with the following criteria:

    dl_vlan = "untagged" or 0 -> None
    dl_vlan = "any" or "4096/4096" -> 1
    dl_vlan = int -> returned unchanged
    dl_vlan = "num1/num2" -> num1 & num2, keeping only the low 12 bits
    """
    if value in {"untagged", 0}:
        return None
    if value in {"any", "4096/4096"}:
        return 1
    if isinstance(value, int):
        return value
    # Remaining case: a "vlan/mask" string with exactly two int fields.
    vlan_text, mask_text = value.split("/")
    return int(vlan_text) & int(mask_text) & 4095
59
|
|
|
|
60
|
|
|
|
61
|
1 |
|
def compare_uni_out_trace(uni, trace):
    """Check if the trace last step (output) matches the UNI attributes.

    Returns True when the trace has no ``"out"`` entry, False when that
    entry is not a dict, and otherwise whether both the output port and
    the output vlan match the UNI.
    """
    # keep compatibility for old versions of sdntrace-cp
    if "out" not in trace:
        return True
    out_step = trace["out"]
    if not isinstance(out_step, dict):
        return False

    # A UNI without user_tag is expected to have no vlan on the output.
    expected_vlan = None
    if uni.user_tag:
        expected_vlan = map_dl_vlan(uni.user_tag.value)

    port_matches = uni.interface.port_number == out_step.get("port")
    return port_matches and expected_vlan == out_step.get("vlan")
73
|
|
|
|
74
|
|
|
|
75
|
|
|
def max_power2_divisor(number: int, limit: int = 4096) -> int:
    """Get the max power of 2 that is divisor of number.

    Args:
        number: Integer whose largest power-of-two divisor is wanted.
        limit: Upper bound for the result. It is rounded down to the
            nearest power of two, so the result is always a power of
            two even when ``limit`` itself is not.

    Returns:
        The largest power of two that divides ``number`` and does not
        exceed ``limit``. For ``number == 0`` (divisible by every power
        of two) the rounded-down ``limit`` is returned.

    Raises:
        ValueError: If ``limit`` is lower than 1.
    """
    if limit < 1:
        raise ValueError("limit must be a positive integer")
    # Largest power of two that is <= limit.
    cap = 1 << (limit.bit_length() - 1)
    if number == 0:
        return cap
    # number & -number isolates the lowest set bit, which is the largest
    # power of two dividing number (also valid for negative integers).
    return min(cap, number & -number)
80
|
|
|
|
81
|
|
|
|
82
|
|
|
def get_vlan_tags_and_masks(start: int, end: int) -> list[tuple[int, int]]:
    """Get the minimum number of vlan/mask pairs for a given range.

    The inclusive range ``[start, end]`` is split into consecutive
    blocks whose sizes are powers of two; each block is reported as
    ``(first_vlan, 4096 - block_size)``. An empty list is returned when
    ``start`` is greater than ``end``.
    """
    stop = end + 1
    pairs = []
    current = start
    while current < stop:
        # Biggest block aligned at ``current``...
        block_size = max_power2_divisor(current)
        # ...shrunk until it no longer goes past the end of the range.
        remaining = stop - current
        while block_size > remaining:
            block_size //= 2
        pairs.append((current, 4096 - block_size))
        current += block_size
    return pairs
93
|
|
|
|