Passed
Pull Request — master (#35)
by Vinicius
02:34
created

build.utils   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 204
Duplicated Lines 0 %

Test Coverage

Coverage 45.05%

Importance

Changes 0
Metric Value
eloc 114
dl 0
loc 204
ccs 41
cts 91
cp 0.4505
rs 9.6
c 0
b 0
f 0
wmc 35

16 Functions

Rating   Name   Duplication   Size   Complexity  
A get_id_from_cookie() 0 4 1
A get_new_cookie() 0 3 1
B modify_actions() 0 26 6
A set_instructions_from_actions() 0 14 2
A set_owner() 0 4 1
A set_new_cookie() 0 6 1
A get_evc_unis() 0 16 1
A set_table_group() 0 4 1
A set_priority() 0 9 3
A get_evc_with_telemetry() 0 9 3
A get_cookie() 0 7 1
A get_found_stored_flows() 0 8 3
A is_intra_switch_evc() 0 6 2
B get_proxy_port_or_raise() 0 29 5
A has_int_enabled() 0 7 1
A add_to_apply_actions() 0 6 3
1
""" Support function for main.py """
2
3 1
from napps.kytos.telemetry_int import settings
4
5 1
from kytos.core import Controller
6
7 1
from .exceptions import FlowsNotFound, PriorityOverflow, ProxyPortNotFound
8 1
from .kytos_api_helper import get_evcs
9 1
from .kytos_api_helper import get_stored_flows as _get_stored_flows
10 1
from .proxy_port import ProxyPort
11
12
13 1
async def get_found_stored_flows(cookies: list[int] = None) -> dict[int, list[dict]]:
14
    """Get stored flows ensuring that flows are found."""
15
    cookies = cookies or []
16
    stored_flows = await _get_stored_flows()
17
    for cookie, flows in stored_flows.items():
18
        if not flows:
19
            raise FlowsNotFound(get_id_from_cookie(cookie))
20
    return stored_flows
21
22
23 1
def get_evc_with_telemetry() -> dict:
24
    """Retrieve the list of EVC IDs and list those with
25
    metadata {"telemetry": {"enabled": true}}"""
26
27
    evc_ids = {"evcs_with_telemetry": []}
28
    for evc in get_evcs().values():
29
        if has_int_enabled(evc):
30
            evc_ids["evcs_with_telemetry"].append(evc["id"])
31
    return evc_ids
32
33
34 1
def has_int_enabled(evc: dict) -> bool:
35
    """Check if evc has telemetry."""
36 1
    return (
37
        "telemetry" in evc["metadata"]
38
        and isinstance(evc["metadata"]["telemetry"], dict)
39
        and "enabled" in evc["metadata"]["telemetry"]
40
        and evc["metadata"]["telemetry"]["enabled"]
41
    )
42
43
44 1
def get_evc_unis(evc: dict) -> tuple[dict, dict]:
45
    """Parse evc for unis."""
46 1
    uni_a_split = evc["uni_a"]["interface_id"].split(":")
47 1
    uni_z_split = evc["uni_z"]["interface_id"].split(":")
48 1
    return (
49
        {
50
            "interface_id": evc["uni_a"]["interface_id"],
51
            "tag": evc["uni_a"].get("tag", {}),
52
            "port_number": int(uni_a_split[-1]),
53
            "switch": ":".join(uni_a_split[:-1]),
54
        },
55
        {
56
            "interface_id": evc["uni_z"]["interface_id"],
57
            "tag": evc["uni_z"].get("tag", {}),
58
            "port_number": int(uni_z_split[-1]),
59
            "switch": ":".join(uni_z_split[:-1]),
60
        },
61
    )
62
63
64 1
def get_proxy_port_or_raise(
65
    controller: Controller, intf_id: str, evc_id: str
66
) -> ProxyPort:
67
    """Return a ProxyPort assigned to a UNI or raise."""
68
69
    interface = controller.get_interface_by_id(intf_id)
70
    if not interface:
71
        raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
72
73
    if "proxy_port" not in interface.metadata:
74
        raise ProxyPortNotFound(evc_id, f"proxy_port metadata not found in {intf_id}")
75
76
    source_intf = interface.switch.get_interface_by_port_no(
77
        interface.metadata.get("proxy_port")
78
    )
79
    if not source_intf:
80
        raise ProxyPortNotFound(
81
            evc_id,
82
            f"proxy_port of {intf_id} source interface not found",
83
        )
84
85
    pp = ProxyPort(controller, source_intf)
86
87
    if not pp.destination:
88
        raise ProxyPortNotFound(
89
            evc_id,
90
            f"proxy_port of {intf_id} destination interface not found",
91
        )
92
    return pp
93
94
95 1
def add_to_apply_actions(instructions, new_instruction, position):
96
    """Create the actions list"""
97
    for instruction in instructions:
98
        if instruction["instruction_type"] == "apply_actions":
99
            instruction["actions"].insert(position, new_instruction)
100
    return instructions
101
102
103 1
def get_cookie(evc_id: str, cookie_prefix: int) -> int:
104
    """Return the cookie integer from evc id.
105
106
    cookie_prefix is supposed to be the reserved byte value that
107
    mef_eline or telemetry_int uses.
108
    """
109
    return int(evc_id, 16) + (cookie_prefix << 56)
110
111
112 1
def get_id_from_cookie(cookie: int) -> str:
113
    """Return the evc id given a cookie value."""
114 1
    evc_id = cookie & 0xFFFFFFFFFFFFFF
115 1
    return f"{evc_id:x}"
116
117
118 1
def is_intra_switch_evc(evc):
119
    """Returns if EVC is intra-switch (two UNIs on the same switch)"""
120 1
    uni_a, uni_z = get_evc_unis(evc)
121 1
    if uni_a["switch"] == uni_z["switch"]:
122 1
        return True
123 1
    return False
124
125
126 1
def modify_actions(actions, actions_to_change, remove=True):
127
    """Change the current actions
128
    If remove == True, remove actions_to_change from actions.
129
    If remove == False, keep actions_to_change, remove everything else
130
    Args:
131
        actions = current list of actions on a flow
132
        actions_to_change = list of actions as strings
133
        remove = boolean
134
    Return
135
        actions
136
    """
137
    indexes = []
138
    count = 0
139
140
    for action in actions:
141
        if remove:
142
            if action["action_type"] in actions_to_change:
143
                indexes.append(count)
144
        else:
145
            if action["action_type"] not in actions_to_change:
146
                indexes.append(count)
147
        count += 1
148
149
    for index in sorted(indexes, reverse=True):
150
        del actions[index]
151
    return actions
152
153
154 1
def set_priority(flow: dict, evc_id: str = "") -> dict:
155
    """Find a suitable priority number. EP031 describes 100 as the addition."""
156
    if flow["flow"]["priority"] + 100 < (2**16 - 2):
157
        flow["flow"]["priority"] += 100
158
    elif flow["flow"]["priority"] + 1 < (2**16 - 2):
159
        flow["flow"]["priority"] += 1
160
    else:
161
        raise PriorityOverflow(evc_id, f"Flow {flow} would overflow max priority")
162
    return flow
163
164
165 1
def set_owner(flow: dict) -> dict:
166
    """Set flow owner."""
167
    flow["flow"]["owner"] = "telemetry_int"
168
    return flow
169
170
171 1
def set_table_group(flow: dict, table_group="base") -> dict:
172
    """Set flow owner."""
173
    flow["flow"]["table_group"] = table_group
174
    return flow
175
176
177 1
def get_new_cookie(cookie: int, cookie_prefix=settings.INT_COOKIE_PREFIX) -> int:
178
    """Convert from mef-eline cookie by replacing the most significant byte."""
179 1
    return (cookie & 0xFFFFFFFFFFFFFF) + (cookie_prefix << 56)
180
181
182 1
def set_new_cookie(flow: dict) -> dict:
183
    """Set new cookie."""
184 1
    flow["flow"]["cookie"] = get_new_cookie(
185
        flow["flow"]["cookie"], cookie_prefix=settings.INT_COOKIE_PREFIX
186
    )
187 1
    return flow
188
189
190 1
def set_instructions_from_actions(flow: dict) -> dict:
191
    """Get intructions or convert from actions."""
192 1
    if "instructions" in flow["flow"]:
193 1
        return flow
194
195 1
    instructions = [
196
        {
197
            "instruction_type": "apply_actions",
198
            "actions": flow["flow"].get("actions", []),
199
        }
200
    ]
201 1
    flow["flow"].pop("actions", None)
202 1
    flow["flow"]["instructions"] = instructions
203
    return flow
204