Passed
Push — master ( cce0f4...72819e )
by Vinicius
05:15 queued 14s
created

build.main.Main.redeploy_telemetry()   C

Complexity

Conditions 10

Size

Total Lines 48
Code Lines 37

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 24.8371

Importance

Changes 0
Metric Value
cc 10
eloc 37
nop 2
dl 0
loc 48
ccs 16
cts 34
cp 0.4706
crap 24.8371
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like build.main.Main.redeploy_telemetry() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
7 1
import asyncio
8 1
import pathlib
9 1
from datetime import datetime
10
11 1
import napps.kytos.telemetry_int.kytos_api_helper as api
12 1
from napps.kytos.telemetry_int import settings, utils
13 1
from tenacity import RetryError
14
15 1
from kytos.core import KytosEvent, KytosNApp, log, rest
16 1
from kytos.core.common import EntityStatus
17 1
from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec
18 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400
19
20 1
from .exceptions import (
21
    EVCError,
22
    EVCHasINT,
23
    EVCHasNoINT,
24
    EVCNotFound,
25
    FlowsNotFound,
26
    ProxyPortError,
27
    ProxyPortNotFound,
28
    ProxyPortSameSourceIntraEVC,
29
    ProxyPortShared,
30
    ProxyPortStatusNotUP,
31
    UnrecoverableError,
32
)
33 1
from .managers.int import INTManager
34
35
36 1
class Main(KytosNApp):
37
    """Main class of kytos/telemetry NApp.
38
39
    This class is the entry point for this NApp.
40
    """
41
42 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
43
44 1
    def setup(self):
45
        """Replace the '__init__' method for the KytosNApp subclass.
46
47
        The setup method is automatically called by the controller when your
48
        application is loaded.
49
50
        So, if you have any setup routine, insert it here.
51
        """
52
53 1
        self.int_manager = INTManager(self.controller)
54 1
        self._ofpt_error_lock = asyncio.Lock()
55
56 1
    def execute(self):
57
        """Run after the setup method execution.
58
59
        You can also use this method in loop mode if you add to the above setup
60
        method a line like the following example:
61
62
            self.execute_as_loop(30)  # 30-second interval.
63
        """
64
65 1
    def shutdown(self):
66
        """Run when your NApp is unloaded.
67
68
        If you have some cleanup procedure, insert it here.
69
        """
70
71 1
    @rest("v1/evc/enable", methods=["POST"])
72 1
    async def enable_telemetry(self, request: Request) -> JSONResponse:
73
        """REST to enable INT flows on EVCs.
74
75
        If a list of evc_ids is empty, it'll enable on non-INT EVCs.
76
        """
77 1
        await avalidate_openapi_request(self.spec, request)
78
79 1
        try:
80 1
            content = await aget_json_or_400(request)
81 1
            evc_ids = content["evc_ids"]
82 1
            force = content.get("force", False)
83 1
            if not isinstance(force, bool):
84
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
85
        except (TypeError, KeyError):
86
            raise HTTPException(400, detail=f"Invalid payload: {content}")
87
88 1
        try:
89 1
            evcs = (
90
                await api.get_evcs()
91
                if len(evc_ids) != 1
92
                else await api.get_evc(evc_ids[0])
93
            )
94
        except RetryError as exc:
95
            exc_error = str(exc.last_attempt.exception())
96
            log.error(exc_error)
97
            raise HTTPException(503, detail=exc_error)
98
99 1
        if evc_ids:
100 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
101
        else:
102
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
103
            if not evcs:
104
                # There's no non-INT EVCs to get enabled.
105
                return JSONResponse(list(evcs.keys()))
106
107 1
        try:
108
            # First, it tries to get and remove the existing INT flows like mef_eline
109 1
            stored_flows = await api.get_stored_flows(
110
                [
111
                    utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
112
                    for evc_id in evcs
113
                ]
114
            )
115 1
            await self.int_manager._remove_int_flows_by_cookies(stored_flows)
116 1
            await self.int_manager.enable_int(evcs, force)
117
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
118
            raise HTTPException(404, detail=str(exc))
119
        except (
120
            EVCHasINT,
121
            ProxyPortStatusNotUP,
122
            ProxyPortSameSourceIntraEVC,
123
            ProxyPortShared,
124
        ) as exc:
125
            raise HTTPException(409, detail=str(exc))
126
        except RetryError as exc:
127
            exc_error = str(exc.last_attempt.exception())
