|
1
|
|
|
"""Utility functions.""" |
|
2
|
1 |
|
from kytos.core.events import KytosEvent |
|
3
|
|
|
|
|
4
|
|
|
|
|
5
|
1 |
|
def map_evc_event_content(evc, **kwargs): |
|
6
|
|
|
"""Returns a set of values from evc to be used for content""" |
|
7
|
1 |
|
return kwargs | {"evc_id": evc.id, |
|
8
|
|
|
"name": evc.name, |
|
9
|
|
|
"metadata": evc.metadata, |
|
10
|
|
|
"active": evc._active, |
|
11
|
|
|
"enabled": evc._enabled, |
|
12
|
|
|
"uni_a": evc.uni_a.as_dict(), |
|
13
|
|
|
"uni_z": evc.uni_z.as_dict()} |
|
14
|
|
|
|
|
15
|
|
|
|
|
16
|
1 |
|
def emit_event(controller, name, context="kytos/mef_eline", content=None): |
|
17
|
|
|
"""Send an event when something happens with an EVC.""" |
|
18
|
1 |
|
event_name = f"{context}.{name}" |
|
19
|
1 |
|
event = KytosEvent(name=event_name, content=content) |
|
20
|
1 |
|
controller.buffers.app.put(event) |
|
21
|
|
|
|
|
22
|
|
|
|
|
23
|
1 |
|
def notify_link_available_tags(controller, link, src_func=None): |
|
24
|
|
|
"""Notify link available tags.""" |
|
25
|
1 |
|
emit_event(controller, "link_available_tags", content={ |
|
26
|
|
|
"link": link, |
|
27
|
|
|
"src_func": src_func |
|
28
|
|
|
}) |
|
29
|
|
|
|
|
30
|
|
|
|
|
31
|
1 |
|
def compare_endpoint_trace(endpoint, vlan, trace): |
|
32
|
|
|
"""Compare and endpoint with a trace step.""" |
|
33
|
1 |
|
if vlan and "vlan" in trace: |
|
34
|
1 |
|
return ( |
|
35
|
|
|
endpoint.switch.dpid == trace["dpid"] |
|
36
|
|
|
and endpoint.port_number == trace["port"] |
|
37
|
|
|
and vlan == trace["vlan"] |
|
38
|
|
|
) |
|
39
|
1 |
|
return ( |
|
40
|
|
|
endpoint.switch.dpid == trace["dpid"] |
|
41
|
|
|
and endpoint.port_number == trace["port"] |
|
42
|
|
|
) |
|
43
|
|
|
|
|
44
|
|
|
|
|
45
|
1 |
|
def map_dl_vlan(value): |
|
46
|
|
|
"""Map dl_vlan value with the following criteria: |
|
47
|
|
|
dl_vlan = untagged or 0 -> None |
|
48
|
|
|
dl_vlan = any or "4096/4096" -> 1 |
|
49
|
|
|
dl_vlan = "num1/num2" -> int in [1, 4095]""" |
|
50
|
1 |
|
special_untagged = {"untagged", 0} |
|
51
|
1 |
|
if value in special_untagged: |
|
52
|
1 |
|
return None |
|
53
|
1 |
|
special_any = {"any": 1, "4096/4096": 1} |
|
54
|
1 |
|
value = special_any.get(value, value) |
|
55
|
1 |
|
if isinstance(value, int): |
|
56
|
1 |
|
return value |
|
57
|
1 |
|
value, mask = map(int, value.split('/')) |
|
58
|
1 |
|
return value & (mask & 4095) |
|
59
|
|
|
|
|
60
|
|
|
|
|
61
|
1 |
|
def compare_uni_out_trace(uni, trace): |
|
62
|
|
|
"""Check if the trace last step (output) matches the UNI attributes.""" |
|
63
|
|
|
# keep compatibility for old versions of sdntrace-cp |
|
64
|
1 |
|
if "out" not in trace: |
|
65
|
1 |
|
return True |
|
66
|
1 |
|
if not isinstance(trace["out"], dict): |
|
67
|
1 |
|
return False |
|
68
|
1 |
|
uni_vlan = map_dl_vlan(uni.user_tag.value) if uni.user_tag else None |
|
69
|
1 |
|
return ( |
|
70
|
|
|
uni.interface.port_number == trace["out"].get("port") |
|
71
|
|
|
and uni_vlan == trace["out"].get("vlan") |
|
72
|
|
|
) |
|
73
|
|
|
|
|
74
|
|
|
|
|
75
|
|
|
def max_power2_divisor(number: int, limit: int = 4096) -> int: |
|
76
|
|
|
"""Get the max power of 2 that is divisor of number""" |
|
77
|
|
|
while number % limit > 0: |
|
78
|
|
|
limit //= 2 |
|
79
|
|
|
return limit |
|
80
|
|
|
|
|
81
|
|
|
|
|
82
|
|
|
def get_vlan_tags_and_masks(start: int, end: int) -> list[tuple[int, int]]: |
|
83
|
|
|
"""Get the minimum number of vlan/mask pairs for a given range.""" |
|
84
|
|
|
limit = end + 1 |
|
85
|
|
|
tags_and_masks = [] |
|
86
|
|
|
while start < limit: |
|
87
|
|
|
divisor = max_power2_divisor(start) |
|
88
|
|
|
while divisor > limit - start: |
|
89
|
|
|
divisor //= 2 |
|
90
|
|
|
tags_and_masks.append((start, 4096-divisor)) |
|
91
|
|
|
start += divisor |
|
92
|
|
|
return tags_and_masks |
|
93
|
|
|
|