Code

< 40 %
40-60 %
> 60 %
1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
7 1
import asyncio
8 1
import copy
9 1
import pathlib
10 1
from datetime import datetime
11
12 1
import napps.kytos.telemetry_int.kytos_api_helper as api
13 1
from napps.kytos.telemetry_int import settings, utils
14 1
from tenacity import RetryError
15
16 1
from kytos.core import KytosEvent, KytosNApp, log, rest
17 1
from kytos.core.common import EntityStatus
18 1
from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec
19 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400
20
21 1
from .exceptions import (
22
    EVCError,
23
    EVCHasINT,
24
    EVCHasNoINT,
25
    EVCNotFound,
26
    FlowsNotFound,
27
    ProxyPortError,
28
    ProxyPortNotFound,
29
    ProxyPortSameSourceIntraEVC,
30
    ProxyPortShared,
31
    ProxyPortStatusNotUP,
32
    UnrecoverableError,
33
)
34 1
from .managers.int import INTManager
35
36
37 1
class Main(KytosNApp):
38
    """Main class of kytos/telemetry NApp.
39
40
    This class is the entry point for this NApp.
41
    """
42
43 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44
45 1
    def setup(self):
46
        """Replace the '__init__' method for the KytosNApp subclass.
47
48
        The setup method is automatically called by the controller when your
49
        application is loaded.
50
51
        So, if you have any setup routine, insert it here.
52
        """
53
54 1
        self.int_manager = INTManager(self.controller)
55 1
        self._ofpt_error_lock = asyncio.Lock()
56
57 1
    def execute(self):
58
        """Run after the setup method execution.
59
60
        You can also use this method in loop mode if you add to the above setup
61
        method a line like the following example:
62
63
            self.execute_as_loop(30)  # 30-second interval.
64
        """
65
66 1
    def shutdown(self):
67
        """Run when your NApp is unloaded.
68
69
        If you have some cleanup procedure, insert it here.
70
        """
71
72 1
    @rest("v1/evc/enable", methods=["POST"])
73 1
    async def enable_telemetry(self, request: Request) -> JSONResponse:
74
        """REST to enable INT flows on EVCs.
75
76
        If a list of evc_ids is empty, it'll enable on non-INT EVCs.
77
        """
78 1
        await avalidate_openapi_request(self.spec, request)
79
80 1
        try:
81 1
            content = await aget_json_or_400(request)
82 1
            evc_ids = content["evc_ids"]
83 1
            force = content.get("force", False)
84 1
            if not isinstance(force, bool):
85
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
86
        except (TypeError, KeyError):
87
            raise HTTPException(400, detail=f"Invalid payload: {content}")
88
89 1
        try:
90 1
            evcs = (
91
                await api.get_evcs()
92
                if len(evc_ids) != 1
93
                else await api.get_evc(evc_ids[0])
94
            )
95
        except RetryError as exc:
96
            exc_error = str(exc.last_attempt.exception())
97
            log.error(exc_error)
98
            raise HTTPException(503, detail=exc_error)
99
100 1
        if evc_ids:
101 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
102
        else:
103
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
104
            if not evcs:
105
                # There's no non-INT EVCs to get enabled.
106
                return JSONResponse(list(evcs.keys()))
107
108 1
        try:
109
            # First, it tries to get and remove the existing INT flows like mef_eline
110 1
            stored_flows = await api.get_stored_flows(
111
                [
112
                    utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
113
                    for evc_id in evcs
114
                ]
115
            )
116 1
            await self.int_manager._remove_int_flows_by_cookies(stored_flows)
117 1
            await self.int_manager.enable_int(evcs, force)
118
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
119
            raise HTTPException(404, detail=str(exc))
120
        except (
121
            EVCHasINT,
122
            ProxyPortStatusNotUP,
123
            ProxyPortSameSourceIntraEVC,
124
            ProxyPortShared,
125
        ) as exc:
126
            raise HTTPException(409, detail=str(exc))
127
        except RetryError as exc:
128
            exc_error = str(exc.last_attempt.exception())
129
            log.error(exc_error)
130
            raise HTTPException(503, detail=exc_error)
131
        except UnrecoverableError as exc:
132
            exc_error = str(exc)
133
            log.error(exc_error)
134
            raise HTTPException(500, detail=exc_error)
135
136 1
        return JSONResponse(list(evcs.keys()), status_code=201)