128
            log.error(exc_error)
129
            raise HTTPException(503, detail=exc_error)
130
        except UnrecoverableError as exc:
131
            exc_error = str(exc)
132
            log.error(exc_error)
133
            raise HTTPException(500, detail=exc_error)
134
135 1
        return JSONResponse(list(evcs.keys()), status_code=201)
136
137 1
    @rest("v1/evc/disable", methods=["POST"])
138 1
    async def disable_telemetry(self, request: Request) -> JSONResponse:
139
        """REST to disable/remove INT flows for an EVC_ID
140
141
        If a list of evc_ids is empty, it'll disable on all INT EVCs.
142
        """
143 1
        await avalidate_openapi_request(self.spec, request)
144
145 1
        try:
146 1
            content = await aget_json_or_400(request)
147 1
            evc_ids = content["evc_ids"]
148 1
            force = content.get("force", False)
149 1
            if not isinstance(force, bool):
150
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
151
        except (TypeError, KeyError):
152
            raise HTTPException(400, detail=f"Invalid payload: {content}")
153
154 1
        try:
155 1
            evcs = (
156
                await api.get_evcs()
157
                if len(evc_ids) != 1
158
                else await api.get_evc(evc_ids[0])
159
            )
160
        except RetryError as exc:
161
            exc_error = str(exc.last_attempt.exception())
162
            log.error(exc_error)
163
            raise HTTPException(503, detail=exc_error)
164
165 1
        if evc_ids:
166 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
167
        else:
168
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
169
            if not evcs:
170
                # There's no INT EVCs to get disabled.
171
                return JSONResponse(list(evcs.keys()))
172
173 1
        try:
174 1
            await self.int_manager.disable_int(evcs, force)
175
        except EVCNotFound as exc:
176
            raise HTTPException(404, detail=str(exc))
177
        except EVCHasNoINT as exc:
178
            raise HTTPException(409, detail=str(exc))
179
        except RetryError as exc:
180
            exc_error = str(exc.last_attempt.exception())
181
            log.error(exc_error)
182
            raise HTTPException(503, detail=exc_error)
183
        except UnrecoverableError as exc:
184
            exc_error = str(exc)
185
            log.error(exc_error)
186
            raise HTTPException(500, detail=exc_error)
187
188 1
        return JSONResponse(list(evcs.keys()))
189
190 1
    @rest("v1/evc")
191 1
    async def get_evcs(self, _request: Request) -> JSONResponse:
192
        """REST to return the list of EVCs with INT enabled"""
193 1
        try:
194 1
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
195 1
            return JSONResponse(evcs)
196
        except RetryError as exc:
197
            exc_error = str(exc.last_attempt.exception())
198
            log.error(exc_error)
199
            raise HTTPException(503, detail=exc_error)
200
        except UnrecoverableError as exc:
201
            exc_error = str(exc)
202
            log.error(exc_error)
203
            raise HTTPException(500, detail=exc_error)
204
205 1
    @rest("v1/evc/redeploy", methods=["PATCH"])
206 1
    async def redeploy_telemetry(self, request: Request) -> JSONResponse:
207
        """REST to redeploy INT on EVCs.
208
209
        If a list of evc_ids is empty, it'll redeploy on all INT EVCs.
210
        """
211 1
        await avalidate_openapi_request(self.spec, request)
212
213 1
        try:
214 1
            content = await aget_json_or_400(request)
215 1
            evc_ids = content["evc_ids"]
216
        except (TypeError, KeyError):
217
            raise HTTPException(400, detail=f"Invalid payload: {content}")
218
219 1
        try:
220 1
            evcs = (
221
                await api.get_evcs()
222
                if len(evc_ids) != 1
223
                else await api.get_evc(evc_ids[0])
224
            )
225
        except RetryError as exc:
226
            exc_error = str(exc.last_attempt.exception())
227
            log.error(exc_error)
228
            raise HTTPException(503, detail=exc_error)
229
230 1
        if evc_ids:
231 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
232
        else:
233
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
234
            if not evcs:
235
                raise HTTPException(404, detail="There aren't INT EVCs to redeploy")
236
237 1
        try:
238 1
            await self.int_manager.redeploy_int(evcs)
239 1
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
240
            raise HTTPException(404, detail=str(exc))
241 1
        except (EVCHasNoINT, ProxyPortSameSourceIntraEVC, ProxyPortShared) as exc:
