Total Complexity | 98 |
Total Lines | 573 |
Duplicated Lines | 8.9 % |
Coverage | 67.65% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """Main module of kytos/telemetry Network Application. |
||
2 | |||
3 | Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits |
||
4 | |||
5 | """ |
||
6 | |||
7 | 1 | import asyncio |
|
8 | 1 | import pathlib |
|
9 | 1 | from datetime import datetime |
|
10 | |||
11 | 1 | import napps.kytos.telemetry_int.kytos_api_helper as api |
|
12 | 1 | from napps.kytos.telemetry_int import settings, utils |
|
13 | 1 | from tenacity import RetryError |
|
14 | |||
15 | 1 | from kytos.core import KytosEvent, KytosNApp, log, rest |
|
16 | 1 | from kytos.core.common import EntityStatus |
|
17 | 1 | from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec |
|
18 | 1 | from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400 |
|
19 | |||
20 | 1 | from .exceptions import ( |
|
21 | EVCError, |
||
22 | EVCHasINT, |
||
23 | EVCHasNoINT, |
||
24 | EVCNotFound, |
||
25 | FlowsNotFound, |
||
26 | ProxyPortError, |
||
27 | ProxyPortNotFound, |
||
28 | ProxyPortSameSourceIntraEVC, |
||
29 | ProxyPortShared, |
||
30 | ProxyPortStatusNotUP, |
||
31 | UnrecoverableError, |
||
32 | ) |
||
33 | 1 | from .managers.int import INTManager |
|
34 | |||
35 | |||
36 | 1 | class Main(KytosNApp): |
|
37 | """Main class of kytos/telemetry NApp. |
||
38 | |||
39 | This class is the entry point for this NApp. |
||
40 | """ |
||
41 | |||
42 | 1 | spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml") |
|
43 | |||
def setup(self):
    """Initialise the NApp state in place of a regular ``__init__``.

    The controller calls this hook automatically when the application is
    loaded, so any one-off preparation routine belongs here.
    """
    controller = self.controller
    # Manager instance the rest of this NApp reaches through self.int_manager.
    self.int_manager = INTManager(controller)
    self._ofpt_error_lock = asyncio.Lock()
|
55 | |||
def execute(self):
    """Hook that runs once the setup method has finished.

    Nothing is done here. To have this method run in loop mode instead,
    add a line like the following example to the setup method:

        self.execute_as_loop(30)  # 30-second interval.
    """
||
64 | |||
def shutdown(self):
    """Hook that runs when the NApp is unloaded.

    Any cleanup procedure should be placed here; currently there is none.
    """
||
70 | |||
@rest("v1/evc/enable", methods=["POST"])
async def enable_telemetry(self, request: Request) -> JSONResponse:
    """REST to enable INT flows on EVCs.

    If the list of evc_ids is empty, INT gets enabled on every EVC for
    which utils.has_int_enabled is false.

    Returns:
        The list of handled EVC ids with status 201, or an empty list with
        the response's default status when no non-INT EVC is left.

    Raises:
        HTTPException: 400 when the payload lacks "evc_ids" or "force" is
            not a bool; 404 on EVCNotFound, FlowsNotFound or
            ProxyPortNotFound; 409 on EVCHasINT, ProxyPortStatusNotUP,
            ProxyPortSameSourceIntraEVC or ProxyPortShared; 500 on
            UnrecoverableError; 503 on RetryError.
    """
    await avalidate_openapi_request(self.spec, request)

    try:
        content = await aget_json_or_400(request)
        evc_ids = content["evc_ids"]
        force = content.get("force", False)
        if not isinstance(force, bool):
            raise TypeError(f"'force' wrong type: {type(force)} expected bool")
    except (TypeError, KeyError):
        raise HTTPException(400, detail=f"Invalid payload: {content}")

    try:
        # A single requested id is fetched on its own; any other amount
        # (including none) loads all EVCs and filters them below.
        evcs = (
            await api.get_evcs()
            if len(evc_ids) != 1
            else await api.get_evc(evc_ids[0])
        )
    except RetryError as exc:
        exc_error = str(exc.last_attempt.exception())
        log.error(exc_error)
        raise HTTPException(503, detail=exc_error)
    except UnrecoverableError as exc:
        # Same mapping the other handlers of this class apply to
        # api.get_evcs(), so the failure is logged and reported as a 500.
        exc_error = str(exc)
        log.error(exc_error)
        raise HTTPException(500, detail=exc_error)

    if evc_ids:
        # Unknown ids map to {} so the manager can report them.
        evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
    else:
        evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
        if not evcs:
            # There's no non-INT EVCs to get enabled.
            return JSONResponse(list(evcs.keys()))

    try:
        # First, it tries to get and remove the existing INT flows like mef_eline
        stored_flows = await api.get_stored_flows(
            [
                utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
                for evc_id in evcs
            ]
        )
        await self.int_manager._remove_int_flows_by_cookies(stored_flows)
        await self.int_manager.enable_int(evcs, force)
    except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
        raise HTTPException(404, detail=str(exc))
    except (
        EVCHasINT,
        ProxyPortStatusNotUP,
        ProxyPortSameSourceIntraEVC,
        ProxyPortShared,
    ) as exc:
        raise HTTPException(409, detail=str(exc))
    except RetryError as exc:
        exc_error = str(exc.last_attempt.exception())
        log.error(exc_error)
        raise HTTPException(503, detail=exc_error)
    except UnrecoverableError as exc:
        exc_error = str(exc)
        log.error(exc_error)
        raise HTTPException(500, detail=exc_error)

    return JSONResponse(list(evcs.keys()), status_code=201)
