1 | """ Support function for main.py """ |
||
2 | |||
3 | 1 | from typing import Optional |
|
4 | |||
5 | 1 | from napps.kytos.telemetry_int import settings |
|
6 | |||
7 | 1 | from .exceptions import FlowsNotFound, PriorityOverflow |
|
8 | 1 | from .kytos_api_helper import get_stored_flows as _get_stored_flows |
|
9 | |||
10 | |||
async def get_found_stored_flows(cookies: list[int] = None) -> dict[int, list[dict]]:
    """Fetch stored flows and fail if any returned cookie has no flows.

    Args:
        cookies: cookies to query; a falsy value is replaced by an empty list.

    Returns:
        The mapping returned by the stored-flows helper, unchanged.

    Raises:
        FlowsNotFound: for the first cookie (in mapping order) whose flows
            entry is empty; built from the EVC id derived from that cookie.
    """
    flows_by_cookie = await _get_stored_flows(cookies or [])
    cookies_without_flows = [
        cookie for cookie, found in flows_by_cookie.items() if not found
    ]
    if cookies_without_flows:
        raise FlowsNotFound(get_id_from_cookie(cookies_without_flows[0]))
    return flows_by_cookie
19 | |||
20 | |||
def has_int_enabled(evc: dict) -> bool:
    """Check if evc has telemetry enabled.

    Telemetry counts as enabled only when ``evc["metadata"]["telemetry"]``
    is a dict holding a truthy ``"enabled"`` entry.

    Args:
        evc: EVC dictionary.

    Returns:
        True when telemetry is enabled, False otherwise. The result is
        always a real ``bool``, even when the stored ``"enabled"`` value is
        some other truthy/falsy object. Missing or malformed ``metadata`` /
        ``telemetry`` entries are reported as not enabled.
    """
    metadata = evc.get("metadata")
    if not isinstance(metadata, dict):
        return False
    telemetry = metadata.get("telemetry")
    if not isinstance(telemetry, dict):
        return False
    # Coerce so the annotated ``bool`` return type actually holds.
    return bool(telemetry.get("enabled"))
30 | |||
31 | |||
def get_evc_unis(evc: dict) -> tuple[dict, dict]:
    """Extract both UNIs of an EVC as normalized dictionaries.

    The text after the last ``:`` of each ``interface_id`` is parsed as the
    integer port number and everything before it is kept as the switch.

    Args:
        evc: EVC dictionary holding ``uni_a`` and ``uni_z`` entries, each
            with an ``interface_id`` and optionally a ``tag``.

    Returns:
        A ``(uni_a, uni_z)`` tuple of dicts with the keys ``interface_id``,
        ``tag`` (``{}`` when absent), ``port_number`` and ``switch``.
    """
    # Split both interface ids up front, before any port is converted.
    split_ids = {
        uni_key: evc[uni_key]["interface_id"].rpartition(":")
        for uni_key in ("uni_a", "uni_z")
    }
    return tuple(
        {
            "interface_id": evc[uni_key]["interface_id"],
            "tag": evc[uni_key].get("tag", {}),
            "port_number": int(port),
            "switch": switch,
        }
        for uni_key, (switch, _, port) in split_ids.items()
    )
50 | |||
51 | |||
def add_to_apply_actions(
    instructions: list[dict], new_instruction: dict, position: int
):
    """Insert an entry into the actions of every ``apply_actions`` instruction.

    Args:
        instructions: instructions to update; modified in place.
        new_instruction: the entry inserted into each matching ``actions`` list.
        position: index handed to ``list.insert``.

    Returns:
        The same ``instructions`` list that was passed in.
    """
    apply_actions_only = (
        entry
        for entry in instructions
        if entry["instruction_type"] == "apply_actions"
    )
    for entry in apply_actions_only:
        entry["actions"].insert(position, new_instruction)
    return instructions
60 | |||
61 | |||
def has_instruction_and_action_type(
    instructions: list[dict], instruction_type: str, action_type: str
) -> bool:
    """Tell whether a matching instruction carries a matching action.

    Args:
        instructions: instructions to scan.
        instruction_type: required ``instruction_type`` of the instruction.
        action_type: required ``action_type`` of one of its actions.

    Returns:
        True if some instruction of ``instruction_type`` that has an
        ``actions`` entry contains an action of ``action_type``.
    """
    return any(
        "action_type" in action and action["action_type"] == action_type
        for instruction in instructions
        if instruction["instruction_type"] == instruction_type
        and "actions" in instruction
        for action in instruction["actions"]
    )
76 | |||
77 | |||
def get_cookie(evc_id: str, cookie_prefix: int) -> int:
    """Build the integer cookie for an EVC id.

    The EVC id is parsed as hexadecimal and ``cookie_prefix``, shifted left
    by 56 bits, is added to it.

    cookie_prefix is supposed to be the reserved byte value that
    mef_eline or telemetry_int uses.
    """
    evc_value = int(evc_id, 16)
    shifted_prefix = cookie_prefix << 56
    return evc_value + shifted_prefix
