1 | """ This module was created to be the main interface between the telemetry napp and all |
||
2 | other kytos napps' APIs """ |
||
3 | |||
4 | 1 | from collections import defaultdict |
|
5 | 1 | from typing import Union |
|
6 | |||
7 | 1 | import httpx |
|
8 | 1 | from napps.kytos.telemetry_int import settings |
|
9 | 1 | from napps.kytos.telemetry_int.exceptions import UnrecoverableError |
|
10 | 1 | from tenacity import ( |
|
11 | retry, |
||
12 | retry_if_exception_type, |
||
13 | stop_after_attempt, |
||
14 | wait_combine, |
||
15 | wait_fixed, |
||
16 | wait_random, |
||
17 | ) |
||
18 | |||
19 | 1 | from kytos.core.retry import before_sleep |
|
20 | |||
21 | |||
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
)
async def get_evcs(**kwargs) -> dict:
    """Get all non-archived EVCs from the mef_eline API.

    Any extra kwargs are forwarded verbatim as query string arguments.

    Raises:
        httpx.RequestError: on a 5xx response (retried by tenacity).
        UnrecoverableError: on any other non-success response.
    """
    archived = "false"
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        endpoint = f"/evc/?archived={archived}"
        if kwargs:
            query_args = [f"{k}={v}" for k, v in kwargs.items()]
            endpoint = f"{endpoint}&{'&'.join(query_args)}"
        response = await client.get(endpoint, timeout=10)
        if response.is_server_error:
            # 5xx is mapped to RequestError so the @retry decorator retries it
            raise httpx.RequestError(response.text)
        if not response.is_success:
            # Fixed: trailing space was missing, producing e.g.
            # "archived falsestatus code ..." in the error message.
            raise UnrecoverableError(
                f"Failed to get_evcs archived {archived} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        return response.json()
|
45 | |||
46 | |||
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_evc(evc_id: str, exclude_archived=True) -> dict:
    """Fetch a single EVC from mef_eline, keyed by its id.

    Returns an empty dict when the EVC doesn't exist, or when it's
    archived and exclude_archived is set.
    """
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        resp = await client.get(f"/evc/{evc_id}", timeout=10)
        if resp.status_code == 404:
            # Not found is an empty result for callers, not an error.
            return {}
        if resp.is_server_error:
            # 5xx is mapped to RequestError so the @retry decorator retries it
            raise httpx.RequestError(resp.text)
        if not resp.is_success:
            raise UnrecoverableError(
                f"Failed to get_evc id {evc_id} "
                f"status code {resp.status_code}, response text: {resp.text}"
            )
        evc = resp.json()
        if evc["archived"] and exclude_archived:
            return {}
        return {evc["id"]: evc}
|
70 | |||
71 | |||
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_stored_flows(
    cookies: Union[list[Union[int, tuple[int, int]]], None] = None,
) -> dict[int, list[dict]]:
    """Get flow_manager stored_flows grouped by cookies given a list of cookies.

    Args:
        cookies: each entry is either a single cookie (exact match) or a
            (low, high) tuple matched as an inclusive range. Entries of any
            other shape are silently ignored.

    Raises:
        httpx.RequestError: on a 5xx response (retried by tenacity).
        UnrecoverableError: on any other non-success response.
    """
    # Fixed implicit-Optional annotation (PEP 484): default None requires an
    # explicit Optional/Union[..., None] type.
    cookies = cookies or []

    # flow_manager expects a flat [gte, lte, gte, lte, ...] list; a single
    # int cookie is an exact match, i.e. gte == lte.
    cookie_range_args: list[int] = []
    for cookie in cookies:
        if isinstance(cookie, int):
            cookie_range_args.extend((cookie, cookie))
        elif isinstance(cookie, tuple) and len(cookie) == 2:
            cookie_range_args.extend(cookie)

    endpoint = "stored_flows?state=installed&state=pending"
    async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client:
        if cookie_range_args:
            # cookie_range goes in the JSON body, hence client.request("GET", ...)
            response = await client.request(
                "GET",
                f"/{endpoint}",
                json={"cookie_range": cookie_range_args},
                timeout=10,
            )
        else:
            response = await client.get(f"/{endpoint}", timeout=10)

        if response.is_server_error:
            # 5xx is mapped to RequestError so the @retry decorator retries it
            raise httpx.RequestError(response.text)
        if not response.is_success:
            raise UnrecoverableError(
                f"Failed to get_stored_flows cookies {cookies} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        return _map_stored_flows_by_cookies(response.json())
|
117 | |||
118 | |||
119 | 1 | def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]: |
|
120 | """Map stored flows by cookies. |
||
121 | |||
122 | This is for mapping the data by cookies, just to it can be |
||
123 | reused upfront by bulk operations. |
||
124 | """ |
||
125 | 1 | flows_by_cookies = defaultdict(list) |
|
126 | 1 | for flows in stored_flows.values(): |
|
127 | 1 | for flow in flows: |
|
128 | 1 | flows_by_cookies[flow["flow"]["cookie"]].append(flow) |
|
129 | 1 | return flows_by_cookies |
|
130 | |||
131 | |||
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def add_evcs_metadata(
    evcs: dict[str, dict], new_metadata: dict, force=False
) -> dict:
    """Bulk-add metadata on the given EVCs via the mef_eline API."""

    circuit_ids = [eid for eid, evc in evcs.items() if evc]
    # Return early when there are no circuits to update.
    if not circuit_ids:
        return {}

    payload = dict(new_metadata)
    payload["circuit_ids"] = circuit_ids
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        resp = await client.post(
            "/evc/metadata",
            timeout=10,
            json=payload,
        )
        if resp.is_success:
            return resp.json()
        # Ignore 404 if force just so it's easier to handle this concurrently
        if resp.status_code == 404 and force:
            return {}

        if resp.is_server_error:
            # 5xx is mapped to RequestError so the @retry decorator retries it
            raise httpx.RequestError(resp.text)
        raise UnrecoverableError(
            f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} "
            f"status code {resp.status_code}, response text: {resp.text}"
        )