|
136 | |||
@rest("v1/evc/disable", methods=["POST"])
async def disable_telemetry(self, request: Request) -> JSONResponse:
    """REST to disable/remove INT flows for an EVC_ID

    If the list of evc_ids is empty, INT gets disabled on every EVC for
    which utils.has_int_enabled is true.

    Returns:
        The list of handled EVC ids (empty when there is no INT EVC left
        to disable).

    Raises:
        HTTPException: 400 when the payload lacks "evc_ids" or "force" is
            not a bool; 404 on EVCNotFound; 409 on EVCHasNoINT; 500 on
            UnrecoverableError; 503 on RetryError.
    """
    await avalidate_openapi_request(self.spec, request)

    try:
        content = await aget_json_or_400(request)
        evc_ids = content["evc_ids"]
        force = content.get("force", False)
        if not isinstance(force, bool):
            raise TypeError(f"'force' wrong type: {type(force)} expected bool")
    except (TypeError, KeyError):
        raise HTTPException(400, detail=f"Invalid payload: {content}")

    try:
        # A single requested id is fetched on its own; any other amount
        # (including none) loads all EVCs and filters them below.
        evcs = (
            await api.get_evcs()
            if len(evc_ids) != 1
            else await api.get_evc(evc_ids[0])
        )
    except RetryError as exc:
        exc_error = str(exc.last_attempt.exception())
        log.error(exc_error)
        raise HTTPException(503, detail=exc_error)
    except UnrecoverableError as exc:
        # Same mapping the other handlers of this class apply to
        # api.get_evcs(), so the failure is logged and reported as a 500.
        exc_error = str(exc)
        log.error(exc_error)
        raise HTTPException(500, detail=exc_error)

    if evc_ids:
        # Unknown ids map to {} so the manager can report them.
        evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
    else:
        evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
        if not evcs:
            # There's no INT EVCs to get disabled.
            return JSONResponse(list(evcs.keys()))

    try:
        await self.int_manager.disable_int(evcs, force)
    except EVCNotFound as exc:
        raise HTTPException(404, detail=str(exc))
    except EVCHasNoINT as exc:
        raise HTTPException(409, detail=str(exc))
    except RetryError as exc:
        exc_error = str(exc.last_attempt.exception())
        log.error(exc_error)
        raise HTTPException(503, detail=exc_error)
    except UnrecoverableError as exc:
        exc_error = str(exc)
        log.error(exc_error)
        raise HTTPException(500, detail=exc_error)

    return JSONResponse(list(evcs.keys()))
|
189 | |||
@rest("v1/evc")
async def get_evcs(self, _request: Request) -> JSONResponse:
    """REST endpoint returning the EVCs that have INT enabled.

    A RetryError is answered with 503 and an UnrecoverableError with 500;
    in both cases the error text is logged and used as the detail.
    """
    query = {"metadata.telemetry.enabled": "true"}
    try:
        int_evcs = await api.get_evcs(**query)
    except RetryError as exc:
        reason = str(exc.last_attempt.exception())
        log.error(reason)
        raise HTTPException(503, detail=reason)
    except UnrecoverableError as exc:
        reason = str(exc)
        log.error(reason)
        raise HTTPException(500, detail=reason)
    return JSONResponse(int_evcs)