85 | |||
86 | |||
def get_id_from_cookie(cookie: int) -> str:
    """Recover the EVC id from a cookie.

    Keeps the 56 low-order bits of ``cookie`` and renders them as lowercase
    hexadecimal, left-padded with zeros to 14 characters.
    """
    low_56_bits = cookie & 0xFFFFFFFFFFFFFF
    return f"{low_56_bits:014x}"
91 | |||
92 | |||
def is_intra_switch_evc(evc):
    """Tell whether the EVC is intra-switch (both UNIs on the same switch)."""
    first_uni, second_uni = get_evc_unis(evc)
    return first_uni["switch"] == second_uni["switch"]
99 | |||
100 | |||
def modify_actions(actions: list[dict], actions_to_change: list[str], remove=True):
    """Filter a flow's actions by action type.

    If remove is truthy, drop the actions whose type is in actions_to_change.
    If remove is falsy, keep only the actions whose type is in
    actions_to_change.

    Args:
        actions: current list of actions on a flow (left unmodified)
        actions_to_change: action types, as strings
        remove: selects which of the two behaviours above applies

    Returns:
        A new list with the surviving actions, in their original order.
    """
    drop_listed = bool(remove)
    # An action survives when "its type is listed" differs from "drop listed".
    return [
        action
        for action in actions
        if (action["action_type"] in actions_to_change) != drop_listed
    ]
121 | |||
122 | |||
def set_priority(flow: dict, evc_id: str = "") -> dict:
    """Raise the flow priority. EP031 describes 100 as the addition.

    Tries to add 100 to ``flow["flow"]["priority"]``; if the result would not
    stay below ``2**16 - 2`` it tries to add 1 instead.

    Args:
        flow: flow dictionary; its priority is updated in place.
        evc_id: EVC id passed along to the raised exception.

    Returns:
        The same ``flow`` object.

    Raises:
        PriorityOverflow: when neither increment stays below ``2**16 - 2``.
    """
    ceiling = 2**16 - 2
    current = flow["flow"]["priority"]
    for increment in (100, 1):
        if current + increment < ceiling:
            flow["flow"]["priority"] = current + increment
            return flow
    raise PriorityOverflow(evc_id, f"Flow {flow} would overflow max priority")
132 | |||
133 | |||
def set_owner(flow: dict) -> dict:
    """Mark the flow as owned by ``telemetry_int``.

    Writes ``flow["flow"]["owner"]`` in place and returns the same ``flow``.
    """
    flow_body = flow["flow"]
    flow_body["owner"] = "telemetry_int"
    return flow
138 | |||
139 | |||
def get_new_cookie(cookie: int, cookie_prefix=settings.INT_COOKIE_PREFIX) -> int:
    """Convert from a mef-eline cookie by replacing the most significant byte.

    Keeps the 56 low-order bits of ``cookie`` and adds ``cookie_prefix``
    shifted left by 56 bits.
    """
    low_56_bits = cookie & 0xFFFFFFFFFFFFFF
    shifted_prefix = cookie_prefix << 56
    return low_56_bits + shifted_prefix
143 | |||
144 | |||
def set_new_cookie(flow: dict) -> dict:
    """Replace the flow cookie with its ``INT_COOKIE_PREFIX`` counterpart.

    ``flow["flow"]["cookie"]`` is rewritten in place through
    ``get_new_cookie`` and the same ``flow`` is returned.
    """
    flow_body = flow["flow"]
    flow_body["cookie"] = get_new_cookie(
        flow_body["cookie"], cookie_prefix=settings.INT_COOKIE_PREFIX
    )
    return flow
151 | |||
152 | |||
def set_instructions_from_actions(flow: dict) -> dict:
    """Get instructions or convert from actions.

    A flow that already has ``instructions`` is returned untouched.
    Otherwise its ``actions`` (an empty list when absent) are moved into a
    single ``apply_actions`` instruction and the ``actions`` key is removed.

    Args:
        flow: flow dictionary; ``flow["flow"]`` is updated in place.

    Returns:
        The same ``flow`` object.
    """
    flow_body = flow["flow"]
    if "instructions" not in flow_body:
        # pop() both fetches the actions and removes the legacy key.
        moved_actions = flow_body.pop("actions", [])
        flow_body["instructions"] = [
            {"instruction_type": "apply_actions", "actions": moved_actions}
        ]
    return flow
167 | |||
168 | |||
def get_svlan_dpid_link(link: dict, dpid: str) -> Optional[int]:
    """Try to get the s_vlan value of a link when dpid is one of its ends.

    Args:
        link: link dictionary with ``endpoint_a``, ``endpoint_b`` and
            ``metadata`` entries.
        dpid: switch id compared against both endpoints' ``switch``.

    Returns:
        ``link["metadata"]["s_vlan"]["value"]`` when ``dpid`` matches an
        endpoint switch and the metadata has an ``s_vlan`` entry; otherwise
        ``None``.
    """
    # Both endpoints are read up front; metadata only when a switch matches.
    endpoint_switches = (
        link["endpoint_a"]["switch"],
        link["endpoint_b"]["switch"],
    )
    dpid_on_link = any(switch == dpid for switch in endpoint_switches)
    if dpid_on_link and "s_vlan" in link["metadata"]:
        return link["metadata"]["s_vlan"]["value"]
    return None
179 |