242 1
            raise HTTPException(409, detail=str(exc))
243
        except RetryError as exc:
244
            exc_error = str(exc.last_attempt.exception())
245
            log.error(exc_error)
246
            raise HTTPException(503, detail=exc_error)
247
        except UnrecoverableError as exc:
248
            exc_error = str(exc)
249
            log.error(exc_error)
250
            raise HTTPException(500, detail=exc_error)
251
252 1
        return JSONResponse(list(evcs.keys()), status_code=201)
253
254 1
    @rest("v1/evc/compare")
255 1
    async def evc_compare(self, _request: Request) -> JSONResponse:
256
        """List and compare which INT EVCs have flows installed comparing with
257
        mef_eline flows and telemetry metadata. You should use this endpoint
258
        to confirm if both the telemetry metadata is still coherent and also
259
        the minimum expected number of flows. A list of EVCs will get returned
260
        with the inconsistent INT EVCs. If you encounter any inconsistent
261
        EVC you need to analyze the situation and then decide if you'd
262
        like to force enable or disable INT.
263
        """
264
265 1
        try:
266 1
            int_flows, mef_flows, evcs = await asyncio.gather(
267
                api.get_stored_flows(
268
                    [
269
                        (
270
                            settings.INT_COOKIE_PREFIX << 56,
271
                            settings.INT_COOKIE_PREFIX << 56 | 0xFFFFFFFFFFFFFF,
272
                        ),
273
                    ]
274
                ),
275
                api.get_stored_flows(
276
                    [
277
                        (
278
                            settings.MEF_COOKIE_PREFIX << 56,
279
                            settings.MEF_COOKIE_PREFIX << 56 | 0xFFFFFFFFFFFFFF,
280
                        ),
281
                    ]
282
                ),
283
                api.get_evcs(),
284
            )
285
        except RetryError as exc:
286
            exc_error = str(exc.last_attempt.exception())
287
            log.error(exc_error)
288
            raise HTTPException(503, detail=exc_error)
289
        except UnrecoverableError as exc:
290
            exc_error = str(exc)
291
            log.error(exc_error)
292
            raise HTTPException(500, detail=exc_error)
293
294 1
        response = [
295
            {"id": k, "name": evcs[k]["name"], "compare_reason": v}
296
            for k, v in self.int_manager.evc_compare(int_flows, mef_flows, evcs).items()
297
        ]
298 1
        return JSONResponse(response)
299
300 1
    @rest("v1/uni/{interface_id}/proxy_port", methods=["DELETE"])
301 1
    async def delete_proxy_port_metadata(self, request: Request) -> JSONResponse:
302
        """Delete proxy port metadata."""
303 1
        intf_id = request.path_params["interface_id"]
304 1
        if (
305
            intf := self.controller.get_interface_by_id(intf_id)
306
        ) and "proxy_port" not in intf.metadata:
307 1
            return JSONResponse("Operation successful")
308
309 1
        qparams = request.query_params
310 1
        force = qparams.get("force", "false").lower() == "true"
311 1
        try:
312 1
            pp = self.int_manager.get_proxy_port_or_raise(intf_id, "no_evc_id")
313 1
            if pp.evc_ids and not force:
314 1
                return JSONResponse(
315
                    {
316
                        "status_code": 409,
317
                        "code": 409,
318
                        "description": f"{pp} is in use on {len(pp.evc_ids)} EVCs",
319
                        "evc_ids": sorted(pp.evc_ids),
320
                    },
321
                    status_code=409,
322
                )
323
        except ProxyPortError as exc:
324
            raise HTTPException(404, detail=exc.message)
325
326 1
        try:
327 1
            await api.delete_proxy_port_metadata(intf_id)
328 1
            return JSONResponse("Operation successful")
329
        except ValueError as exc:
330
            raise HTTPException(404, detail=str(exc))
331
        except UnrecoverableError as exc:
332
            raise HTTPException(500, detail=str(exc))
333
334 1
    @rest("v1/uni/{interface_id}/proxy_port/{port_number:int}", methods=["POST"])
335 1
    async def add_proxy_port_metadata(self, request: Request) -> JSONResponse:
336
        """Add proxy port metadata."""
337 1
        intf_id = request.path_params["interface_id"]
338 1
        port_no = request.path_params["port_number"]
339 1
        qparams = request.query_params
340 1
        if not (intf := self.controller.get_interface_by_id(intf_id)):