||
204 | |||
@rest("v1/evc/redeploy", methods=["PATCH"])
async def redeploy_telemetry(self, request: Request) -> JSONResponse:
    """REST to redeploy INT on EVCs.

    If the list of evc_ids is empty, INT gets redeployed on every EVC for
    which utils.has_int_enabled is true.

    Returns:
        The list of handled EVC ids with status 201.

    Raises:
        HTTPException: 400 when the payload lacks "evc_ids"; 404 when there
            is no INT EVC to redeploy or on EVCNotFound, FlowsNotFound or
            ProxyPortNotFound; 409 on EVCHasNoINT,
            ProxyPortSameSourceIntraEVC or ProxyPortShared; 500 on
            UnrecoverableError; 503 on RetryError.
    """
    await avalidate_openapi_request(self.spec, request)

    try:
        content = await aget_json_or_400(request)
        evc_ids = content["evc_ids"]
    except (TypeError, KeyError):
        raise HTTPException(400, detail=f"Invalid payload: {content}")

    try:
        # A single requested id is fetched on its own; any other amount
        # (including none) loads all EVCs and filters them below.
        evcs = (
            await api.get_evcs()
            if len(evc_ids) != 1
            else await api.get_evc(evc_ids[0])
        )
    except RetryError as exc:
        exc_error = str(exc.last_attempt.exception())
        log.error(exc_error)
        raise HTTPException(503, detail=exc_error)
    except UnrecoverableError as exc:
        # Same mapping the other handlers of this class apply to
        # api.get_evcs(), so the failure is logged and reported as a 500.
        exc_error = str(exc)
        log.error(exc_error)
        raise HTTPException(500, detail=exc_error)

    if evc_ids:
        # Unknown ids map to {} so the manager can report them.
        evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
    else:
        evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
        if not evcs:
            raise HTTPException(404, detail="There aren't INT EVCs to redeploy")

    try:
        await self.int_manager.redeploy_int(evcs)
    except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
        raise HTTPException(404, detail=str(exc))
    except (EVCHasNoINT, ProxyPortSameSourceIntraEVC, ProxyPortShared) as exc:
        raise HTTPException(409, detail=str(exc))
    except RetryError as exc:
        exc_error = str(exc.last_attempt.exception())
        log.error(exc_error)
        raise HTTPException(503, detail=exc_error)
    except UnrecoverableError as exc:
        exc_error = str(exc)
        log.error(exc_error)
        raise HTTPException(500, detail=exc_error)

    return JSONResponse(list(evcs.keys()), status_code=201)
|
253 | |||
@rest("v1/evc/compare")
async def evc_compare(self, _request: Request) -> JSONResponse:
    """Report the INT EVCs whose state looks inconsistent.

    The stored INT flows, the stored mef_eline flows and the EVCs are
    fetched concurrently and handed to the INT manager's evc_compare. Use
    this endpoint to confirm that the telemetry metadata is still coherent
    and that the minimum expected number of flows is present. Each returned
    entry carries the EVC id, its name and the compare reason. If an
    inconsistent EVC shows up, analyze the situation and then decide
    whether to force enable or disable INT.
    """
    low_bits = 0xFFFFFFFFFFFFFF
    try:
        # Each range spans every cookie sharing the given prefix byte.
        int_start = settings.INT_COOKIE_PREFIX << 56
        int_range = (int_start, settings.INT_COOKIE_PREFIX << 56 | low_bits)
        mef_start = settings.MEF_COOKIE_PREFIX << 56
        mef_range = (mef_start, settings.MEF_COOKIE_PREFIX << 56 | low_bits)
        int_flows, mef_flows, evcs = await asyncio.gather(
            api.get_stored_flows([int_range]),
            api.get_stored_flows([mef_range]),
            api.get_evcs(),
        )
    except RetryError as exc:
        reason = str(exc.last_attempt.exception())
        log.error(reason)
        raise HTTPException(503, detail=reason)
    except UnrecoverableError as exc:
        reason = str(exc)
        log.error(reason)
        raise HTTPException(500, detail=reason)

    compared = self.int_manager.evc_compare(int_flows, mef_flows, evcs)
    response = []
    for evc_id, compare_reason in compared.items():
        response.append(
            {
                "id": evc_id,
                "name": evcs[evc_id]["name"],
                "compare_reason": compare_reason,
            }
        )
    return JSONResponse(response)
