Passed
Push — master ( 92444a...eafb59 )
by Vinicius
02:09 queued 16s
created

build.utils.get_evc_unis()   A

Complexity

Conditions 1

Size

Total Lines 16
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 12
nop 1
dl 0
loc 16
ccs 4
cts 4
cp 1
crap 1
rs 9.8
c 0
b 0
f 0
1
""" Support function for main.py """
2
3 1
from napps.kytos.telemetry_int import settings
4
5 1
from .exceptions import FlowsNotFound, PriorityOverflow
6 1
from .kytos_api_helper import get_stored_flows as _get_stored_flows
7
8
9 1
async def get_found_stored_flows(cookies: list[int] = None) -> dict[int, list[dict]]:
10
    """Get stored flows ensuring that flows are found."""
11 1
    cookies = cookies or []
12 1
    stored_flows = await _get_stored_flows(cookies)
13 1
    for cookie, flows in stored_flows.items():
14 1
        if not flows:
15 1
            raise FlowsNotFound(get_id_from_cookie(cookie))
16 1
    return stored_flows
17
18
19 1
def has_int_enabled(evc: dict) -> bool:
20
    """Check if evc has telemetry."""
21 1
    return (
22
        "metadata" in evc
23
        and "telemetry" in evc["metadata"]
24
        and isinstance(evc["metadata"]["telemetry"], dict)
25
        and "enabled" in evc["metadata"]["telemetry"]
26
        and evc["metadata"]["telemetry"]["enabled"]
27
    )
28
29
30 1
def get_evc_unis(evc: dict) -> tuple[dict, dict]:
31
    """Parse evc for unis."""
32 1
    uni_a_split = evc["uni_a"]["interface_id"].split(":")
33 1
    uni_z_split = evc["uni_z"]["interface_id"].split(":")
34 1
    return (
35
        {
36
            "interface_id": evc["uni_a"]["interface_id"],
37
            "tag": evc["uni_a"].get("tag", {}),
38
            "port_number": int(uni_a_split[-1]),
39
            "switch": ":".join(uni_a_split[:-1]),
40
        },
41
        {
42
            "interface_id": evc["uni_z"]["interface_id"],
43
            "tag": evc["uni_z"].get("tag", {}),
44
            "port_number": int(uni_z_split[-1]),
45
            "switch": ":".join(uni_z_split[:-1]),
46
        },
47
    )
48
49
50 1
def add_to_apply_actions(
51
    instructions: list[dict], new_instruction: dict, position: int
52
):
53
    """Create the actions list"""
54 1
    for instruction in instructions:
55 1
        if instruction["instruction_type"] == "apply_actions":
56 1
            instruction["actions"].insert(position, new_instruction)
57 1
    return instructions
58
59
60 1
def get_cookie(evc_id: str, cookie_prefix: int) -> int:
61
    """Return the cookie integer from evc id.
62
63
    cookie_prefix is supposed to be the reserved byte value that
64
    mef_eline or telemetry_int uses.
65
    """
66 1
    return int(evc_id, 16) + (cookie_prefix << 56)
67
68
69 1
def get_id_from_cookie(cookie: int) -> str:
70
    """Return the evc id given a cookie value."""
71 1
    evc_id = cookie & 0xFFFFFFFFFFFFFF
72 1
    return f"{evc_id:x}"
73
74
75 1
def is_intra_switch_evc(evc):
76
    """Returns if EVC is intra-switch (two UNIs on the same switch)"""
77 1
    uni_a, uni_z = get_evc_unis(evc)
78 1
    if uni_a["switch"] == uni_z["switch"]:
79 1
        return True
80 1
    return False
81
82
83 1
def modify_actions(actions: list[dict], actions_to_change: list[str], remove=True):
84
    """Change the current actions
85
    If remove == True, remove actions_to_change from actions.
86
    If remove == False, keep actions_to_change, remove everything else
87
    Args:
88
        actions = current list of actions on a flow
89
        actions_to_change = list of actions as strings
90
        remove = boolean
91
    Return
92
        actions
93
    """
94 1
    del_indexes = set()
95 1
    for index, action in enumerate(actions):
96 1
        if remove:
97 1
            if action["action_type"] in actions_to_change:
98 1
                del_indexes.add(index)
99
        else:
100 1
            if action["action_type"] not in actions_to_change:
101 1
                del_indexes.add(index)
102 1
    return [action for i, action in enumerate(actions) if i not in del_indexes]
103
104
105 1
def set_priority(flow: dict, evc_id: str = "") -> dict:
106
    """Find a suitable priority number. EP031 describes 100 as the addition."""
107 1
    if flow["flow"]["priority"] + 100 < (2**16 - 2):
108 1
        flow["flow"]["priority"] += 100
109 1
    elif flow["flow"]["priority"] + 1 < (2**16 - 2):
110 1
        flow["flow"]["priority"] += 1
111
    else:
112 1
        raise PriorityOverflow(evc_id, f"Flow {flow} would overflow max priority")
113 1
    return flow
114
115
116 1
def set_owner(flow: dict) -> dict:
117
    """Set flow owner."""
118 1
    flow["flow"]["owner"] = "telemetry_int"
119 1
    return flow
120
121
122 1
def get_new_cookie(cookie: int, cookie_prefix=settings.INT_COOKIE_PREFIX) -> int:
123
    """Convert from mef-eline cookie by replacing the most significant byte."""
124 1
    return (cookie & 0xFFFFFFFFFFFFFF) + (cookie_prefix << 56)
125
126
127 1
def set_new_cookie(flow: dict) -> dict:
128
    """Set new cookie."""
129 1
    flow["flow"]["cookie"] = get_new_cookie(
130
        flow["flow"]["cookie"], cookie_prefix=settings.INT_COOKIE_PREFIX
131
    )
132 1
    return flow
133
134
135 1
def set_instructions_from_actions(flow: dict) -> dict:
136
    """Get intructions or convert from actions."""
137 1
    if "instructions" in flow["flow"]:
138 1
        return flow
139
140 1
    instructions = [
141
        {
142
            "instruction_type": "apply_actions",
143
            "actions": flow["flow"].get("actions", []),
144
        }
145
    ]
146 1
    flow["flow"].pop("actions", None)
147 1
    flow["flow"]["instructions"] = instructions
148
    return flow
149