341
            raise HTTPException(404, detail=f"Interface id {intf_id} not found")
342 1
        if "proxy_port" in intf.metadata and intf.metadata["proxy_port"] == port_no:
343 1
            return JSONResponse("Operation successful")
344
345 1
        force = qparams.get("force", "false").lower() == "true"
346 1
        try:
347 1
            pp = self.int_manager.get_proxy_port_or_raise(intf_id, "no_evc_id", port_no)
348 1
            if pp.status != EntityStatus.UP and not force:
349 1
                raise HTTPException(409, detail=f"{pp} status isn't UP")
350 1
            self.int_manager._validate_new_dedicated_proxy_port(intf, port_no)
351 1
        except ProxyPortShared as exc:
352 1
            raise HTTPException(409, detail=exc.message)
353 1
        except ProxyPortError as exc:
354
            raise HTTPException(404, detail=exc.message)
355
356 1
        try:
357 1
            await api.add_proxy_port_metadata(intf_id, port_no)
358 1
            return JSONResponse("Operation successful")
359
        except ValueError as exc:
360
            raise HTTPException(404, detail=str(exc))
361
        except UnrecoverableError as exc:
362
            raise HTTPException(500, detail=str(exc))
363
364 1
    @rest("v1/uni/proxy_port")
365 1
    async def list_uni_proxy_ports(self, _request: Request) -> JSONResponse:
366
        """List configured UNI proxy ports."""
367 1
        interfaces_proxy_ports = []
368 1
        for switch in self.controller.switches.copy().values():
369 1
            for intf in switch.interfaces.copy().values():
370 1
                if "proxy_port" in intf.metadata:
371 1
                    payload = {
372
                        "uni": {
373
                            "id": intf.id,
374
                            "status": intf.status.value,
375
                            "status_reason": sorted(intf.status_reason),
376
                        },
377
                        "proxy_port": {
378
                            "port_number": intf.metadata["proxy_port"],
379
                            "status": "DOWN",
380
                            "status_reason": [],
381
                        },
382
                    }
383 1
                    try:
384 1
                        pp = self.int_manager.get_proxy_port_or_raise(
385
                            intf.id, "no_evc_id"
386
                        )
387 1
                        payload["proxy_port"]["status"] = pp.status.value
388 1
                    except ProxyPortError as exc:
389 1
                        payload["proxy_port"]["status_reason"] = [exc.message]
390 1
                    interfaces_proxy_ports.append(payload)
391 1
        return JSONResponse(interfaces_proxy_ports)
392
393 1
    @alisten_to("kytos/mef_eline.evcs_loaded")
394 1
    async def on_mef_eline_evcs_loaded(self, event: KytosEvent) -> None:
395
        """Handle kytos/mef_eline.evcs_loaded."""
396 1
        self.int_manager.load_uni_src_proxy_ports(event.content)
397
398 1
    @alisten_to("kytos/of_multi_table.enable_table")
399 1
    async def on_table_enabled(self, event):
400
        """Handle of_multi_table.enable_table."""
401 1
        table_group = event.content.get("telemetry_int", {})
402 1
        if not table_group:
403 1
            return
404 1
        for group in table_group:
405 1
            if group not in settings.TABLE_GROUP_ALLOWED:
406 1
                log.error(
407
                    f'The table group "{group}" is not allowed for '
408
                    f"telemetry_int. Allowed table groups are "
409
                    f"{settings.TABLE_GROUP_ALLOWED}"
410
                )
411 1
                return
412 1
        self.int_manager.flow_builder.table_group.update(table_group)
413 1
        content = {"group_table": self.int_manager.flow_builder.table_group}
414 1
        event_out = KytosEvent(name="kytos/telemetry_int.enable_table", content=content)
415 1
        await self.controller.buffers.app.aput(event_out)
416
417 1
    @alisten_to("kytos/mef_eline.deleted")
418 1
    async def on_evc_deleted(self, event: KytosEvent) -> None:
419
        """On EVC deleted."""
420 1
        content = event.content
421 1
        if (
422
            "metadata" in content
423
            and "telemetry" in content["metadata"]
424
            and content["metadata"]["telemetry"]["enabled"]
425
        ):
426 1
            evc_id = content["id"]
427 1
            log.info(f"Handling mef_eline.deleted on EVC id: {evc_id}")
428 1
            await self.int_manager.disable_int({evc_id: content}, force=True)
429
430 1
    @alisten_to("kytos/mef_eline.deployed")
