Passed
Pull Request — master (#169)
by Vinicius
04:52
created

build.utils.set_priority()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 2
dl 0
loc 9
ccs 7
cts 7
cp 1
crap 3
rs 10
c 0
b 0
f 0
1
""" Support function for main.py """
2
3 1
from typing import Literal, Optional
4
5 1
from napps.kytos.telemetry_int import settings
6
7 1
from .exceptions import FlowsNotFound, PriorityOverflow
8 1
from .kytos_api_helper import get_stored_flows as _get_stored_flows
9
10
11 1
async def get_found_stored_flows(cookies: list[int] = None) -> dict[int, list[dict]]:
    """Fetch stored flows for the given cookies, requiring each to have flows.

    A falsy ``cookies`` argument is replaced by an empty list before the
    lookup is awaited.

    Returns:
        The mapping returned by the stored-flows lookup, unchanged.

    Raises:
        FlowsNotFound: for the first cookie whose flows are empty; the
            exception is built from the EVC id derived from that cookie.
    """
    flows_by_cookie = await _get_stored_flows(cookies or [])
    for cookie, cookie_flows in flows_by_cookie.items():
        if cookie_flows:
            continue
        raise FlowsNotFound(get_id_from_cookie(cookie))
    return flows_by_cookie
19
20
21 1
def has_int_enabled(evc: dict) -> bool:
    """Check if an EVC has telemetry enabled in its metadata.

    The EVC counts as enabled only when ``evc["metadata"]["telemetry"]`` is a
    dict whose ``"enabled"`` entry is truthy.

    Args:
        evc: EVC dictionary.

    Returns:
        A strict ``bool``. Missing keys, a non-dict ``metadata`` (for example
        ``None``) or a non-dict ``telemetry`` all yield ``False`` instead of
        raising.
    """
    metadata = evc.get("metadata")
    if not isinstance(metadata, dict):
        return False
    telemetry = metadata.get("telemetry")
    if not isinstance(telemetry, dict):
        return False
    # Coerce so callers always get a real bool, as the annotation promises.
    return bool(telemetry.get("enabled"))
30
31
32 1
def has_uni_vlan_type(evc: dict, uni_key: Literal["uni_a", "uni_z"]) -> bool:
    """Tell whether the tag of the selected UNI is of vlan type.

    The tag type is accepted when it is either ``1`` or ``"vlan"``. Any
    ``KeyError``/``TypeError`` raised while reading
    ``evc[uni_key]["tag"]["tag_type"]`` is treated as "not a vlan type".
    """
    try:
        tag = evc[uni_key]["tag"]
        is_vlan = tag["tag_type"] in (1, "vlan")
    except (TypeError, KeyError):
        is_vlan = False
    return is_vlan
38
39
40 1
def has_special_dl_vlan(evc: dict, uni_key: Literal["uni_a", "uni_z"]) -> bool:
    """Tell whether the selected UNI carries a special vlan tag value.

    The UNI must have a vlan tag type, and its tag value must be a list,
    the string ``"any"`` or the string ``"untagged"``. Lookup failures
    (``KeyError``/``TypeError``) are reported as ``False``.
    """
    try:
        if not has_uni_vlan_type(evc, uni_key):
            return False
        tag_value = evc[uni_key]["tag"]["value"]
        return (
            isinstance(tag_value, list)
            or tag_value == "any"
            or tag_value == "untagged"
        )
    except (TypeError, KeyError):
        return False
51
52
53 1
def has_vlan_translation(evc: dict) -> bool:
    """Tell whether an EVC translates between two different vlan values.

    Both UNIs must have a vlan tag type and an ``int`` tag value, and the two
    values must differ. Lookup failures (``KeyError``/``TypeError``) are
    reported as ``False``.
    """
    try:
        both_vlan = has_uni_vlan_type(evc, "uni_a") and has_uni_vlan_type(
            evc, "uni_z"
        )
        if not both_vlan:
            return False
        value_a = evc["uni_a"]["tag"]["value"]
        value_z = evc["uni_z"]["tag"]["value"]
    except (TypeError, KeyError):
        return False
    return (
        isinstance(value_a, int)
        and isinstance(value_z, int)
        and value_a != value_z
    )
65
66
67 1
def has_qinq(evc: dict) -> bool:
    """Tell whether an EVC has qinq, defined here as having no vlan translation."""
    translates_vlan = has_vlan_translation(evc)
    return not translates_vlan
70
71
72 1
def set_proxy_port_value(evc: dict, proxy_port_enabled: Optional[bool] = None) -> dict:
    """Store ``proxy_port_enabled`` in the metadata of an existing EVC.

    A falsy or non-dict ``evc`` is returned untouched. Otherwise the EVC is
    updated in place (creating ``"metadata"`` when absent) and returned.
    """
    is_usable = bool(evc) and isinstance(evc, dict)
    if is_usable:
        evc.setdefault("metadata", {})["proxy_port_enabled"] = proxy_port_enabled
    return evc
80
81
82 1
def get_evc_proxy_port_value(evc: dict) -> Optional[bool]:
    """Read ``proxy_port_enabled`` from the EVC metadata.

    Returns ``None`` when the value cannot be looked up (``KeyError`` or
    ``TypeError`` on either access).
    """
    try:
        metadata = evc["metadata"]
        return metadata["proxy_port_enabled"]
    except (KeyError, TypeError):
        return None
88
89
90 1
def get_evc_unis(evc: dict) -> tuple[dict, dict]:
    """Build a summary dict for each of the two UNIs of an EVC.

    Each ``interface_id`` is cut at its last ``":"``: the text before it
    becomes ``"switch"`` and the text after it is converted to ``int`` as
    ``"port_number"``. A missing ``"tag"`` is reported as an empty dict.

    Returns:
        A ``(uni_a, uni_z)`` tuple of dicts with the keys ``interface_id``,
        ``tag``, ``port_number`` and ``switch``.
    """
    uni_keys = ("uni_a", "uni_z")
    # Split both interface ids up front, before any summary dict is built.
    pieces = {key: evc[key]["interface_id"].rpartition(":") for key in uni_keys}

    def _summarize(key: str) -> dict:
        switch, _, port = pieces[key]
        return {
            "interface_id": evc[key]["interface_id"],
            "tag": evc[key].get("tag", {}),
            "port_number": int(port),
            "switch": switch,
        }

    return _summarize("uni_a"), _summarize("uni_z")
108
109
110 1
def add_to_apply_actions(
    instructions: list[dict], new_instruction: dict, position: int
):
    """Insert an entry into the actions of every ``apply_actions`` instruction.

    ``new_instruction`` is inserted at index ``position`` of the ``"actions"``
    list of each instruction whose ``"instruction_type"`` is
    ``"apply_actions"``. The instructions are modified in place and the same
    list is returned.
    """
    apply_action_entries = (
        entry
        for entry in instructions
        if entry["instruction_type"] == "apply_actions"
    )
    for entry in apply_action_entries:
        entry["actions"].insert(position, new_instruction)
    return instructions
118
119
120 1
def has_instruction_and_action_type(
    instructions: list[dict], instruction_type: str, action_type: str
) -> bool:
    """Tell whether some instruction of a given type holds a given action type.

    Only instructions whose ``"instruction_type"`` equals ``instruction_type``
    and that have an ``"actions"`` entry are inspected; actions without an
    ``"action_type"`` key never match.
    """
    return any(
        "action_type" in action and action["action_type"] == action_type
        for instruction in instructions
        if instruction["instruction_type"] == instruction_type
        and "actions" in instruction
        for action in instruction["actions"]
    )
134
135
136 1
def get_cookie(evc_id: str, cookie_prefix: int) -> int:
    """Build the integer cookie for an EVC id.

    ``evc_id`` is parsed as a hexadecimal number and ``cookie_prefix`` is
    added shifted left by 56 bits. The prefix is meant to be the reserved
    byte value that mef_eline or telemetry_int uses.
    """
    evc_number = int(evc_id, 16)
    prefix_bits = cookie_prefix << 56
    return evc_number + prefix_bits
143
144
145 1
def get_id_from_cookie(cookie: int) -> str:
    """Recover the EVC id from a cookie value.

    Keeps the low 56 bits of ``cookie`` and renders them as lowercase
    hexadecimal, left-padded with zeros to at least 14 characters.
    """
    low_56_bits = cookie & 0xFFFFFFFFFFFFFF
    return f"{low_56_bits:014x}"
149
150
151 1
def is_intra_switch_evc(evc):
    """Tell whether both UNIs of the EVC sit on the same switch."""
    first_uni, second_uni = get_evc_unis(evc)
    return first_uni["switch"] == second_uni["switch"]
157
158
159 1
def modify_actions(actions: list[dict], actions_to_change: list[str], remove=True):
    """Filter a flow's actions by action type.

    Args:
        actions: current list of actions on a flow.
        actions_to_change: action types, as strings, to filter on.
        remove: when truthy, drop the actions whose type is listed in
            ``actions_to_change``; when falsy, keep only those actions.

    Returns:
        A new list with the surviving actions in their original order; the
        input list is left untouched.
    """
    drop_listed = bool(remove)
    return [
        action
        for action in actions
        # An action survives when "is listed" disagrees with "drop listed".
        if (action["action_type"] in actions_to_change) != drop_listed
    ]
179
180
181 1
def set_priority(flow: dict, evc_id: str = "") -> dict:
    """Raise the flow priority to a suitable higher number.

    The priority is increased by 100 (the addition described by EP031) when
    the result stays below ``2**16 - 2``; otherwise by 1 under the same
    limit. The flow is updated in place and returned.

    Raises:
        PriorityOverflow: when neither increment fits under the limit.
    """
    limit = 2**16 - 2
    current = flow["flow"]["priority"]
    # Try the preferred increment first, then the minimal one.
    for increment in (100, 1):
        if current + increment < limit:
            flow["flow"]["priority"] = current + increment
            return flow
    raise PriorityOverflow(evc_id, f"Flow {flow} would overflow max priority")
190
191
192 1
def set_owner(flow: dict) -> dict:
    """Mark the flow as owned by ``telemetry_int`` and return it.

    The flow is updated in place.
    """
    flow_body = flow["flow"]
    flow_body["owner"] = "telemetry_int"
    return flow
196
197
198 1
def get_new_cookie(cookie: int, cookie_prefix=settings.INT_COOKIE_PREFIX) -> int:
    """Convert a mef-eline cookie by replacing its most significant byte.

    The low 56 bits of ``cookie`` are kept and ``cookie_prefix`` shifted left
    by 56 bits is added to them.
    """
    low_56_bits = cookie & 0xFFFFFFFFFFFFFF
    prefix_bits = cookie_prefix << 56
    return low_56_bits + prefix_bits
201
202
203 1
def set_new_cookie(flow: dict) -> dict:
    """Replace the flow cookie with its INT-prefixed counterpart.

    The flow is updated in place and returned.
    """
    flow_body = flow["flow"]
    current_cookie = flow_body["cookie"]
    flow_body["cookie"] = get_new_cookie(
        current_cookie, cookie_prefix=settings.INT_COOKIE_PREFIX
    )
    return flow
209
210
211 1
def set_instructions_from_actions(flow: dict) -> dict:
    """Make sure the flow has instructions, converting from actions if needed.

    A flow that already has ``"instructions"`` is returned as is. Otherwise
    its ``"actions"`` (an empty list when absent) are moved into a single
    ``apply_actions`` instruction and the ``"actions"`` key is removed. The
    flow is updated in place and returned.
    """
    flow_body = flow["flow"]
    if "instructions" not in flow_body:
        moved_actions = flow_body.pop("actions", [])
        flow_body["instructions"] = [
            {"instruction_type": "apply_actions", "actions": moved_actions}
        ]
    return flow
225
226
227 1
def sorted_evcs_by_svc_lvl(evcs: dict[str, dict]) -> dict[str, dict]:
    """Return the EVCs re-keyed by id, ordered by service level and then id.

    Higher ``"service_level"`` values come first (a missing level counts as
    0); ties are broken by ascending ``"id"``. This gives a deterministic
    processing order that honors the service level.
    """

    def _sort_key(evc: dict) -> tuple:
        return (-evc.get("service_level", 0), evc["id"])

    ordered = {}
    for evc in sorted(evcs.values(), key=_sort_key):
        ordered[evc["id"]] = evc
    return ordered
238