Passed
Pull Request — master (#304)
by Vinicius
04:58 queued 01:14
created

build.utils.validate()   B

Complexity

Conditions 5

Size

Total Lines 49
Code Lines 37

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 29
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 37
nop 1
dl 0
loc 49
ccs 29
cts 29
cp 1
crap 5
rs 8.5253
c 0
b 0
f 0
1
"""Utility functions."""
2 1
from kytos.core.events import KytosEvent
3
4
5 1
def map_evc_event_content(evc, **kwargs):
    """Build the event content dict for an EVC.

    The result starts from the extra keyword arguments and then adds the
    EVC's own fields. When a keyword argument uses the same key as one of
    the EVC fields, the value taken from ``evc`` wins.
    """
    return {
        **kwargs,
        "evc_id": evc.id,
        "name": evc.name,
        "metadata": evc.metadata,
        "active": evc._active,
        "enabled": evc._enabled,
        "uni_a": evc.uni_a.as_dict(),
        "uni_z": evc.uni_z.as_dict(),
    }
14
15
16 1
def emit_event(controller, name, context="kytos/mef_eline", content=None):
    """Create a KytosEvent and put it on the controller's app buffer.

    The event name is ``context`` and ``name`` joined by a dot; ``content``
    is passed to the event unchanged.
    """
    app_event = KytosEvent(name=f"{context}.{name}", content=content)
    controller.buffers.app.put(app_event)
21
22
23 1
def notify_link_available_tags(controller, link, src_func=None):
    """Emit a ``link_available_tags`` event for the given link.

    The event content carries the ``link`` and the optional ``src_func``
    value supplied by the caller.
    """
    payload = {"link": link, "src_func": src_func}
    emit_event(controller, "link_available_tags", content=payload)
29
30
31 1
def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step.

    The switch dpid and port number must always match the step. The VLAN
    is only compared when a truthy ``vlan`` is given and the step has a
    ``"vlan"`` key.
    """
    check_vlan = vlan and "vlan" in trace
    same_port = (
        endpoint.switch.dpid == trace["dpid"]
        and endpoint.port_number == trace["port"]
    )
    if check_vlan:
        return same_port and vlan == trace["vlan"]
    return same_port
43
44
45 1
def compare_uni_out_trace(uni, trace):
    """Check whether the trace's output step matches the UNI attributes.

    Returns True when the trace has no ``"out"`` key, False when ``"out"``
    is not a dict, and otherwise whether both the port number and the VLAN
    (``None`` when the UNI has no user tag) match the output step.
    """
    if "out" in trace:
        out_step = trace["out"]
        if not isinstance(out_step, dict):
            return False
        expected_vlan = uni.user_tag.value if uni.user_tag else None
        same_port = uni.interface.port_number == out_step.get("port")
        return same_port and expected_vlan == out_step.get("vlan")
    # keep compatibility for old versions of sdntrace-cp
    return True
57