1
|
|
|
"""Utility functions.""" |
2
|
1 |
|
from kytos.core.events import KytosEvent |
3
|
|
|
|
4
|
|
|
|
5
|
1 |
|
def map_evc_event_content(evc, **kwargs): |
6
|
|
|
"""Returns a set of values from evc to be used for content""" |
7
|
1 |
|
return kwargs | {"evc_id": evc.id, |
8
|
|
|
"name": evc.name, |
9
|
|
|
"metadata": evc.metadata, |
10
|
|
|
"active": evc._active, |
11
|
|
|
"enabled": evc._enabled, |
12
|
|
|
"uni_a": evc.uni_a.as_dict(), |
13
|
|
|
"uni_z": evc.uni_z.as_dict()} |
14
|
|
|
|
15
|
|
|
|
16
|
1 |
|
def emit_event(controller, name, context="kytos/mef_eline", content=None): |
17
|
|
|
"""Send an event when something happens with an EVC.""" |
18
|
1 |
|
event_name = f"{context}.{name}" |
19
|
1 |
|
event = KytosEvent(name=event_name, content=content) |
20
|
1 |
|
controller.buffers.app.put(event) |
21
|
|
|
|
22
|
|
|
|
23
|
1 |
|
def notify_link_available_tags(controller, link, src_func=None): |
24
|
|
|
"""Notify link available tags.""" |
25
|
1 |
|
emit_event(controller, "link_available_tags", content={ |
26
|
|
|
"link": link, |
27
|
|
|
"src_func": src_func |
28
|
|
|
}) |
29
|
|
|
|
30
|
|
|
|
31
|
1 |
|
def compare_endpoint_trace(endpoint, vlan, trace): |
32
|
|
|
"""Compare and endpoint with a trace step.""" |
33
|
1 |
|
if vlan and "vlan" in trace: |
34
|
1 |
|
return ( |
35
|
|
|
endpoint.switch.dpid == trace["dpid"] |
36
|
|
|
and endpoint.port_number == trace["port"] |
37
|
|
|
and vlan == trace["vlan"] |
38
|
|
|
) |
39
|
1 |
|
return ( |
40
|
|
|
endpoint.switch.dpid == trace["dpid"] |
41
|
|
|
and endpoint.port_number == trace["port"] |
42
|
|
|
) |
43
|
|
|
|
44
|
|
|
|
45
|
1 |
|
def compare_uni_out_trace(uni, trace): |
46
|
|
|
"""Check if the trace last step (output) matches the UNI attributes.""" |
47
|
|
|
# keep compatibility for old versions of sdntrace-cp |
48
|
1 |
|
if "out" not in trace: |
49
|
1 |
|
return True |
50
|
1 |
|
if not isinstance(trace["out"], dict): |
51
|
1 |
|
return False |
52
|
1 |
|
uni_vlan = uni.user_tag.value if uni.user_tag else None |
53
|
1 |
|
return ( |
54
|
|
|
uni.interface.port_number == trace["out"].get("port") |
55
|
|
|
and uni_vlan == trace["out"].get("vlan") |
56
|
|
|
) |
57
|
|
|
|