Passed
Push — master ( 4bf6f4...81fe6d )
by Vinicius
02:40 queued 17s
created

build.utils   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 163
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 89
dl 0
loc 163
ccs 67
cts 67
cp 1
rs 10
c 0
b 0
f 0
wmc 29

14 Functions

Rating   Name   Duplication   Size   Complexity  
A get_id_from_cookie() 0 4 1
A get_new_cookie() 0 3 1
A modify_actions() 0 20 5
A set_instructions_from_actions() 0 14 2
A get_evc_unis() 0 16 1
A set_new_cookie() 0 6 1
A set_owner() 0 4 1
A set_priority() 0 9 3
A get_cookie() 0 7 1
A get_found_stored_flows() 0 8 3
A is_intra_switch_evc() 0 6 2
A has_int_enabled() 0 8 1
A add_to_apply_actions() 0 8 3
A get_svlan_dpid_link() 0 10 4
1
""" Support function for main.py """
2
3 1
from typing import Optional
4
5 1
from napps.kytos.telemetry_int import settings
6
7 1
from .exceptions import FlowsNotFound, PriorityOverflow
8 1
from .kytos_api_helper import get_stored_flows as _get_stored_flows
9
10
11 1
async def get_found_stored_flows(cookies: list[int] = None) -> dict[int, list[dict]]:
12
    """Get stored flows ensuring that flows are found."""
13 1
    cookies = cookies or []
14 1
    stored_flows = await _get_stored_flows(cookies)
15 1
    for cookie, flows in stored_flows.items():
16 1
        if not flows:
17 1
            raise FlowsNotFound(get_id_from_cookie(cookie))
18 1
    return stored_flows
19
20
21 1
def has_int_enabled(evc: dict) -> bool:
22
    """Check if evc has telemetry."""
23 1
    return (
24
        "metadata" in evc
25
        and "telemetry" in evc["metadata"]
26
        and isinstance(evc["metadata"]["telemetry"], dict)
27
        and "enabled" in evc["metadata"]["telemetry"]
28
        and evc["metadata"]["telemetry"]["enabled"]
29
    )
30
31
32 1
def get_evc_unis(evc: dict) -> tuple[dict, dict]:
33
    """Parse evc for unis."""
34 1
    uni_a_split = evc["uni_a"]["interface_id"].split(":")
35 1
    uni_z_split = evc["uni_z"]["interface_id"].split(":")
36 1
    return (
37
        {
38
            "interface_id": evc["uni_a"]["interface_id"],
39
            "tag": evc["uni_a"].get("tag", {}),
40
            "port_number": int(uni_a_split[-1]),
41
            "switch": ":".join(uni_a_split[:-1]),
42
        },
43
        {
44
            "interface_id": evc["uni_z"]["interface_id"],
45
            "tag": evc["uni_z"].get("tag", {}),
46
            "port_number": int(uni_z_split[-1]),
47
            "switch": ":".join(uni_z_split[:-1]),
48
        },
49
    )
50
51
52 1
def add_to_apply_actions(
53
    instructions: list[dict], new_instruction: dict, position: int
54
):
55
    """Create the actions list"""
56 1
    for instruction in instructions:
57 1
        if instruction["instruction_type"] == "apply_actions":
58 1
            instruction["actions"].insert(position, new_instruction)
59 1
    return instructions
60
61
62 1
def get_cookie(evc_id: str, cookie_prefix: int) -> int:
63
    """Return the cookie integer from evc id.
64
65
    cookie_prefix is supposed to be the reserved byte value that
66
    mef_eline or telemetry_int uses.
67
    """
68 1
    return int(evc_id, 16) + (cookie_prefix << 56)
69
70
71 1
def get_id_from_cookie(cookie: int) -> str:
72
    """Return the evc id given a cookie value."""
73 1
    evc_id = cookie & 0xFFFFFFFFFFFFFF
74 1
    return f"{evc_id:x}"
75
76
77 1
def is_intra_switch_evc(evc):
78
    """Returns if EVC is intra-switch (two UNIs on the same switch)"""
79 1
    uni_a, uni_z = get_evc_unis(evc)
80 1
    if uni_a["switch"] == uni_z["switch"]:
81 1
        return True
82 1
    return False
83
84
85 1
def modify_actions(actions: list[dict], actions_to_change: list[str], remove=True):
86
    """Change the current actions
87
    If remove == True, remove actions_to_change from actions.
88
    If remove == False, keep actions_to_change, remove everything else
89
    Args:
90
        actions = current list of actions on a flow
91
        actions_to_change = list of actions as strings
92
        remove = boolean
93
    Return
94
        actions
95
    """
96 1
    del_indexes = set()
97 1
    for index, action in enumerate(actions):
98 1
        if remove:
99 1
            if action["action_type"] in actions_to_change:
100 1
                del_indexes.add(index)
101
        else:
102 1
            if action["action_type"] not in actions_to_change:
103 1
                del_indexes.add(index)
104 1
    return [action for i, action in enumerate(actions) if i not in del_indexes]
105
106
107 1
def set_priority(flow: dict, evc_id: str = "") -> dict:
108
    """Find a suitable priority number. EP031 describes 100 as the addition."""
109 1
    if flow["flow"]["priority"] + 100 < (2**16 - 2):
110 1
        flow["flow"]["priority"] += 100
111 1
    elif flow["flow"]["priority"] + 1 < (2**16 - 2):
112 1
        flow["flow"]["priority"] += 1
113
    else:
114 1
        raise PriorityOverflow(evc_id, f"Flow {flow} would overflow max priority")
115 1
    return flow
116
117
118 1
def set_owner(flow: dict) -> dict:
119
    """Set flow owner."""
120 1
    flow["flow"]["owner"] = "telemetry_int"
121 1
    return flow
122
123
124 1
def get_new_cookie(cookie: int, cookie_prefix=settings.INT_COOKIE_PREFIX) -> int:
125
    """Convert from mef-eline cookie by replacing the most significant byte."""
126 1
    return (cookie & 0xFFFFFFFFFFFFFF) + (cookie_prefix << 56)
127
128
129 1
def set_new_cookie(flow: dict) -> dict:
130
    """Set new cookie."""
131 1
    flow["flow"]["cookie"] = get_new_cookie(
132
        flow["flow"]["cookie"], cookie_prefix=settings.INT_COOKIE_PREFIX
133
    )
134 1
    return flow
135
136
137 1
def set_instructions_from_actions(flow: dict) -> dict:
138
    """Get intructions or convert from actions."""
139 1
    if "instructions" in flow["flow"]:
140 1
        return flow
141
142 1
    instructions = [
143
        {
144
            "instruction_type": "apply_actions",
145
            "actions": flow["flow"].get("actions", []),
146
        }
147
    ]
148 1
    flow["flow"].pop("actions", None)
149 1
    flow["flow"]["instructions"] = instructions
150 1
    return flow
151
152
153 1
def get_svlan_dpid_link(link: dict, dpid: str) -> Optional[int]:
154
    """Try to get svlan of a link if a dpid matches one of the endpoints."""
155 1
    if any(
156
        (
157
            link["endpoint_a"]["switch"] == dpid and "s_vlan" in link["metadata"],
158
            link["endpoint_b"]["switch"] == dpid and "s_vlan" in link["metadata"],
159
        )
160
    ):
161 1
        return link["metadata"]["s_vlan"]["value"]
162
    return None
163