|
299 | |||
@rest("v1/uni/{interface_id}/proxy_port", methods=["DELETE"])
async def delete_proxy_port_metadata(self, request: Request) -> JSONResponse:
    """Remove the proxy port metadata of the interface given in the path.

    An interface that exists but has no "proxy_port" metadata is answered
    as successful right away. A proxy port still referenced by EVCs is
    answered with 409 unless the "force" query parameter is "true".
    """
    intf_id = request.path_params["interface_id"]
    intf = self.controller.get_interface_by_id(intf_id)
    if intf and "proxy_port" not in intf.metadata:
        return JSONResponse("Operation successful")

    force = request.query_params.get("force", "false").lower() == "true"
    try:
        proxy_port = self.int_manager.get_proxy_port_or_raise(intf_id, "no_evc_id")
        if proxy_port.evc_ids and not force:
            conflict = {
                "status_code": 409,
                "code": 409,
                "description": (
                    f"{proxy_port} is in use on {len(proxy_port.evc_ids)} EVCs"
                ),
                "evc_ids": sorted(proxy_port.evc_ids),
            }
            return JSONResponse(conflict, status_code=409)
    except ProxyPortError as exc:
        raise HTTPException(404, detail=exc.message)

    try:
        await api.delete_proxy_port_metadata(intf_id)
    except ValueError as exc:
        raise HTTPException(404, detail=str(exc))
    except UnrecoverableError as exc:
        raise HTTPException(500, detail=str(exc))
    return JSONResponse("Operation successful")
||
333 | |||
@rest("v1/uni/{interface_id}/proxy_port/{port_number:int}", methods=["POST"])
async def add_proxy_port_metadata(self, request: Request) -> JSONResponse:
    """Set the proxy port metadata of the interface given in the path.

    An unknown interface is answered with 404. If the interface already
    has this port number as its "proxy_port", the call succeeds without
    further work. A proxy port whose status isn't UP is rejected with 409
    unless the "force" query parameter is "true".
    """
    path_params = request.path_params
    intf_id = path_params["interface_id"]
    port_no = path_params["port_number"]

    intf = self.controller.get_interface_by_id(intf_id)
    if not intf:
        raise HTTPException(404, detail=f"Interface id {intf_id} not found")
    if "proxy_port" in intf.metadata:
        if intf.metadata["proxy_port"] == port_no:
            return JSONResponse("Operation successful")

    force = request.query_params.get("force", "false").lower() == "true"
    try:
        proxy_port = self.int_manager.get_proxy_port_or_raise(
            intf_id, "no_evc_id", port_no
        )
        if proxy_port.status != EntityStatus.UP and not force:
            raise HTTPException(409, detail=f"{proxy_port} status isn't UP")
        self.int_manager._validate_new_dedicated_proxy_port(intf, port_no)
    except ProxyPortShared as exc:
        raise HTTPException(409, detail=exc.message)
    except ProxyPortError as exc:
        raise HTTPException(404, detail=exc.message)

    try:
        await api.add_proxy_port_metadata(intf_id, port_no)
    except ValueError as exc:
        raise HTTPException(404, detail=str(exc))
    except UnrecoverableError as exc:
        raise HTTPException(500, detail=str(exc))
    return JSONResponse("Operation successful")
||
363 | |||
@rest("v1/uni/proxy_port")
async def list_uni_proxy_ports(self, _request: Request) -> JSONResponse:
    """Return every interface that has "proxy_port" metadata.

    Each entry pairs the UNI (id, status, status_reason) with its proxy
    port (port_number, status, status_reason). The proxy port is reported
    as "DOWN" with the error message as reason when it can't be resolved.
    """
    entries = []
    # Copies are iterated so the loops work on snapshots of both mappings.
    for switch in self.controller.switches.copy().values():
        for uni in switch.interfaces.copy().values():
            if "proxy_port" not in uni.metadata:
                continue
            uni_info = {
                "id": uni.id,
                "status": uni.status.value,
                "status_reason": sorted(uni.status_reason),
            }
            proxy_port_info = {
                "port_number": uni.metadata["proxy_port"],
                "status": "DOWN",
                "status_reason": [],
            }
            try:
                proxy_port = self.int_manager.get_proxy_port_or_raise(
                    uni.id, "no_evc_id"
                )
                proxy_port_info["status"] = proxy_port.status.value
            except ProxyPortError as exc:
                proxy_port_info["status_reason"] = [exc.message]
            entries.append({"uni": uni_info, "proxy_port": proxy_port_info})
    return JSONResponse(entries)