137
138 1
    @rest("v1/evc/disable", methods=["POST"])
139 1
    async def disable_telemetry(self, request: Request) -> JSONResponse:
140
        """REST to disable/remove INT flows for an EVC_ID
141
142
        If a list of evc_ids is empty, it'll disable on all INT EVCs.
143
        """
144 1
        await avalidate_openapi_request(self.spec, request)
145
146 1
        try:
147 1
            content = await aget_json_or_400(request)
148 1
            evc_ids = content["evc_ids"]
149 1
            force = content.get("force", False)
150 1
            if not isinstance(force, bool):
151
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
152
        except (TypeError, KeyError):
153
            raise HTTPException(400, detail=f"Invalid payload: {content}")
154
155 1
        try:
156 1
            evcs = (
157
                await api.get_evcs()
158
                if len(evc_ids) != 1
159
                else await api.get_evc(evc_ids[0])
160
            )
161
        except RetryError as exc:
162
            exc_error = str(exc.last_attempt.exception())
163
            log.error(exc_error)
164
            raise HTTPException(503, detail=exc_error)
165
166 1
        if evc_ids:
167 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
168
        else:
169
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
170
            if not evcs:
171
                # There's no INT EVCs to get disabled.
172
                return JSONResponse(list(evcs.keys()))
173
174 1
        try:
175 1
            await self.int_manager.disable_int(evcs, force)
176
        except EVCNotFound as exc:
177
            raise HTTPException(404, detail=str(exc))
178
        except EVCHasNoINT as exc:
179
            raise HTTPException(409, detail=str(exc))
180
        except RetryError as exc:
181
            exc_error = str(exc.last_attempt.exception())
182
            log.error(exc_error)
183
            raise HTTPException(503, detail=exc_error)
184
        except UnrecoverableError as exc:
185
            exc_error = str(exc)
186
            log.error(exc_error)
187
            raise HTTPException(500, detail=exc_error)
188
189 1
        return JSONResponse(list(evcs.keys()))
190
191 1
    @rest("v1/evc")
192 1
    async def get_evcs(self, _request: Request) -> JSONResponse:
193
        """REST to return the list of EVCs with INT enabled"""
194 1
        try:
195 1
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
196 1
            return JSONResponse(evcs)
197
        except RetryError as exc:
198
            exc_error = str(exc.last_attempt.exception())
199
            log.error(exc_error)
200
            raise HTTPException(503, detail=exc_error)
201
        except UnrecoverableError as exc:
202
            exc_error = str(exc)
203
            log.error(exc_error)
204
            raise HTTPException(500, detail=exc_error)
205
206 1
    @rest("v1/evc/redeploy", methods=["PATCH"])
207 1
    async def redeploy_telemetry(self, request: Request) -> JSONResponse:
208
        """REST to redeploy INT on EVCs.
209
210
        If a list of evc_ids is empty, it'll redeploy on all INT EVCs.
211
        """
212 1
        await avalidate_openapi_request(self.spec, request)
213
214 1
        try:
215 1
            content = await aget_json_or_400(request)
216 1
            evc_ids = content["evc_ids"]
217
        except (TypeError, KeyError):
218
            raise HTTPException(400, detail=f"Invalid payload: {content}")
219
220 1
        try:
221 1
            evcs = (
222
                await api.get_evcs()
223
                if len(evc_ids) != 1
224
                else await api.get_evc(evc_ids[0])
225
            )
226
        except RetryError as exc:
227
            exc_error = str(exc.last_attempt.exception())
228
            log.error(exc_error)
229
            raise HTTPException(503, detail=exc_error)
230
231 1
        if evc_ids:
232 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
233
        else:
234
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
235
            if not evcs:
236
                raise HTTPException(404, detail="There aren't INT EVCs to redeploy")
237
238 1
        try:
239 1
            await self.int_manager.redeploy_int(evcs)
240 1
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
241
            raise HTTPException(404, detail=str(exc))
242 1
        except (EVCHasNoINT, ProxyPortSameSourceIntraEVC, ProxyPortShared) as exc:
243 1
            raise HTTPException(409, detail=str(exc))
244
        except RetryError as exc:
245
            exc_error = str(exc.last_attempt.exception())
246
            log.error(exc_error)
247
            raise HTTPException(503, detail=exc_error)
248
        except UnrecoverableError as exc:
249
            exc_error = str(exc)
