Passed
Pull Request — master (#163)
by Vinicius
04:26
created

build.utils.set_proxy_port_value()   A

Complexity

Conditions 4

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 4.0466

Importance

Changes 0
Metric Value
cc 4
eloc 7
nop 2
dl 0
loc 8
ccs 6
cts 7
cp 0.8571
crap 4.0466
rs 10
c 0
b 0
f 0
1
""" Support function for main.py """
2
3 1
from typing import Optional
4
5 1
from napps.kytos.telemetry_int import settings
6
7 1
from .exceptions import FlowsNotFound, PriorityOverflow
8 1
from .kytos_api_helper import get_stored_flows as _get_stored_flows
9
10
11 1
async def get_found_stored_flows(
    cookies: Optional[list[int]] = None,
) -> dict[int, list[dict]]:
    """Get stored flows, ensuring that every cookie has at least one flow.

    Args:
        cookies: cookies to look up; ``None`` is treated as an empty list.

    Returns:
        The mapping returned by the stored-flows helper (iterated here as
        cookie -> list of flows).

    Raises:
        FlowsNotFound: if any cookie in the result maps to an empty value.
            The exception carries the EVC id derived from that cookie.
    """
    stored_flows = await _get_stored_flows(cookies or [])
    for cookie, flows in stored_flows.items():
        if not flows:
            raise FlowsNotFound(get_id_from_cookie(cookie))
    return stored_flows
19
20
21 1
def has_int_enabled(evc: dict) -> bool:
    """Check if evc has telemetry enabled.

    Telemetry counts as enabled only when ``evc["metadata"]["telemetry"]``
    is a dict whose ``"enabled"`` entry is truthy. A missing or non-dict
    ``metadata``/``telemetry`` entry means "not enabled".

    Returns:
        A real ``bool`` (the truthiness of the ``enabled`` entry), never the
        raw stored value.
    """
    metadata = evc.get("metadata")
    if not isinstance(metadata, dict):
        return False
    telemetry = metadata.get("telemetry")
    if not isinstance(telemetry, dict):
        return False
    return bool(telemetry.get("enabled"))
30
31
32 1
def set_proxy_port_value(evc: dict, proxy_port_enabled: Optional[bool] = None) -> dict:
    """Set proxy_port_enabled metadata value for an existing EVC.

    Args:
        evc: EVC dict, updated in place.
        proxy_port_enabled: value stored under
            ``evc["metadata"]["proxy_port_enabled"]``.

    Returns:
        The same ``evc`` object. A falsy or non-dict ``evc`` (including an
        empty dict) is returned untouched.
    """
    if not evc or not isinstance(evc, dict):
        return evc
    # Create the metadata dict on demand, then write the flag into it.
    evc.setdefault("metadata", {})["proxy_port_enabled"] = proxy_port_enabled
    return evc
40
41
42 1
def get_evc_proxy_port_value(evc: dict) -> Optional[bool]:
    """Get proxy_port_enabled from EVC metadata.

    Returns:
        The value stored at ``evc["metadata"]["proxy_port_enabled"]``, or
        ``None`` when either key is missing or a level is not subscriptable
        (for instance ``evc`` or its metadata is ``None``).
    """
    try:
        return evc["metadata"]["proxy_port_enabled"]
    except (KeyError, TypeError):
        # KeyError: a key is absent; TypeError: a level cannot be indexed.
        return None
48
49
50 1
def get_evc_unis(evc: dict) -> tuple[dict, dict]:
    """Parse evc for unis.

    Args:
        evc: dict with ``uni_a`` and ``uni_z`` entries, each holding an
            ``interface_id`` string and optionally a ``tag``.

    Returns:
        A ``(uni_a, uni_z)`` tuple of dicts with the keys ``interface_id``,
        ``tag``, ``port_number`` and ``switch``.

    Raises:
        KeyError: if a UNI or its ``interface_id`` is missing.
        ValueError: if the text after the last ``:`` is not an integer.
    """
    return _parse_uni(evc["uni_a"]), _parse_uni(evc["uni_z"])


def _parse_uni(uni: dict) -> dict:
    """Split one UNI's interface_id into its switch and port number."""
    interface_id = uni["interface_id"]
    # Everything before the last ":" is the switch; the rest is the port.
    switch, _, port = interface_id.rpartition(":")
    return {
        "interface_id": interface_id,
        "tag": uni.get("tag", {}),
        "port_number": int(port),
        "switch": switch,
    }
68
69
70 1
def add_to_apply_actions(
    instructions: list[dict], new_instruction: dict, position: int
) -> list[dict]:
    """Insert an action into every apply_actions instruction.

    Args:
        instructions: flow instructions, modified in place.
        new_instruction: the action dict to insert. The same object is
            inserted into each matching instruction (it is not copied).
        position: index passed to ``list.insert`` on the actions list.

    Returns:
        The same ``instructions`` list.
    """
    for instruction in instructions:
        if instruction["instruction_type"] == "apply_actions":
            instruction["actions"].insert(position, new_instruction)
    return instructions
78
79
80 1
def has_instruction_and_action_type(
    instructions: list[dict], instruction_type: str, action_type: str
) -> bool:
    """Check if any of the instructions has a given type and action type.

    Only instructions whose ``instruction_type`` equals the requested one and
    that carry an ``actions`` entry are inspected; actions without an
    ``action_type`` key never match.

    Returns:
        True as soon as one matching action is found, False otherwise.
    """
    for instruction in instructions:
        if instruction["instruction_type"] != instruction_type:
            continue
        # Instructions without "actions" contribute nothing to the search.
        for action in instruction.get("actions", ()):
            if "action_type" in action and action["action_type"] == action_type:
                return True
    return False
94
95
96 1
def get_cookie(evc_id: str, cookie_prefix: int) -> int:
    """Return the cookie integer from evc id.

    cookie_prefix is supposed to be the reserved byte value that
    mef_eline or telemetry_int uses.

    Args:
        evc_id: EVC id, parsed as a hexadecimal string.
        cookie_prefix: value shifted left by 56 bits and added to the
            parsed id.

    Raises:
        ValueError: if ``evc_id`` is not valid hexadecimal.
    """
    return int(evc_id, 16) + (cookie_prefix << 56)
103
104
105 1
def get_id_from_cookie(cookie: int) -> str:
    """Return the evc id given a cookie value.

    The low 56 bits of the cookie are kept and rendered as lowercase
    hexadecimal, left-padded with zeros to 14 characters.
    """
    return f"{cookie & 0xFFFFFFFFFFFFFF:014x}"
109
110
111 1
def is_intra_switch_evc(evc: dict) -> bool:
    """Return whether the EVC is intra-switch (two UNIs on the same switch)."""
    uni_a, uni_z = get_evc_unis(evc)
    return uni_a["switch"] == uni_z["switch"]
117
118
119 1
def modify_actions(
    actions: list[dict], actions_to_change: list[str], remove=True
) -> list[dict]:
    """Filter the current actions by action type.

    If remove is truthy, drop the actions whose type is in actions_to_change.
    If remove is falsy, keep only the actions whose type is in
    actions_to_change.

    Args:
        actions: current list of actions on a flow; not modified.
        actions_to_change: action types, as strings.
        remove: selects which of the two modes above is applied.

    Returns:
        A new list with the surviving actions, in their original order.

    Raises:
        KeyError: if an action has no ``action_type`` key.
    """
    drop_listed = bool(remove)
    # An action survives when "is listed" disagrees with "drop the listed".
    return [
        action
        for action in actions
        if (action["action_type"] in actions_to_change) != drop_listed
    ]
139
140
141 1
def set_priority(flow: dict, evc_id: str = "") -> dict:
    """Find a suitable priority number. EP031 describes 100 as the addition.

    The priority in ``flow["flow"]["priority"]`` is raised by 100 when the
    result stays below ``2**16 - 2``; otherwise by 1 under the same bound.

    Args:
        flow: flow dict, updated in place.
        evc_id: passed to the exception when no increment fits.

    Returns:
        The same ``flow`` object.

    Raises:
        PriorityOverflow: if neither increment stays below the bound.
    """
    ceiling = 2**16 - 2
    priority = flow["flow"]["priority"]
    if priority + 100 < ceiling:
        flow["flow"]["priority"] = priority + 100
    elif priority + 1 < ceiling:
        flow["flow"]["priority"] = priority + 1
    else:
        raise PriorityOverflow(evc_id, f"Flow {flow} would overflow max priority")
    return flow
150
151
152 1
def set_owner(flow: dict) -> dict:
    """Set flow owner.

    Writes ``"telemetry_int"`` to ``flow["flow"]["owner"]`` in place and
    returns the same ``flow`` object.
    """
    flow["flow"]["owner"] = "telemetry_int"
    return flow
156
157
158 1
def get_new_cookie(cookie: int, cookie_prefix=settings.INT_COOKIE_PREFIX) -> int:
    """Convert from mef-eline cookie by replacing the most significant byte.

    The low 56 bits of ``cookie`` are kept and ``cookie_prefix`` shifted left
    by 56 bits is added on top.

    Args:
        cookie: source cookie value.
        cookie_prefix: prefix to place above the low 56 bits. The default,
            ``settings.INT_COOKIE_PREFIX``, is read once when this function
            is defined.
    """
    return (cookie & 0xFFFFFFFFFFFFFF) + (cookie_prefix << 56)
161
162
163 1
def set_new_cookie(flow: dict) -> dict:
    """Set new cookie.

    Replaces ``flow["flow"]["cookie"]`` in place with the value produced by
    ``get_new_cookie`` using ``settings.INT_COOKIE_PREFIX`` (looked up at
    call time), and returns the same ``flow`` object.
    """
    flow["flow"]["cookie"] = get_new_cookie(
        flow["flow"]["cookie"], cookie_prefix=settings.INT_COOKIE_PREFIX
    )
    return flow
169
170
171 1
def set_instructions_from_actions(flow: dict) -> dict:
    """Get instructions or convert from actions.

    A flow that already has ``instructions`` is returned unchanged. Otherwise
    its ``actions`` entry (or an empty list when absent) is moved into a
    single ``apply_actions`` instruction and the ``actions`` key is removed.

    Returns:
        The same ``flow`` object, updated in place.
    """
    flow_body = flow["flow"]
    if "instructions" in flow_body:
        return flow

    flow_body["instructions"] = [
        {
            "instruction_type": "apply_actions",
            # pop() both fetches the actions and removes the legacy key.
            "actions": flow_body.pop("actions", []),
        }
    ]
    return flow
185
186
187 1
def get_svlan_dpid_link(link: dict, dpid: str) -> Optional[int]:
    """Try to get svlan of a link if a dpid matches one of the endpoints.

    Args:
        link: dict with ``endpoint_a``/``endpoint_b`` (each holding a
            ``switch``) and a ``metadata`` dict.
        dpid: switch id compared against both endpoints.

    Returns:
        ``link["metadata"]["s_vlan"]["value"]`` when ``dpid`` equals either
        endpoint's switch and the metadata has an ``s_vlan`` entry; ``None``
        otherwise.
    """
    endpoint_switches = (link["endpoint_a"]["switch"], link["endpoint_b"]["switch"])
    # Metadata is only consulted once the dpid is known to be on this link.
    if dpid in endpoint_switches and "s_vlan" in link["metadata"]:
        return link["metadata"]["s_vlan"]["value"]
    return None
197