Test Failed
Pull Request — master (#35)
by Vinicius
06:08
created

build.utils.set_new_cookie()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 6
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
c 0
b 0
f 0
1
""" Support function for main.py """
2
3 1
from napps.kytos.telemetry_int import settings
4
5 1
from kytos.core import Controller
6
7 1
from .kytos_api_helper import (
8 1
    get_evcs,
9
    get_stored_flows as _get_stored_flows,
10 1
)
11
from .proxy_port import ProxyPort
12
13
from .exceptions import (
14
    FlowsNotFound,
15
    ProxyPortNotFound,
16 1
    PriorityOverflow,
17
)
18
19
20
async def get_found_stored_flows(cookies: list[int] = None) -> dict[int, list[dict]]:
    """Get stored flows, ensuring that every returned cookie has flows.

    Args:
        cookies: optional list of cookies; ``None`` is normalized to ``[]``.

    Returns:
        The mapping returned by ``_get_stored_flows``, iterated here as
        cookie -> list of flows.

    Raises:
        FlowsNotFound: if any cookie maps to an empty flow list. The
            exception receives the EVC id derived from that cookie.
    """
    cookies = cookies or []
    # NOTE(review): `cookies` is normalized above but never forwarded to
    # `_get_stored_flows`; presumably it was meant to filter the query.
    # Confirm against the helper's signature before passing it through.
    stored_flows = await _get_stored_flows()
    for cookie, flows in stored_flows.items():
        if not flows:
            raise FlowsNotFound(get_id_from_cookie(cookie))
    return stored_flows
28
29
30
def get_evc_with_telemetry() -> dict:
    """Return the IDs of the EVCs that have telemetry enabled.

    Returns:
        ``{"evcs_with_telemetry": [...]}`` holding the ``"id"`` of every EVC
        from ``get_evcs()`` for which ``has_int_enabled`` is truthy, i.e.
        EVCs with metadata ``{"telemetry": {"enabled": true}}``.
    """
    return {
        "evcs_with_telemetry": [
            evc["id"] for evc in get_evcs().values() if has_int_enabled(evc)
        ]
    }
39
40
41
def has_int_enabled(evc: dict) -> bool:
    """Check if an EVC has telemetry enabled.

    Args:
        evc: EVC dict; telemetry state is read from
            ``evc["metadata"]["telemetry"]["enabled"]``.

    Returns:
        ``True`` only when ``metadata.telemetry`` is a dict whose
        ``"enabled"`` value is truthy. A missing or malformed ``metadata``
        or ``telemetry`` entry yields ``False`` instead of raising.
    """
    metadata = evc.get("metadata")
    telemetry = metadata.get("telemetry") if isinstance(metadata, dict) else None
    if not isinstance(telemetry, dict):
        return False
    # Coerce so the result matches the annotated ``bool`` return type.
    return bool(telemetry.get("enabled"))
49
50
51
def _parse_uni(uni: dict) -> dict:
    """Build the parsed view of a single UNI from its ``interface_id``."""
    interface_id = uni["interface_id"]
    # The port number is the last ":"-separated field; everything before it
    # is the switch id.
    switch, _, port = interface_id.rpartition(":")
    return {
        "interface_id": interface_id,
        "tag": uni.get("tag", {}),
        "port_number": int(port),
        "switch": switch,
    }


def get_evc_unis(evc: dict) -> tuple[dict, dict]:
    """Parse an EVC for its two UNIs.

    Args:
        evc: EVC dict with ``"uni_a"`` and ``"uni_z"`` entries, each holding
            an ``"interface_id"`` and, optionally, a ``"tag"``.

    Returns:
        A ``(uni_a, uni_z)`` tuple of dicts with the keys ``"interface_id"``,
        ``"tag"`` (``{}`` when absent), ``"port_number"`` (int) and
        ``"switch"``.

    Raises:
        KeyError: if a UNI or its ``"interface_id"`` is missing.
        ValueError: if the last field of an interface id is not an integer.
    """
    return _parse_uni(evc["uni_a"]), _parse_uni(evc["uni_z"])
69
70 1
71
def get_proxy_port_or_raise(
    controller: Controller, intf_id: str, evc_id: str
) -> ProxyPort:
    """Return the ProxyPort assigned to a UNI or raise.

    Args:
        controller: controller used to look up the UNI interface and to
            build the ``ProxyPort``.
        intf_id: id of the UNI interface.
        evc_id: EVC id, passed to the exception for context.

    Returns:
        The ``ProxyPort`` built from the interface whose port number is
        stored in the UNI's ``proxy_port`` metadata.

    Raises:
        ProxyPortNotFound: if the UNI interface does not exist, it has no
            ``proxy_port`` metadata, the proxy port's source interface is
            not found on the UNI's switch, or the resulting ``ProxyPort``
            has no destination.
    """
    interface = controller.get_interface_by_id(intf_id)
    if not interface:
        raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")

    if "proxy_port" not in interface.metadata:
        raise ProxyPortNotFound(evc_id, f"proxy_port metadata not found in {intf_id}")

    # The metadata value is a port number on the same switch as the UNI.
    source_intf = interface.switch.get_interface_by_port_no(
        interface.metadata["proxy_port"]
    )
    if not source_intf:
        raise ProxyPortNotFound(
            evc_id,
            f"proxy_port of {intf_id} source interface not found",
        )

    pp = ProxyPort(controller, source_intf)
    if not pp.destination:
        raise ProxyPortNotFound(
            evc_id,
            f"proxy_port of {intf_id} destination interface not found",
        )
    return pp
100
101
102 1
def add_to_apply_actions(instructions, new_instruction, position):
    """Insert an action into every ``apply_actions`` instruction.

    Args:
        instructions: list of instruction dicts, each with an
            ``"instruction_type"`` key.
        new_instruction: the action to insert (despite the name, it is added
            to the ``"actions"`` list, not to ``instructions``).
        position: index passed to ``list.insert``.

    Returns:
        The same ``instructions`` list, modified in place.
    """
    for instruction in instructions:
        if instruction["instruction_type"] == "apply_actions":
            instruction["actions"].insert(position, new_instruction)
    return instructions
108 1
109
110
def get_cookie(evc_id: str, cookie_prefix: int) -> int:
    """Return the cookie integer for an EVC id.

    Args:
        evc_id: EVC id as a hexadecimal string.
        cookie_prefix: the reserved byte value that mef_eline or
            telemetry_int uses; it is placed at bit 56 and above.

    Returns:
        ``int(evc_id, 16)`` plus ``cookie_prefix`` shifted left by 56 bits.

    Raises:
        ValueError: if ``evc_id`` is not a valid hexadecimal string.
    """
    return int(evc_id, 16) + (cookie_prefix << 56)
117
118
119
def get_id_from_cookie(cookie: int) -> str:
    """Return the EVC id given a cookie value.

    The low 56 bits of the cookie are kept (dropping the prefix byte) and
    rendered as lowercase hexadecimal, zero-padded to the 14 digits that
    56 bits span, so ids with leading zeros survive a round trip through
    ``get_cookie``.

    Args:
        cookie: flow cookie carrying the EVC id in its low 56 bits.

    Returns:
        The 14-character lowercase hexadecimal EVC id.
    """
    evc_id = cookie & 0xFFFFFFFFFFFFFF
    return f"{evc_id:014x}"
123
124
125
def is_intra_switch_evc(evc):
    """Return whether the EVC is intra-switch (both UNIs on the same switch).

    Args:
        evc: EVC dict, parsed with ``get_evc_unis``.

    Returns:
        ``True`` when the ``"switch"`` of both parsed UNIs is equal,
        ``False`` otherwise.
    """
    uni_a, uni_z = get_evc_unis(evc)
    return uni_a["switch"] == uni_z["switch"]
131
132
133
def modify_actions(actions, actions_to_change, remove=True):
    """Filter a flow's actions in place by action type.

    If ``remove`` is truthy, the actions whose ``"action_type"`` is in
    ``actions_to_change`` are removed. Otherwise only those actions are
    kept and everything else is removed.

    Args:
        actions: current list of action dicts on a flow; modified in place.
        actions_to_change: collection of action type strings.
        remove: selects between removing or keeping the listed types.

    Returns:
        The same ``actions`` list object, after filtering.
    """
    drop_listed = bool(remove)
    # Slice assignment keeps the caller's list object, matching the
    # original in-place deletion.
    actions[:] = [
        action
        for action in actions
        if (action["action_type"] in actions_to_change) != drop_listed
    ]
    return actions
159
160
161
def set_priority(flow: dict, evc_id: str = "") -> dict:
    """Find a suitable priority number. EP031 describes 100 as the addition.

    The priority is raised by 100 when the result stays below the limit of
    ``2**16 - 2``; otherwise it is raised by 1 when that stays below the
    limit.

    Args:
        flow: flow dict whose ``flow["flow"]["priority"]`` is updated in
            place.
        evc_id: EVC id, passed to the exception for context.

    Returns:
        The same ``flow`` dict with the new priority.

    Raises:
        PriorityOverflow: if neither increment fits below the limit.
    """
    priority_limit = 2**16 - 2
    priority = flow["flow"]["priority"]
    # Prefer the EP031 increment; fall back to the smallest possible bump.
    for increment in (100, 1):
        if priority + increment < priority_limit:
            flow["flow"]["priority"] = priority + increment
            return flow
    raise PriorityOverflow(evc_id, f"Flow {flow} would overflow max priority")
170
171
172
def set_owner(flow: dict) -> dict:
    """Set the flow owner to ``"telemetry_int"``.

    Args:
        flow: flow dict; ``flow["flow"]["owner"]`` is set in place.

    Returns:
        The same ``flow`` dict.
    """
    flow["flow"]["owner"] = "telemetry_int"
    return flow
176
177
178
def set_table_group(flow: dict, table_group: str = "base") -> dict:
    """Set the flow table group.

    Args:
        flow: flow dict; ``flow["flow"]["table_group"]`` is set in place.
        table_group: value to store, ``"base"`` by default.

    Returns:
        The same ``flow`` dict.
    """
    flow["flow"]["table_group"] = table_group
    return flow
182
183
184 1
def get_new_cookie(cookie: int, cookie_prefix=settings.INT_COOKIE_PREFIX) -> int:
    """Convert from a mef-eline cookie by replacing the most significant byte.

    Args:
        cookie: original cookie; only its low 56 bits are kept.
        cookie_prefix: value placed at bit 56 and above; defaults to
            ``settings.INT_COOKIE_PREFIX`` (bound when the module loads).

    Returns:
        The low 56 bits of ``cookie`` plus ``cookie_prefix`` shifted left
        by 56 bits.
    """
    return (cookie & 0xFFFFFFFFFFFFFF) + (cookie_prefix << 56)
187
188
189
def set_new_cookie(flow: dict) -> dict:
    """Replace the flow cookie with its telemetry_int counterpart.

    Args:
        flow: flow dict; ``flow["flow"]["cookie"]`` is rewritten in place
            through ``get_new_cookie`` with ``settings.INT_COOKIE_PREFIX``.

    Returns:
        The same ``flow`` dict.
    """
    flow["flow"]["cookie"] = get_new_cookie(
        flow["flow"]["cookie"], cookie_prefix=settings.INT_COOKIE_PREFIX
    )
    return flow
195
196
197
def set_instructions_from_actions(flow: dict) -> dict:
    """Get instructions or convert from actions.

    A flow that already has ``"instructions"`` is returned untouched.
    Otherwise its ``"actions"`` (``[]`` when absent) are moved into a single
    ``apply_actions`` instruction and the ``"actions"`` key is removed.

    Args:
        flow: flow dict; ``flow["flow"]`` is modified in place.

    Returns:
        The same ``flow`` dict.
    """
    if "instructions" in flow["flow"]:
        return flow

    # pop() both reads the actions and removes the now-redundant key.
    actions = flow["flow"].pop("actions", [])
    flow["flow"]["instructions"] = [
        {
            "instruction_type": "apply_actions",
            "actions": actions,
        }
    ]
    return flow
211