431 1
    async def on_evc_deployed(self, event: KytosEvent) -> None:
432
        """On EVC deployed."""
433 1
        content = event.content
434 1
        evc_id = content["id"]
435 1
        evcs = {evc_id: content}
436 1
        try:
437 1
            if (
438
                "metadata" in content
439
                and "telemetry" in content["metadata"]
440
                and content["metadata"]["telemetry"]["enabled"]
441
            ):
442 1
                log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
443 1
                await self.int_manager.redeploy_int(evcs)
444 1
            elif (
445
                "metadata" in content
446
                and "telemetry_request" in content["metadata"]
447
                and "telemetry" not in content["metadata"]
448
            ):
449 1
                log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
450 1
                await self.int_manager.enable_int(evcs, force=True)
451 1
        except EVCError as exc:
452 1
            log.error(
453
                f"Failed when handling mef_eline.deployed: {exc}. Analyze the error "
454
                f"and you'll need to enable or redeploy EVC {evc_id} later"
455
            )
456
457 1
    @alisten_to("kytos/mef_eline.undeployed")
458 1
    async def on_evc_undeployed(self, event: KytosEvent) -> None:
459
        """On EVC undeployed."""
460 1
        content = event.content
461 1
        if (
462
            not content["enabled"]
463
            and "metadata" in content
464
            and "telemetry" in content["metadata"]
465
            and content["metadata"]["telemetry"]["enabled"]
466
        ):
467 1
            metadata = {
468
                "telemetry": {
469
                    "enabled": True,
470
                    "status": "DOWN",
471
                    "status_reason": ["undeployed"],
472
                    "status_updated_at": datetime.utcnow().strftime(
473
                        "%Y-%m-%dT%H:%M:%S"
474
                    ),
475
                }
476
            }
477 1
            evc_id = content["id"]
478 1
            evcs = {evc_id: content}
479 1
            log.info(f"Handling mef_eline.undeployed on EVC id: {evc_id}")
480 1
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
481
482 1
    @alisten_to("kytos/mef_eline.(redeployed_link_down|redeployed_link_up)")
483 1
    async def on_evc_redeployed_link(self, event: KytosEvent) -> None:
484
        """On EVC redeployed_link_down|redeployed_link_up."""
485 1
        content = event.content
486 1
        if (
487
            content["enabled"]
488
            and "metadata" in content
489
            and "telemetry" in content["metadata"]
490
            and content["metadata"]["telemetry"]["enabled"]
491
        ):
492 1
            evc_id = content["id"]
493 1
            evcs = {evc_id: content}
494 1
            log.info(f"Handling {event.name}, EVC id: {evc_id}")
495 1
            try:
496 1
                await self.int_manager.redeploy_int(evcs)
497 1
            except EVCError as exc:
498 1
                log.error(
499
                    f"Failed to redeploy: {exc}. "
500
                    f"Analyze the error and you'll need to redeploy EVC {evc_id} later"
501
                )
502
503 1
    @alisten_to("kytos/mef_eline.error_redeploy_link_down")
504 1
    async def on_evc_error_redeployed_link_down(self, event: KytosEvent) -> None:
505
        """On EVC error_redeploy_link_down, this is supposed to happen when
506
        a path isn't when mef_eline handles a link down."""
507 1
        content = event.content
508 1
        if (
509
            content["enabled"]
510
            and "metadata" in content
511
            and "telemetry" in content["metadata"]
512
            and content["metadata"]["telemetry"]["enabled"]
513
        ):
514 1
            metadata = {
515
                "telemetry": {
516
                    "enabled": True,
517
                    "status": "DOWN",
518
                    "status_reason": ["redeployed_link_down_no_path"],
519
                    "status_updated_at": datetime.utcnow().strftime(
520
                        "%Y-%m-%dT%H:%M:%S"
521
                    ),
522
                }
523
            }
524 1
            evc_id = content["id"]
525 1
            evcs = {evc_id: content}
526 1
            log.info(
527
                f"Handling mef_eline.redeployed_link_down_no_path on EVC id: {evc_id}"
528
            )
529 1
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
530
531 1
    @alisten_to("kytos/mef_eline.failover_link_down")
532 1
    async def on_failover_link_down(self, event: KytosEvent):
533
        """Handle kytos/mef_eline.failover_link_down."""
534 1
        await self.int_manager.handle_failover_flows(
535
            event.content, event_name="failover_link_down"
536
        )