250
            log.error(exc_error)
251
            raise HTTPException(500, detail=exc_error)
252
253 1
        return JSONResponse(list(evcs.keys()), status_code=201)
254
255 1
    @rest("v1/evc/compare")
256 1
    async def evc_compare(self, _request: Request) -> JSONResponse:
257
        """List and compare which INT EVCs have flows installed comparing with
258
        mef_eline flows and telemetry metadata. You should use this endpoint
259
        to confirm if both the telemetry metadata is still coherent and also
260
        the minimum expected number of flows. A list of EVCs will get returned
261
        with the inconsistent INT EVCs. If you encounter any inconsistent
262
        EVC you need to analyze the situation and then decide if you'd
263
        like to force enable or disable INT.
264
        """
265
266 1
        try:
267 1
            int_flows, mef_flows, evcs = await asyncio.gather(
268
                api.get_stored_flows(
269
                    [
270
                        (
271
                            settings.INT_COOKIE_PREFIX << 56,
272
                            settings.INT_COOKIE_PREFIX << 56 | 0xFFFFFFFFFFFFFF,
273
                        ),
274
                    ]
275
                ),
276
                api.get_stored_flows(
277
                    [
278
                        (
279
                            settings.MEF_COOKIE_PREFIX << 56,
280
                            settings.MEF_COOKIE_PREFIX << 56 | 0xFFFFFFFFFFFFFF,
281
                        ),
282
                    ]
283
                ),
284
                api.get_evcs(),
285
            )
286
        except RetryError as exc:
287
            exc_error = str(exc.last_attempt.exception())
288
            log.error(exc_error)
289
            raise HTTPException(503, detail=exc_error)
290
        except UnrecoverableError as exc:
291
            exc_error = str(exc)
292
            log.error(exc_error)
293
            raise HTTPException(500, detail=exc_error)
294
295 1
        response = [
296
            {"id": k, "name": evcs[k]["name"], "compare_reason": v}
297
            for k, v in self.int_manager.evc_compare(int_flows, mef_flows, evcs).items()
298
        ]
299 1
        return JSONResponse(response)
300
301 1
    @rest("v1/uni/{interface_id}/proxy_port", methods=["DELETE"])
302 1
    async def delete_proxy_port_metadata(self, request: Request) -> JSONResponse:
303
        """Delete proxy port metadata."""
304 1
        intf_id = request.path_params["interface_id"]
305 1
        intf = self.controller.get_interface_by_id(intf_id)
306 1
        if not intf:
307
            raise HTTPException(404, detail=f"Interface id {intf_id} not found")
308 1
        if "proxy_port" not in intf.metadata:
309 1
            return JSONResponse("Operation successful")
310
311 1
        qparams = request.query_params
312 1
        force = qparams.get("force", "false").lower() == "true"
313
314 1
        try:
315 1
            pp = self.int_manager.srcs_pp[self.int_manager.unis_src[intf_id]]
316 1
            if pp.evc_ids and not force:
317 1
                return JSONResponse(
318
                    {
319
                        "status_code": 409,
320
                        "code": 409,
321
                        "description": f"{pp} is in use on {len(pp.evc_ids)} EVCs",
322
                        "evc_ids": sorted(pp.evc_ids),
323
                    },
324
                    status_code=409,
325
                )
326 1
        except KeyError:
327 1
            pass
328
329 1
        try:
330 1
            await api.delete_proxy_port_metadata(intf_id)
331 1
            return JSONResponse("Operation successful")
332
        except ValueError as exc:
333
            raise HTTPException(404, detail=str(exc))
334
        except UnrecoverableError as exc:
335
            raise HTTPException(500, detail=str(exc))
336
337 1
    @rest("v1/uni/{interface_id}/proxy_port/{port_number:int}", methods=["POST"])
338 1
    async def add_proxy_port_metadata(self, request: Request) -> JSONResponse:
339
        """Add proxy port metadata."""
340 1
        intf_id = request.path_params["interface_id"]
341 1
        port_no = request.path_params["port_number"]
342 1
        qparams = request.query_params
343 1
        if not (intf := self.controller.get_interface_by_id(intf_id)):
344
            raise HTTPException(404, detail=f"Interface id {intf_id} not found")
345 1
        if "proxy_port" in intf.metadata and intf.metadata["proxy_port"] == port_no:
346 1
            return JSONResponse("Operation successful")