|
392 | |||
@alisten_to("kytos/mef_eline.evcs_loaded")
async def on_mef_eline_evcs_loaded(self, event: KytosEvent) -> None:
    """Pass the content of kytos/mef_eline.evcs_loaded to the INT manager."""
    loaded = event.content
    self.int_manager.load_uni_src_proxy_ports(loaded)
|
397 | |||
@alisten_to("kytos/of_multi_table.enable_table")
async def on_table_enabled(self, event):
    """React to of_multi_table.enable_table.

    The "telemetry_int" entry of the event content is merged into the flow
    builder's table group and announced through a
    kytos/telemetry_int.enable_table event. Nothing happens when the entry
    is missing or empty, or when it names a group that isn't allowed.
    """
    requested = event.content.get("telemetry_int", {})
    if not requested:
        return

    for name in requested:
        if name in settings.TABLE_GROUP_ALLOWED:
            continue
        log.error(
            f'The table group "{name}" is not allowed for '
            f"telemetry_int. Allowed table groups are "
            f"{settings.TABLE_GROUP_ALLOWED}"
        )
        return

    self.int_manager.flow_builder.table_group.update(requested)
    event_out = KytosEvent(
        name="kytos/telemetry_int.enable_table",
        content={"group_table": self.int_manager.flow_builder.table_group},
    )
    await self.controller.buffers.app.aput(event_out)
|
416 | |||
@alisten_to("kytos/mef_eline.deleted")
async def on_evc_deleted(self, event: KytosEvent) -> None:
    """Force-disable INT for a deleted EVC that had telemetry enabled."""
    evc = event.content
    if "metadata" not in evc:
        return
    metadata = evc["metadata"]
    if "telemetry" not in metadata:
        return
    if not metadata["telemetry"]["enabled"]:
        return

    evc_id = evc["id"]
    log.info(f"Handling mef_eline.deleted on EVC id: {evc_id}")
    await self.int_manager.disable_int({evc_id: evc}, force=True)
|
429 | |||
@alisten_to("kytos/mef_eline.deployed")
async def on_evc_deployed(self, event: KytosEvent) -> None:
    """Handle a deployed EVC.

    INT is redeployed when the EVC metadata has telemetry enabled, and
    force-enabled when the metadata carries "telemetry_request" but no
    "telemetry" entry. An EVCError is logged instead of being propagated.
    """
    evc = event.content
    evc_id = evc["id"]
    evcs = {evc_id: evc}
    if "metadata" not in evc:
        return
    metadata = evc["metadata"]

    try:
        if "telemetry" in metadata:
            if metadata["telemetry"]["enabled"]:
                log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
                await self.int_manager.redeploy_int(evcs)
        elif "telemetry_request" in metadata:
            log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
            await self.int_manager.enable_int(evcs, force=True)
    except EVCError as exc:
        log.error(
            f"Failed when handling mef_eline.deployed: {exc}. Analyze the error "
            f"and you'll need to enable or redeploy EVC {evc_id} later"
        )
||
456 | |||
@alisten_to("kytos/mef_eline.undeployed")
async def on_evc_undeployed(self, event: KytosEvent) -> None:
    """On EVC undeployed.

    When the EVC is not enabled and its metadata has telemetry enabled,
    the INT manager's remove_int_flows is called (forced) together with
    telemetry metadata whose status is "DOWN" and reason is "undeployed".
    """
    # Imported here so the handler brings its own dependency into scope.
    from datetime import timezone

    content = event.content
    if (
        not content["enabled"]
        and "metadata" in content
        and "telemetry" in content["metadata"]
        and content["metadata"]["telemetry"]["enabled"]
    ):
        # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
        # "now" renders to the very same string with this format.
        updated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
        metadata = {
            "telemetry": {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["undeployed"],
                "status_updated_at": updated_at,
            }
        }
        evc_id = content["id"]
        evcs = {evc_id: content}
        log.info(f"Handling mef_eline.undeployed on EVC id: {evc_id}")
        await self.int_manager.remove_int_flows(evcs, metadata, force=True)
