1 | """Main module of kytos/telemetry Network Application. |
||
2 | |||
3 | Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits |
||
4 | |||
5 | """ |
||
6 | |||
7 | 1 | import asyncio |
|
8 | 1 | import copy |
|
9 | 1 | import pathlib |
|
10 | 1 | from datetime import datetime |
|
11 | |||
12 | 1 | import napps.kytos.telemetry_int.kytos_api_helper as api |
|
13 | 1 | from napps.kytos.telemetry_int import settings, utils |
|
14 | 1 | from tenacity import RetryError |
|
15 | |||
16 | 1 | from kytos.core import KytosEvent, KytosNApp, log, rest |
|
17 | 1 | from kytos.core.common import EntityStatus |
|
18 | 1 | from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec |
|
19 | 1 | from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400 |
|
20 | |||
21 | 1 | from .exceptions import ( |
|
22 | EVCError, |
||
23 | EVCHasINT, |
||
24 | EVCHasNoINT, |
||
25 | EVCNotFound, |
||
26 | FlowsNotFound, |
||
27 | ProxyPortError, |
||
28 | ProxyPortNotFound, |
||
29 | ProxyPortSameSourceIntraEVC, |
||
30 | ProxyPortShared, |
||
31 | ProxyPortStatusNotUP, |
||
32 | UnrecoverableError, |
||
33 | ) |
||
34 | 1 | from .managers.int import INTManager |
|
35 | |||
36 | |||
37 | 1 | class Main(KytosNApp): |
|
38 | """Main class of kytos/telemetry NApp. |
||
39 | |||
40 | This class is the entry point for this NApp. |
||
41 | """ |
||
42 | |||
43 | 1 | spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml") |
|
44 | |||
45 | 1 | def setup(self): |
|
46 | """Replace the '__init__' method for the KytosNApp subclass. |
||
47 | |||
48 | The setup method is automatically called by the controller when your |
||
49 | application is loaded. |
||
50 | |||
51 | So, if you have any setup routine, insert it here. |
||
52 | """ |
||
53 | |||
54 | 1 | self.int_manager = INTManager(self.controller) |
|
55 | 1 | self._ofpt_error_lock = asyncio.Lock() |
|
56 | |||
57 | 1 | def execute(self): |
|
58 | """Run after the setup method execution. |
||
59 | |||
60 | You can also use this method in loop mode if you add to the above setup |
||
61 | method a line like the following example: |
||
62 | |||
63 | self.execute_as_loop(30) # 30-second interval. |
||
64 | """ |
||
65 | |||
66 | 1 | def shutdown(self): |
|
67 | """Run when your NApp is unloaded. |
||
68 | |||
69 | If you have some cleanup procedure, insert it here. |
||
70 | """ |
||
71 | |||
72 | 1 | @rest("v1/evc/enable", methods=["POST"]) |
|
73 | 1 | async def enable_telemetry(self, request: Request) -> JSONResponse: |
|
74 | """REST to enable INT flows on EVCs. |
||
75 | |||
76 | If a list of evc_ids is empty, it'll enable on non-INT EVCs. |
||
77 | """ |
||
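        # Illustrative request payload (shape inferred from the parsing below;
        # openapi.yml is the authoritative schema, "<evc_id>" is a placeholder):
        #   {"evc_ids": ["<evc_id>"], "force": false}
        # An empty "evc_ids" list means all non-INT EVCs are targeted.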
        await avalidate_openapi_request(self.spec, request)

        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError):
            raise HTTPException(400, detail=f"Invalid payload: {content}")

        try:
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)

        if evc_ids:
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
            if not evcs:
                # There are no non-INT EVCs to enable.
                return JSONResponse(list(evcs.keys()))

        try:
            # First, try to get and remove any existing INT flows, like mef_eline
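            # Each EVC's INT flows share a cookie whose top byte is
            # settings.INT_COOKIE_PREFIX (see the ">> 56" checks further down),
            # so a cookie lookup is enough to find them.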
            stored_flows = await api.get_stored_flows(
                [
                    utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
                    for evc_id in evcs
                ]
            )
            await self.int_manager._remove_int_flows_by_cookies(stored_flows)
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc))
        except (
            EVCHasINT,
            ProxyPortStatusNotUP,
            ProxyPortSameSourceIntraEVC,
            ProxyPortShared,
        ) as exc:
            raise HTTPException(409, detail=str(exc))
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error)

        return JSONResponse(list(evcs.keys()), status_code=201)

    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
140 | """REST to disable/remove INT flows for an EVC_ID |
||
141 | |||
142 | If a list of evc_ids is empty, it'll disable on all INT EVCs. |
||
143 | """ |
||
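        # Illustrative request payload (same shape as the enable endpoint;
        # openapi.yml is the authoritative schema, "<evc_id>" is a placeholder):
        #   {"evc_ids": ["<evc_id>"], "force": false}
        # An empty "evc_ids" list means all INT EVCs are targeted.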
        await avalidate_openapi_request(self.spec, request)

        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError):
            raise HTTPException(400, detail=f"Invalid payload: {content}")

        try:
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)

        if evc_ids:
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                # There are no INT EVCs to disable.
                return JSONResponse(list(evcs.keys()))

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc))
        except EVCHasNoINT as exc:
            raise HTTPException(409, detail=str(exc))
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error)

        return JSONResponse(list(evcs.keys()))

    @rest("v1/evc")
    async def get_evcs(self, _request: Request) -> JSONResponse:
        """REST to return the list of EVCs with INT enabled"""
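        # This likely just forwards the query to mef_eline via the API helper,
        # filtering EVCs whose metadata.telemetry.enabled flag is "true".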
        try:
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
            return JSONResponse(evcs)
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error)

    @rest("v1/evc/redeploy", methods=["PATCH"])
    async def redeploy_telemetry(self, request: Request) -> JSONResponse:
208 | """REST to redeploy INT on EVCs. |
||
209 | |||
210 | If a list of evc_ids is empty, it'll redeploy on all INT EVCs. |
||
211 | """ |
||
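        # Illustrative request payload (openapi.yml is the authoritative schema,
        # "<evc_id>" is a placeholder): {"evc_ids": ["<evc_id>"]}
        # An empty "evc_ids" list means all INT EVCs are redeployed.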
        await avalidate_openapi_request(self.spec, request)

        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
        except (TypeError, KeyError):
            raise HTTPException(400, detail=f"Invalid payload: {content}")

        try:
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)

        if evc_ids:
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                raise HTTPException(404, detail="There aren't INT EVCs to redeploy")

        try:
            await self.int_manager.redeploy_int(evcs)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc))
        except (EVCHasNoINT, ProxyPortSameSourceIntraEVC, ProxyPortShared) as exc:
            raise HTTPException(409, detail=str(exc))
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error)

        return JSONResponse(list(evcs.keys()), status_code=201)

    @rest("v1/evc/compare")
    async def evc_compare(self, _request: Request) -> JSONResponse:
257 | """List and compare which INT EVCs have flows installed comparing with |
||
258 | mef_eline flows and telemetry metadata. You should use this endpoint |
||
259 | to confirm if both the telemetry metadata is still coherent and also |
||
260 | the minimum expected number of flows. A list of EVCs will get returned |
||
261 | with the inconsistent INT EVCs. If you encounter any inconsistent |
||
262 | EVC you need to analyze the situation and then decide if you'd |
||
263 | like to force enable or disable INT. |
||
264 | """ |
||
265 | |||
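        # Cookie ranges below: <prefix> << 56 places the 8-bit prefix in the
        # top byte of the 64-bit OpenFlow cookie, and OR-ing 0xFFFFFFFFFFFFFF
        # (56 set bits) gives the upper bound, so each tuple spans every cookie
        # carrying that prefix (INT and mef_eline, respectively).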
        try:
            int_flows, mef_flows, evcs = await asyncio.gather(
                api.get_stored_flows(
                    [
                        (
                            settings.INT_COOKIE_PREFIX << 56,
                            settings.INT_COOKIE_PREFIX << 56 | 0xFFFFFFFFFFFFFF,
                        ),
                    ]
                ),
                api.get_stored_flows(
                    [
                        (
                            settings.MEF_COOKIE_PREFIX << 56,
                            settings.MEF_COOKIE_PREFIX << 56 | 0xFFFFFFFFFFFFFF,
                        ),
                    ]
                ),
                api.get_evcs(),
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error)

        response = [
            {"id": k, "name": evcs[k]["name"], "compare_reason": v}
            for k, v in self.int_manager.evc_compare(int_flows, mef_flows, evcs).items()
        ]
        return JSONResponse(response)

301 | 1 | @rest("v1/uni/{interface_id}/proxy_port", methods=["DELETE"]) |
|
302 | 1 | async def delete_proxy_port_metadata(self, request: Request) -> JSONResponse: |
|
303 | """Delete proxy port metadata.""" |
||
304 | 1 | intf_id = request.path_params["interface_id"] |
|
305 | 1 | intf = self.controller.get_interface_by_id(intf_id) |
|
306 | 1 | if not intf: |
|
307 | raise HTTPException(404, detail=f"Interface id {intf_id} not found") |
||
308 | 1 | if "proxy_port" not in intf.metadata: |
|
309 | 1 | return JSONResponse("Operation successful") |
|
310 | |||
311 | 1 | qparams = request.query_params |
|
312 | 1 | force = qparams.get("force", "false").lower() == "true" |
|
313 | |||
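        # "force=true" in the query string skips the in-use check below and
        # deletes the proxy_port metadata even if EVCs still use the port.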
        try:
            pp = self.int_manager.srcs_pp[self.int_manager.unis_src[intf_id]]
            if pp.evc_ids and not force:
                return JSONResponse(
                    {
                        "status_code": 409,
                        "code": 409,
                        "description": f"{pp} is in use on {len(pp.evc_ids)} EVCs",
                        "evc_ids": sorted(pp.evc_ids),
                    },
                    status_code=409,
                )
        except KeyError:
            pass

        try:
            await api.delete_proxy_port_metadata(intf_id)
            return JSONResponse("Operation successful")
        except ValueError as exc:
            raise HTTPException(404, detail=str(exc))
        except UnrecoverableError as exc:
            raise HTTPException(500, detail=str(exc))

    @rest("v1/uni/{interface_id}/proxy_port/{port_number:int}", methods=["POST"])
    async def add_proxy_port_metadata(self, request: Request) -> JSONResponse:
        """Add proxy port metadata."""
        intf_id = request.path_params["interface_id"]
        port_no = request.path_params["port_number"]
        qparams = request.query_params
        if not (intf := self.controller.get_interface_by_id(intf_id)):
            raise HTTPException(404, detail=f"Interface id {intf_id} not found")
        if "proxy_port" in intf.metadata and intf.metadata["proxy_port"] == port_no:
            return JSONResponse("Operation successful")

        force = qparams.get("force", "false").lower() == "true"
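        # Validate before persisting: the proxy port must resolve, must be UP
        # (unless "force=true"), and must not be a shared proxy port
        # (ProxyPortShared results in a 409).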
        try:
            pp = self.int_manager.get_proxy_port_or_raise(intf_id, "no_evc_id", port_no)
            if pp.status != EntityStatus.UP and not force:
                raise HTTPException(409, detail=f"{pp} status isn't UP")
            self.int_manager._validate_new_dedicated_proxy_port(intf, port_no)
        except ProxyPortShared as exc:
            raise HTTPException(409, detail=exc.message)
        except ProxyPortError as exc:
            raise HTTPException(404, detail=exc.message)

        try:
            await api.add_proxy_port_metadata(intf_id, port_no)
            return JSONResponse("Operation successful")
        except ValueError as exc:
            raise HTTPException(404, detail=str(exc))
        except UnrecoverableError as exc:
            raise HTTPException(500, detail=str(exc))

    @rest("v1/uni/proxy_port")
    async def list_uni_proxy_ports(self, _request: Request) -> JSONResponse:
        """List configured UNI proxy ports."""
        interfaces_proxy_ports = []
        for switch in self.controller.switches.copy().values():
            for intf in switch.interfaces.copy().values():
                if "proxy_port" in intf.metadata:
                    payload = {
                        "uni": {
                            "id": intf.id,
                            "status": intf.status.value,
                            "status_reason": sorted(intf.status_reason),
                        },
                        "proxy_port": {
                            "port_number": intf.metadata["proxy_port"],
                            "status": "DOWN",
                            "status_reason": [],
                        },
                    }
                    try:
                        pp = self.int_manager.get_proxy_port_or_raise(
                            intf.id, "no_evc_id"
                        )
                        payload["proxy_port"]["status"] = pp.status.value
                    except ProxyPortError as exc:
                        payload["proxy_port"]["status_reason"] = [exc.message]
                    interfaces_proxy_ports.append(payload)
        return JSONResponse(interfaces_proxy_ports)

    @alisten_to("kytos/mef_eline.evcs_loaded")
    async def on_mef_eline_evcs_loaded(self, event: KytosEvent) -> None:
        """Handle kytos/mef_eline.evcs_loaded."""
        self.int_manager.load_uni_src_proxy_ports(event.content)

    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle of_multi_table.enable_table."""
        table_group = event.content.get("telemetry_int", {})
        if not table_group:
            return
        for group in table_group:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(
                    f'The table group "{group}" is not allowed for '
                    f"telemetry_int. Allowed table groups are "
                    f"{settings.TABLE_GROUP_ALLOWED}"
                )
                return
        self.int_manager.flow_builder.table_group.update(table_group)
        content = {"group_table": self.int_manager.flow_builder.table_group}
        event_out = KytosEvent(name="kytos/telemetry_int.enable_table", content=content)
        await self.controller.buffers.app.aput(event_out)

    @alisten_to("kytos/mef_eline.deleted")
    async def on_evc_deleted(self, event: KytosEvent) -> None:
        """On EVC deleted."""
        content = event.content
        if (
            "metadata" in content
            and "telemetry" in content["metadata"]
            and content["metadata"]["telemetry"]["enabled"]
        ):
            evc_id = content["id"]
            log.info(f"Handling mef_eline.deleted on EVC id: {evc_id}")
            await self.int_manager.disable_int({evc_id: content}, force=True)

    @alisten_to("kytos/mef_eline.deployed")
    async def on_evc_deployed(self, event: KytosEvent) -> None:
        """On EVC deployed."""
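        # Two cases are handled below: an EVC that already has INT enabled is
        # redeployed, while an EVC flagged with "telemetry_request" (and no
        # "telemetry" metadata yet) gets INT enabled with force=True.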
        content = event.content
        evc_id = content["id"]
        evcs = {evc_id: content}
        try:
            if (
                "metadata" in content
                and "telemetry" in content["metadata"]
                and content["metadata"]["telemetry"]["enabled"]
            ):
                log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
                await self.int_manager.redeploy_int(evcs)
            elif (
                "metadata" in content
                and "telemetry_request" in content["metadata"]
                and "telemetry" not in content["metadata"]
            ):
                log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
                await self.int_manager.enable_int(evcs, force=True)
        except EVCError as exc:
            log.error(
                f"Failed when handling mef_eline.deployed: {exc}. Analyze the error "
                f"and you'll need to enable or redeploy EVC {evc_id} later"
            )

    @alisten_to("kytos/mef_eline.undeployed")
    async def on_evc_undeployed(self, event: KytosEvent) -> None:
        """On EVC undeployed."""
        content = event.content
        if (
            not content["enabled"]
            and "metadata" in content
            and "telemetry" in content["metadata"]
            and content["metadata"]["telemetry"]["enabled"]
        ):
            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "DOWN",
                    "status_reason": ["undeployed"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            evc_id = content["id"]
            evcs = {evc_id: content}
            log.info(f"Handling mef_eline.undeployed on EVC id: {evc_id}")
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)

    @alisten_to("kytos/mef_eline.(redeployed_link_down|redeployed_link_up)")
    async def on_evc_redeployed_link(self, event: KytosEvent) -> None:
        """On EVC redeployed_link_down|redeployed_link_up."""
        content = event.content
        if (
            content["enabled"]
            and "metadata" in content
            and "telemetry" in content["metadata"]
            and content["metadata"]["telemetry"]["enabled"]
        ):
            evc_id = content["id"]
            evcs = {evc_id: content}
            log.info(f"Handling {event.name}, EVC id: {evc_id}")
            try:
                await self.int_manager.redeploy_int(evcs)
            except EVCError as exc:
                log.error(
                    f"Failed to redeploy: {exc}. "
                    f"Analyze the error and you'll need to redeploy EVC {evc_id} later"
                )

    @alisten_to("kytos/mef_eline.error_redeploy_link_down")
    async def on_evc_error_redeployed_link_down(self, event: KytosEvent) -> None:
508 | """On EVC error_redeploy_link_down, this is supposed to happen when |
||
509 | a path isn't when mef_eline handles a link down.""" |
||
        content = event.content
        if (
            content["enabled"]
            and "metadata" in content
            and "telemetry" in content["metadata"]
            and content["metadata"]["telemetry"]["enabled"]
        ):
            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "DOWN",
                    "status_reason": ["redeployed_link_down_no_path"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            evc_id = content["id"]
            evcs = {evc_id: content}
            log.info(
                f"Handling mef_eline.redeployed_link_down_no_path on EVC id: {evc_id}"
            )
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)

    @alisten_to("kytos/mef_eline.failover_link_down")
    async def on_failover_link_down(self, event: KytosEvent):
        """Handle kytos/mef_eline.failover_link_down."""
        await self.int_manager.handle_failover_flows(
            copy.deepcopy(event.content), event_name="failover_link_down"
        )

    @alisten_to("kytos/mef_eline.failover_old_path")
    async def on_failover_old_path(self, event: KytosEvent):
        """Handle kytos/mef_eline.failover_old_path."""
        await self.int_manager.handle_failover_flows(
            copy.deepcopy(event.content), event_name="failover_old_path"
        )

    @alisten_to("kytos/mef_eline.failover_deployed")
    async def on_failover_deployed(self, event: KytosEvent):
        """Handle kytos/mef_eline.failover_deployed."""
        await self.int_manager.handle_failover_flows(
            copy.deepcopy(event.content), event_name="failover_deployed"
        )

    @alisten_to("kytos/topology.link_down")
    async def on_link_down(self, event):
        """Handle topology.link_down."""
        await self.int_manager.handle_pp_link_down(event.content["link"])

    @alisten_to("kytos/topology.link_up")
    async def on_link_up(self, event):
        """Handle topology.link_up."""
        await self.int_manager.handle_pp_link_up(event.content["link"])

    @alisten_to("kytos/mef_eline.uni_active_updated")
    async def on_uni_active_updated(self, event: KytosEvent) -> None:
        """On mef_eline UNI active updated."""
        content = event.content
        if (
            "metadata" in content
            and "telemetry" in content["metadata"]
            and content["metadata"]["telemetry"]["enabled"]
        ):
            evc_id, active = content["id"], content["active"]
            log.info(
                f"Handling mef_eline.uni_active_updated active {active} "
                f"on EVC id: {evc_id}"
            )

            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "UP" if active else "DOWN",
                    "status_reason": [] if active else ["uni_down"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            await api.add_evcs_metadata({evc_id: content}, metadata)

    @alisten_to("kytos/flow_manager.flow.error")
    async def on_flow_mod_error(self, event: KytosEvent):
594 | """On flow mod errors. |
||
595 | |||
596 | Only OFPT_ERRORs will be handled, telemetry_int already uses force: true |
||
597 | """ |
||
598 | 1 | flow = event.content["flow"] |
|
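        # Ignore anything that isn't an OFPT_ERROR (no error_exception) for a
        # flow add whose cookie carries the INT prefix in its top byte.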
        if any(
            (
                event.content.get("error_exception"),
                event.content.get("error_command") != "add",
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
            )
        ):
            return

        async with self._ofpt_error_lock:
            evc_id = utils.get_id_from_cookie(flow.cookie)
            evc = await api.get_evc(evc_id, exclude_archived=False)
            if (
                not evc
                or "telemetry" not in evc[evc_id]["metadata"]
                or "enabled" not in evc[evc_id]["metadata"]["telemetry"]
                or not evc[evc_id]["metadata"]["telemetry"]["enabled"]
            ):
                return

            metadata = {
                "telemetry": {
                    "enabled": False,
                    "status": "DOWN",
                    "status_reason": ["ofpt_error"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            log.error(
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
                f"error_type: {event.content.get('error_type')}, "
                f"error_code: {event.content.get('error_code')}, "
                f"flow: {flow.as_dict()} "
            )

            evcs = {evc_id: {evc_id: evc_id}}
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)

    @alisten_to("kytos/topology.interfaces.metadata.removed")
    async def on_intf_metadata_removed(self, event: KytosEvent) -> None:
        """On interface metadata removed."""
        await self.int_manager.handle_pp_metadata_removed(event.content["interface"])

    @alisten_to("kytos/topology.interfaces.metadata.added")
    async def on_intf_metadata_added(self, event: KytosEvent) -> None:
        """On interface metadata added."""
        await self.int_manager.handle_pp_metadata_added(event.content["interface"])