347
348 1
        force = qparams.get("force", "false").lower() == "true"
349 1
        try:
350 1
            pp = self.int_manager.get_proxy_port_or_raise(intf_id, "no_evc_id", port_no)
351 1
            if pp.status != EntityStatus.UP and not force:
352 1
                raise HTTPException(409, detail=f"{pp} status isn't UP")
353 1
            self.int_manager._validate_new_dedicated_proxy_port(intf, port_no)
354 1
        except ProxyPortShared as exc:
355 1
            raise HTTPException(409, detail=exc.message)
356 1
        except ProxyPortError as exc:
357
            raise HTTPException(404, detail=exc.message)
358
359 1
        try:
360 1
            await api.add_proxy_port_metadata(intf_id, port_no)
361 1
            return JSONResponse("Operation successful")
362
        except ValueError as exc:
363
            raise HTTPException(404, detail=str(exc))
364
        except UnrecoverableError as exc:
365
            raise HTTPException(500, detail=str(exc))
366
367 1
    @rest("v1/uni/proxy_port")
368 1
    async def list_uni_proxy_ports(self, _request: Request) -> JSONResponse:
369
        """List configured UNI proxy ports."""
370 1
        interfaces_proxy_ports = []
371 1
        for switch in self.controller.switches.copy().values():
372 1
            for intf in switch.interfaces.copy().values():
373 1
                if "proxy_port" in intf.metadata:
374 1
                    payload = {
375
                        "uni": {
376
                            "id": intf.id,
377
                            "status": intf.status.value,
378
                            "status_reason": sorted(intf.status_reason),
379
                        },
380
                        "proxy_port": {
381
                            "port_number": intf.metadata["proxy_port"],
382
                            "status": "DOWN",
383
                            "status_reason": [],
384
                        },
385
                    }
386 1
                    try:
387 1
                        pp = self.int_manager.get_proxy_port_or_raise(
388
                            intf.id, "no_evc_id"
389
                        )
390 1
                        payload["proxy_port"]["status"] = pp.status.value
391 1
                    except ProxyPortError as exc:
392 1
                        payload["proxy_port"]["status_reason"] = [exc.message]
393 1
                    interfaces_proxy_ports.append(payload)
394 1
        return JSONResponse(interfaces_proxy_ports)
395
396 1
    @alisten_to("kytos/mef_eline.evcs_loaded")
397 1
    async def on_mef_eline_evcs_loaded(self, event: KytosEvent) -> None:
398
        """Handle kytos/mef_eline.evcs_loaded."""
399 1
        self.int_manager.load_uni_src_proxy_ports(event.content)
400
401 1
    @alisten_to("kytos/of_multi_table.enable_table")
402 1
    async def on_table_enabled(self, event):
403
        """Handle of_multi_table.enable_table."""
404 1
        table_group = event.content.get("telemetry_int", {})
405 1
        if not table_group:
406 1
            return
407 1
        for group in table_group:
408 1
            if group not in settings.TABLE_GROUP_ALLOWED:
409 1
                log.error(
410
                    f'The table group "{group}" is not allowed for '
411
                    f"telemetry_int. Allowed table groups are "
412
                    f"{settings.TABLE_GROUP_ALLOWED}"
413
                )
414 1
                return
415 1
        self.int_manager.flow_builder.table_group.update(table_group)
416 1
        content = {"group_table": self.int_manager.flow_builder.table_group}
417 1
        event_out = KytosEvent(name="kytos/telemetry_int.enable_table", content=content)
418 1
        await self.controller.buffers.app.aput(event_out)
419
420 1
    @alisten_to("kytos/mef_eline.deleted")
421 1
    async def on_evc_deleted(self, event: KytosEvent) -> None:
422
        """On EVC deleted."""
423 1
        content = event.content
424 1
        if (
425
            "metadata" in content
426
            and "telemetry" in content["metadata"]
427
            and content["metadata"]["telemetry"]["enabled"]
428
        ):
429 1
            evc_id = content["id"]
430 1
            log.info(f"Handling mef_eline.deleted on EVC id: {evc_id}")
431 1
            await self.int_manager.disable_int({evc_id: content}, force=True)
432
433 1
    @alisten_to("kytos/mef_eline.deployed")
434 1
    async def on_evc_deployed(self, event: KytosEvent) -> None:
435
        """On EVC deployed."""
436 1
        content = event.content