||
169 | |||
170 | |||
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def add_proxy_port_metadata(intf_id: str, port_no: int) -> dict:
    """Set the proxy_port metadata on an interface via the topology API.

    Raises:
        ValueError: when the interface doesn't exist.
        httpx.RequestError: on a 5xx response (retried by tenacity).
        UnrecoverableError: on any other non-success response.
    """
    async with httpx.AsyncClient(base_url=settings.topology_url) as client:
        resp = await client.post(
            f"/interfaces/{intf_id}/metadata",
            timeout=10,
            json={"proxy_port": port_no},
        )
        if resp.is_success:
            return resp.json()
        if resp.status_code == 404:
            raise ValueError(f"interface_id {intf_id} not found")
        if resp.is_server_error:
            # 5xx is mapped to RequestError so the @retry decorator retries it
            raise httpx.RequestError(resp.text)
        raise UnrecoverableError(
            f"Failed to add_proxy_port {port_no} metadata for intf_id {intf_id} "
            f"status code {resp.status_code}, response text: {resp.text}"
        )
||
195 | |||
196 | |||
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def delete_proxy_port_metadata(intf_id: str) -> dict:
    """Remove the proxy_port metadata from an interface via the topology API.

    Raises:
        ValueError: when the interface or its proxy_port metadata doesn't exist.
        httpx.RequestError: on a 5xx response (retried by tenacity).
        UnrecoverableError: on any other non-success response.
    """
    async with httpx.AsyncClient(base_url=settings.topology_url) as client:
        resp = await client.delete(
            f"/interfaces/{intf_id}/metadata/proxy_port",
            timeout=10,
        )
        if resp.is_success:
            return resp.json()
        if resp.status_code == 404:
            raise ValueError(f"interface_id {intf_id} or metadata proxy_port not found")
        if resp.is_server_error:
            # 5xx is mapped to RequestError so the @retry decorator retries it
            raise httpx.RequestError(resp.text)
        raise UnrecoverableError(
            f"Failed to delete_proxy_port metadata for intf_id {intf_id} "
            f"status code {resp.status_code}, response text: {resp.text}"
        )
||
220 |