|
481 | |||
@alisten_to("kytos/mef_eline.(redeployed_link_down|redeployed_link_up)")
async def on_evc_redeployed_link(self, event: KytosEvent) -> None:
    """Redeploy INT after redeployed_link_down or redeployed_link_up.

    Only enabled EVCs whose metadata has telemetry enabled are handled.
    An EVCError is logged instead of being propagated.
    """
    evc = event.content
    if not evc["enabled"]:
        return
    if "metadata" not in evc:
        return
    metadata = evc["metadata"]
    if "telemetry" not in metadata or not metadata["telemetry"]["enabled"]:
        return

    evc_id = evc["id"]
    log.info(f"Handling {event.name}, EVC id: {evc_id}")
    try:
        await self.int_manager.redeploy_int({evc_id: evc})
    except EVCError as exc:
        log.error(
            f"Failed to redeploy: {exc}. "
            f"Analyze the error and you'll need to redeploy EVC {evc_id} later"
        )
||
502 | |||
@alisten_to("kytos/mef_eline.error_redeploy_link_down")
async def on_evc_error_redeployed_link_down(self, event: KytosEvent) -> None:
    """On EVC error_redeploy_link_down, this is supposed to happen when
    a path isn't found when mef_eline handles a link down.

    When the EVC is enabled and its metadata has telemetry enabled, the
    INT manager's remove_int_flows is called (forced) together with
    telemetry metadata whose status is "DOWN" and reason is
    "redeployed_link_down_no_path".
    """
    # Imported here so the handler brings its own dependency into scope.
    from datetime import timezone

    content = event.content
    if (
        content["enabled"]
        and "metadata" in content
        and "telemetry" in content["metadata"]
        and content["metadata"]["telemetry"]["enabled"]
    ):
        # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
        # "now" renders to the very same string with this format.
        updated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
        metadata = {
            "telemetry": {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["redeployed_link_down_no_path"],
                "status_updated_at": updated_at,
            }
        }
        evc_id = content["id"]
        evcs = {evc_id: content}
        log.info(
            f"Handling mef_eline.redeployed_link_down_no_path on EVC id: {evc_id}"
        )
        await self.int_manager.remove_int_flows(evcs, metadata, force=True)
|
530 | |||
@alisten_to("kytos/mef_eline.failover_link_down")
async def on_failover_link_down(self, event: KytosEvent):
    """Forward kytos/mef_eline.failover_link_down to the INT manager."""
    payload = event.content
    await self.int_manager.handle_failover_flows(
        payload, event_name="failover_link_down"
    )
||
537 | |||
@alisten_to("kytos/mef_eline.failover_old_path")
async def on_failover_old_path(self, event: KytosEvent):
    """Forward kytos/mef_eline.failover_old_path to the INT manager."""
    payload = event.content
    await self.int_manager.handle_failover_flows(
        payload, event_name="failover_old_path"
    )
||
544 | |||
@alisten_to("kytos/mef_eline.failover_deployed")
async def on_failover_deployed(self, event: KytosEvent):
    """Forward kytos/mef_eline.failover_deployed to the INT manager."""
    payload = event.content
    await self.int_manager.handle_failover_flows(
        payload, event_name="failover_deployed"
    )
||
551 | |||
@alisten_to("kytos/topology.link_down")
async def on_link_down(self, event):
    """Pass the link of a topology.link_down event to the INT manager."""
    link = event.content["link"]
    await self.int_manager.handle_pp_link_down(link)
|
556 | |||
@alisten_to("kytos/topology.link_up")
async def on_link_up(self, event):
    """Pass the link of a topology.link_up event to the INT manager."""
    link = event.content["link"]
    await self.int_manager.handle_pp_link_up(link)
|
561 | |||
562 | 1 | @alisten_to("kytos/mef_eline.uni_active_updated") |
|
563 | 1 | async def on_uni_active_updated(self, event: KytosEvent) -> None: |
|
564 | """On mef_eline UNI active updated.""" |
||
565 | 1 | content = event.content |
|
566 | 1 | if ( |
|
567 | "metadata" in content |
||
568 | and "telemetry" in content["metadata"] |
||
569 | and content["metadata"]["telemetry"]["enabled"] |
||
570 | ): |
||
571 | 1 | evc_id, active = content["id"], content["active"] |
|
572 | 1 | log.info( |
|
573 | f"Handling mef_eline.uni_active_updated active {active} " |
||
645 |