437 1
        evc_id = content["id"]
438 1
        evcs = {evc_id: content}
439 1
        try:
440 1
            if (
441
                "metadata" in content
442
                and "telemetry" in content["metadata"]
443
                and content["metadata"]["telemetry"]["enabled"]
444
            ):
445 1
                log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
446 1
                await self.int_manager.redeploy_int(evcs)
447 1
            elif (
448
                "metadata" in content
449
                and "telemetry_request" in content["metadata"]
450
                and "telemetry" not in content["metadata"]
451
            ):
452 1
                log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
453 1
                await self.int_manager.enable_int(evcs, force=True)
454 1
        except EVCError as exc:
455 1
            log.error(
456
                f"Failed when handling mef_eline.deployed: {exc}. Analyze the error "
457
                f"and you'll need to enable or redeploy EVC {evc_id} later"
458
            )
459
460 1
    @alisten_to("kytos/mef_eline.undeployed")
461 1
    async def on_evc_undeployed(self, event: KytosEvent) -> None:
462
        """On EVC undeployed."""
463 1
        content = event.content
464 1
        if (
465
            not content["enabled"]
466
            and "metadata" in content
467
            and "telemetry" in content["metadata"]
468
            and content["metadata"]["telemetry"]["enabled"]
469
        ):
470 1
            metadata = {
471
                "telemetry": {
472
                    "enabled": True,
473
                    "status": "DOWN",
474
                    "status_reason": ["undeployed"],
475
                    "status_updated_at": datetime.utcnow().strftime(
476
                        "%Y-%m-%dT%H:%M:%S"
477
                    ),
478
                }
479
            }
480 1
            evc_id = content["id"]
481 1
            evcs = {evc_id: content}
482 1
            log.info(f"Handling mef_eline.undeployed on EVC id: {evc_id}")
483 1
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
484
485 1
    @alisten_to("kytos/mef_eline.(redeployed_link_down|redeployed_link_up)")
486 1
    async def on_evc_redeployed_link(self, event: KytosEvent) -> None:
487
        """On EVC redeployed_link_down|redeployed_link_up."""
488 1
        content = event.content
489 1
        if (
490
            content["enabled"]
491
            and "metadata" in content
492
            and "telemetry" in content["metadata"]
493
            and content["metadata"]["telemetry"]["enabled"]
494
        ):
495 1
            evc_id = content["id"]
496 1
            evcs = {evc_id: content}
497 1
            log.info(f"Handling {event.name}, EVC id: {evc_id}")
498 1
            try:
499 1
                await self.int_manager.redeploy_int(evcs)
500 1
            except EVCError as exc:
501 1
                log.error(
502
                    f"Failed to redeploy: {exc}. "
503
                    f"Analyze the error and you'll need to redeploy EVC {evc_id} later"
504
                )
505
506 1
    @alisten_to("kytos/mef_eline.error_redeploy_link_down")
507 1
    async def on_evc_error_redeployed_link_down(self, event: KytosEvent) -> None:
508
        """On EVC error_redeploy_link_down, this is supposed to happen when
509
        a path isn't when mef_eline handles a link down."""
510 1
        content = event.content
511 1
        if (
512
            content["enabled"]
513
            and "metadata" in content
514
            and "telemetry" in content["metadata"]
515
            and content["metadata"]["telemetry"]["enabled"]
516
        ):
517 1
            metadata = {
518
                "telemetry": {
519
                    "enabled": True,
520
                    "status": "DOWN",
521
                    "status_reason": ["redeployed_link_down_no_path"],
522
                    "status_updated_at": datetime.utcnow().strftime(
523
                        "%Y-%m-%dT%H:%M:%S"
524
                    ),
525
                }
526
            }
527 1
            evc_id = content["id"]
528 1
            evcs = {evc_id: content}
529 1
            log.info(
530
                f"Handling mef_eline.redeployed_link_down_no_path on EVC id: {evc_id}"
531
            )
532 1
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
533
534 1
    @alisten_to("kytos/mef_eline.failover_link_down")
535 1
    async def on_failover_link_down(self, event: KytosEvent):
536
        """Handle kytos/mef_eline.failover_link_down."""
537 1
        await self.int_manager.handle_failover_flows(
538
            copy.deepcopy(event.content), event_name="failover_link_down"
539
        )
540
541 1
    @alisten_to("kytos/mef_eline.failover_old_path")