537
538 1
    @alisten_to("kytos/mef_eline.failover_old_path")
539 1
    async def on_failover_old_path(self, event: KytosEvent):
540
        """Handle kytos/mef_eline.failover_old_path."""
541 1
        await self.int_manager.handle_failover_flows(
542
            event.content, event_name="failover_old_path"
543
        )
544
545 1
    @alisten_to("kytos/mef_eline.failover_deployed")
546 1
    async def on_failover_deployed(self, event: KytosEvent):
547
        """Handle kytos/mef_eline.failover_deployed."""
548 1
        await self.int_manager.handle_failover_flows(
549
            event.content, event_name="failover_deployed"
550
        )
551
552 1
    @alisten_to("kytos/topology.link_down")
553 1
    async def on_link_down(self, event):
554
        """Handle topology.link_down."""
555 1
        await self.int_manager.handle_pp_link_down(event.content["link"])
556
557 1
    @alisten_to("kytos/topology.link_up")
558 1
    async def on_link_up(self, event):
559
        """Handle topology.link_up."""
560 1
        await self.int_manager.handle_pp_link_up(event.content["link"])
561
562 1
    @alisten_to("kytos/mef_eline.uni_active_updated")
563 1
    async def on_uni_active_updated(self, event: KytosEvent) -> None:
564
        """On mef_eline UNI active updated."""
565 1
        content = event.content
566 1
        if (
567
            "metadata" in content
568
            and "telemetry" in content["metadata"]
569
            and content["metadata"]["telemetry"]["enabled"]
570
        ):
571 1
            evc_id, active = content["id"], content["active"]
572 1
            log.info(
573
                f"Handling mef_eline.uni_active_updated active {active} "
574
                f"on EVC id: {evc_id}"
575
            )
576
577 1
            metadata = {
578
                "telemetry": {
579
                    "enabled": True,
580
                    "status": "UP" if active else "DOWN",
581
                    "status_reason": [] if active else ["uni_down"],
582
                    "status_updated_at": datetime.utcnow().strftime(
583
                        "%Y-%m-%dT%H:%M:%S"
584
                    ),
585
                }
586
            }
587 1
            await api.add_evcs_metadata({evc_id: content}, metadata)
588
589 1
    @alisten_to("kytos/flow_manager.flow.error")
590 1
    async def on_flow_mod_error(self, event: KytosEvent):
591
        """On flow mod errors.
592
593
        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true
594
        """
595 1
        flow = event.content["flow"]
596 1
        if any(
597
            (
598
                event.content.get("error_exception"),
599
                event.content.get("error_command") != "add",
600
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
601
            )
602
        ):
603
            return
604
605 1
        async with self._ofpt_error_lock:
606 1
            evc_id = utils.get_id_from_cookie(flow.cookie)
607 1
            evc = await api.get_evc(evc_id, exclude_archived=False)
608 1
            if (
609
                not evc
610
                or "telemetry" not in evc[evc_id]["metadata"]
611
                or "enabled" not in evc[evc_id]["metadata"]["telemetry"]
612
                or not evc[evc_id]["metadata"]["telemetry"]["enabled"]
613
            ):
614
                return
615
616 1
            metadata = {
617
                "telemetry": {
618
                    "enabled": False,
619
                    "status": "DOWN",
620
                    "status_reason": ["ofpt_error"],
621
                    "status_updated_at": datetime.utcnow().strftime(
622
                        "%Y-%m-%dT%H:%M:%S"
623
                    ),
624
                }
625
            }
626 1
            log.error(
627
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
628
                f"error_type: {event.content.get('error_type')}, "
629
                f"error_code: {event.content.get('error_code')}, "
630
                f"flow: {flow.as_dict()} "
631
            )
632
633 1
            evcs = {evc_id: {evc_id: evc_id}}
634 1
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
635
636 1
    @alisten_to("kytos/topology.interfaces.metadata.removed")
637 1
    async def on_intf_metadata_removed(self, event: KytosEvent) -> None:
638
        """On interface metadata removed."""
639 1
        await self.int_manager.handle_pp_metadata_removed(event.content["interface"])
640
641 1
    @alisten_to("kytos/topology.interfaces.metadata.added")
642 1
    async def on_intf_metadata_added(self, event: KytosEvent) -> None:
643
        """On interface metadata added."""
644
        await self.int_manager.handle_pp_metadata_added(event.content["interface"])
645