542 1
    async def on_failover_old_path(self, event: KytosEvent):
543
        """Handle kytos/mef_eline.failover_old_path."""
544 1
        await self.int_manager.handle_failover_flows(
545
            copy.deepcopy(event.content), event_name="failover_old_path"
546
        )
547
548 1
    @alisten_to("kytos/mef_eline.failover_deployed")
549 1
    async def on_failover_deployed(self, event: KytosEvent):
550
        """Handle kytos/mef_eline.failover_deployed."""
551 1
        await self.int_manager.handle_failover_flows(
552
            copy.deepcopy(event.content), event_name="failover_deployed"
553
        )
554
555 1
    @alisten_to("kytos/topology.link_down")
556 1
    async def on_link_down(self, event):
557
        """Handle topology.link_down."""
558 1
        await self.int_manager.handle_pp_link_down(event.content["link"])
559
560 1
    @alisten_to("kytos/topology.link_up")
561 1
    async def on_link_up(self, event):
562
        """Handle topology.link_up."""
563 1
        await self.int_manager.handle_pp_link_up(event.content["link"])
564
565 1
    @alisten_to("kytos/mef_eline.uni_active_updated")
566 1
    async def on_uni_active_updated(self, event: KytosEvent) -> None:
567
        """On mef_eline UNI active updated."""
568 1
        content = event.content
569 1
        if (
570
            "metadata" in content
571
            and "telemetry" in content["metadata"]
572
            and content["metadata"]["telemetry"]["enabled"]
573
        ):
574 1
            evc_id, active = content["id"], content["active"]
575 1
            log.info(
576
                f"Handling mef_eline.uni_active_updated active {active} "
577
                f"on EVC id: {evc_id}"
578
            )
579
580 1
            metadata = {
581
                "telemetry": {
582
                    "enabled": True,
583
                    "status": "UP" if active else "DOWN",
584
                    "status_reason": [] if active else ["uni_down"],
585
                    "status_updated_at": datetime.utcnow().strftime(
586
                        "%Y-%m-%dT%H:%M:%S"
587
                    ),
588
                }
589
            }
590 1
            await api.add_evcs_metadata({evc_id: content}, metadata)
591
592 1
    @alisten_to("kytos/flow_manager.flow.error")
593 1
    async def on_flow_mod_error(self, event: KytosEvent):
594
        """On flow mod errors.
595
596
        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true
597
        """
598 1
        flow = event.content["flow"]
599 1
        if any(
600
            (
601
                event.content.get("error_exception"),
602
                event.content.get("error_command") != "add",
603
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
604
            )
605
        ):
606
            return
607
608 1
        async with self._ofpt_error_lock:
609 1
            evc_id = utils.get_id_from_cookie(flow.cookie)
610 1
            evc = await api.get_evc(evc_id, exclude_archived=False)
611 1
            if (
612
                not evc
613
                or "telemetry" not in evc[evc_id]["metadata"]
614
                or "enabled" not in evc[evc_id]["metadata"]["telemetry"]
615
                or not evc[evc_id]["metadata"]["telemetry"]["enabled"]
616
            ):
617
                return
618
619 1
            metadata = {
620
                "telemetry": {
621
                    "enabled": False,
622
                    "status": "DOWN",
623
                    "status_reason": ["ofpt_error"],
624
                    "status_updated_at": datetime.utcnow().strftime(
625
                        "%Y-%m-%dT%H:%M:%S"
626
                    ),
627
                }
628
            }
629 1
            log.error(
630
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
631
                f"error_type: {event.content.get('error_type')}, "
632
                f"error_code: {event.content.get('error_code')}, "
633
                f"flow: {flow.as_dict()} "
634
            )
635
636 1
            evcs = {evc_id: {evc_id: evc_id}}
637 1
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
638
639 1
    @alisten_to("kytos/topology.interfaces.metadata.removed")
640 1
    async def on_intf_metadata_removed(self, event: KytosEvent) -> None:
641
        """On interface metadata removed."""
642 1
        await self.int_manager.handle_pp_metadata_removed(event.content["interface"])
643
644 1
    @alisten_to("kytos/topology.interfaces.metadata.added")
645 1
    async def on_intf_metadata_added(self, event: KytosEvent) -> None:
646
        """On interface metadata added."""
647
        await self.int_manager.handle_pp_metadata_added(event.content["interface"])
648