Passed
Push — master ( 4bf6f4...81fe6d )
by Vinicius
02:40 queued 17s
created

build.main.Main.on_failover_old_path()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 5
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
7 1
import asyncio
import pathlib
from datetime import datetime, timezone

import napps.kytos.telemetry_int.kytos_api_helper as api
from napps.kytos.telemetry_int import settings, utils
from tenacity import RetryError

from kytos.core import KytosEvent, KytosNApp, log, rest
from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400

from .exceptions import (
    EVCError,
    EVCHasINT,
    EVCHasNoINT,
    EVCNotFound,
    FlowsNotFound,
    ProxyPortNotFound,
    ProxyPortSameSourceIntraEVC,
    ProxyPortStatusNotUP,
    UnrecoverableError,
)
from .managers.int import INTManager
31
32
# pylint: disable=fixme
33
34
35 1
class Main(KytosNApp):
36
    """Main class of kytos/telemetry NApp.
37
38
    This class is the entry point for this NApp.
39
    """
40
41 1
    # OpenAPI spec read from the "openapi.yml" file that sits next to this
    # module; the REST handlers pass it to avalidate_openapi_request.
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
42
43 1
    def setup(self):
        """Initialize the NApp state in place of '__init__'.

        The controller invokes this hook automatically once the application
        is loaded, so every setup routine belongs here.
        """
        manager = INTManager(self.controller)
        self.int_manager = manager
        # Lock held by on_flow_mod_error while it handles an OFPT_ERROR.
        self._ofpt_error_lock = asyncio.Lock()
54
55 1
    def execute(self):
        """Run once the setup method has finished.

        This NApp does nothing here. To have this method called
        periodically, add a line like the one below to the setup method:

            self.execute_as_loop(30)  # 30-second interval.
        """
63
64 1
    def shutdown(self):
        """Run when the NApp gets unloaded.

        Any cleanup procedure belongs here; currently there is none.
        """
69
70 1
    @rest("v1/evc/enable", methods=["POST"])
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST to enable INT flows on EVCs.

        If the list of evc_ids is empty, INT gets enabled on every EVC that
        does not have INT enabled yet.

        JSON payload keys read by this handler:
            evc_ids: EVC ids to enable (required).
            force: optional bool, False by default, forwarded to
                ``INTManager.enable_int``.

        Returns:
            A 201 response listing the handled EVC ids, or a response with an
            empty list (default status code) when no evc_ids were given and
            there is no non-INT EVC.

        Raises:
            HTTPException: 400 on an invalid payload; 404 on EVCNotFound,
                FlowsNotFound or ProxyPortNotFound; 409 on EVCHasINT,
                ProxyPortStatusNotUP or ProxyPortSameSourceIntraEVC; 503 on
                RetryError; 500 on UnrecoverableError.
        """
        await avalidate_openapi_request(self.spec, request)

        # Parsed outside the try block so that 'content' is always bound when
        # the 400 error message below is built.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is fetched directly, otherwise all EVCs are fetched
            # and filtered below.
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            # Same mapping the other endpoints of this class use for this error.
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        if evc_ids:
            # Requested ids missing from the fetched EVCs map to an empty dict.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
            if not evcs:
                # There's no non-INT EVCs to get enabled.
                return JSONResponse(list(evcs))

        try:
            # First, it tries to get and remove the existing INT flows like mef_eline
            stored_flows = await api.get_stored_flows(
                [
                    utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
                    for evc_id in evcs
                ]
            )
            await self.int_manager._remove_int_flows_by_cookies(stored_flows)
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasINT, ProxyPortStatusNotUP, ProxyPortSameSourceIntraEVC) as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse(list(evcs), status_code=201)
130
131 1
    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST to disable/remove INT flows on EVCs.

        If the list of evc_ids is empty, INT gets disabled on every EVC that
        currently has INT enabled.

        JSON payload keys read by this handler:
            evc_ids: EVC ids to disable (required).
            force: optional bool, False by default, forwarded to
                ``INTManager.disable_int``.

        Returns:
            A response listing the handled EVC ids; the list is empty when no
            evc_ids were given and there is no INT EVC.

        Raises:
            HTTPException: 400 on an invalid payload; 404 on EVCNotFound;
                409 on EVCHasNoINT; 503 on RetryError; 500 on
                UnrecoverableError.
        """
        await avalidate_openapi_request(self.spec, request)

        # Parsed outside the try block so that 'content' is always bound when
        # the 400 error message below is built.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is fetched directly, otherwise all EVCs are fetched
            # and filtered below.
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            # Same mapping the other endpoints of this class use for this error.
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        if evc_ids:
            # Requested ids missing from the fetched EVCs map to an empty dict.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                # There's no INT EVCs to get disabled.
                return JSONResponse(list(evcs))

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except EVCHasNoINT as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse(list(evcs))
183
184 1
    @rest("v1/evc")
    async def get_evcs(self, _request: Request) -> JSONResponse:
        """REST to return the EVCs that have INT enabled.

        The EVCs are fetched with the ``metadata.telemetry.enabled=true``
        filter and returned as they come from the API helper.

        Raises:
            HTTPException: 503 on RetryError, 500 on UnrecoverableError.
        """
        try:
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc
        return JSONResponse(evcs)
198
199 1
    @rest("v1/evc/redeploy", methods=["PATCH"])
    async def redeploy_telemetry(self, request: Request) -> JSONResponse:
        """REST to redeploy INT on EVCs.

        If the list of evc_ids is empty, INT gets redeployed on every EVC
        that currently has INT enabled.

        JSON payload keys read by this handler:
            evc_ids: EVC ids to redeploy (required).

        Returns:
            A 201 response listing the handled EVC ids.

        Raises:
            HTTPException: 400 on an invalid payload; 404 when there is no
                INT EVC to redeploy, or on EVCNotFound, FlowsNotFound or
                ProxyPortNotFound; 409 on EVCHasNoINT or
                ProxyPortSameSourceIntraEVC; 503 on RetryError; 500 on
                UnrecoverableError.
        """
        await avalidate_openapi_request(self.spec, request)

        # Parsed outside the try block so that 'content' is always bound when
        # the 400 error message below is built.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is fetched directly, otherwise all EVCs are fetched
            # and filtered below.
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            # Same mapping the other endpoints of this class use for this error.
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        if evc_ids:
            # Requested ids missing from the fetched EVCs map to an empty dict.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                raise HTTPException(404, detail="There aren't INT EVCs to redeploy")

        try:
            await self.int_manager.redeploy_int(evcs)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasNoINT, ProxyPortSameSourceIntraEVC) as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse(list(evcs), status_code=201)
247
248 1
    @rest("v1/evc/compare")
    async def evc_compare(self, _request: Request) -> JSONResponse:
        """List and compare which INT EVCs have flows installed comparing with
        mef_eline flows and telemetry metadata. You should use this endpoint
        to confirm if both the telemetry metadata is still coherent and also
        the minimum expected number of flows. A list of EVCs will get returned
        with the inconsistent INT EVCs. If you encounter any inconsistent
        EVC you need to analyze the situation and then decide if you'd
        like to force enable or disable INT.

        Returns:
            A response with one ``{"id", "name", "compare_reason"}`` entry per
            EVC reported by ``INTManager.evc_compare``.

        Raises:
            HTTPException: 503 on RetryError, 500 on UnrecoverableError.
        """
        # A cookie range spans every value sharing the same prefix: the prefix
        # is shifted into the top bits and the lower 56 bits go from all
        # zeros to all ones.
        low_bits_mask = 0xFFFFFFFFFFFFFF
        int_first = settings.INT_COOKIE_PREFIX << 56
        mef_first = settings.MEF_COOKIE_PREFIX << 56

        try:
            int_flows, mef_flows, evcs = await asyncio.gather(
                api.get_stored_flows([(int_first, int_first | low_bits_mask)]),
                api.get_stored_flows([(mef_first, mef_first | low_bits_mask)]),
                api.get_evcs(),
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        compared = self.int_manager.evc_compare(int_flows, mef_flows, evcs)
        response = [
            {"id": evc_id, "name": evcs[evc_id]["name"], "compare_reason": reason}
            for evc_id, reason in compared.items()
        ]
        return JSONResponse(response)
293
294 1
    @alisten_to("kytos/mef_eline.evcs_loaded")
    async def on_mef_eline_evcs_loaded(self, event: KytosEvent) -> None:
        """React to kytos/mef_eline.evcs_loaded.

        The event content is handed over, untouched, to
        ``INTManager.load_uni_src_proxy_ports``.
        """
        loaded = event.content
        self.int_manager.load_uni_src_proxy_ports(loaded)
298
299 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """React to of_multi_table.enable_table.

        Reads the "telemetry_int" entry of the event content. When it is
        missing or empty nothing happens. When any of its table groups is not
        in ``settings.TABLE_GROUP_ALLOWED`` an error is logged and the update
        is dropped. Otherwise the flow builder table groups are updated and a
        ``kytos/telemetry_int.enable_table`` event is published with them.
        """
        table_group = event.content.get("telemetry_int", {})
        if not table_group:
            return

        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            # The first group that is not allowed aborts the whole update.
            log.error(
                f'The table group "{group}" is not allowed for '
                f"telemetry_int. Allowed table groups are "
                f"{settings.TABLE_GROUP_ALLOWED}"
            )
            return

        builder = self.int_manager.flow_builder
        builder.table_group.update(table_group)
        event_out = KytosEvent(
            name="kytos/telemetry_int.enable_table",
            content={"group_table": self.int_manager.flow_builder.table_group},
        )
        await self.controller.buffers.app.aput(event_out)
317
318 1
    @alisten_to("kytos/mef_eline.deleted")
    async def on_evc_deleted(self, event: KytosEvent) -> None:
        """React to an EVC deletion.

        When the deleted EVC carries telemetry metadata that is enabled, INT
        gets disabled for it with force=True; otherwise nothing is done.
        """
        content = event.content
        if "metadata" not in content:
            return
        if "telemetry" not in content["metadata"]:
            return
        if not content["metadata"]["telemetry"]["enabled"]:
            return

        evc_id = content["id"]
        log.info(f"Handling mef_eline.deleted on EVC id: {evc_id}")
        await self.int_manager.disable_int({evc_id: content}, force=True)
330
331 1
    @alisten_to("kytos/mef_eline.deployed")
    async def on_evc_deployed(self, event: KytosEvent) -> None:
        """React to an EVC deployment.

        An EVC whose telemetry metadata is enabled gets INT redeployed. An
        EVC that has a "telemetry_request" metadata entry and no "telemetry"
        one gets INT enabled with force=True. Any EVCError raised while doing
        so is logged instead of propagated.
        """
        content = event.content
        evc_id = content["id"]
        evcs = {evc_id: content}
        try:
            if "metadata" not in content:
                return
            metadata = content["metadata"]
            if "telemetry" in metadata:
                # Telemetry metadata present: redeploy only if it's enabled.
                if metadata["telemetry"]["enabled"]:
                    log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
                    await self.int_manager.redeploy_int(evcs)
            elif "telemetry_request" in metadata:
                log.info(f"Handling mef_eline.deployed on EVC id: {evc_id}")
                await self.int_manager.enable_int(evcs, force=True)
        except EVCError as exc:
            log.error(
                f"Failed when handling mef_eline.deployed: {exc}. Analyze the error "
                f"and you'll need to enable or redeploy EVC {evc_id} later"
            )
357
358 1 View Code Duplication
    @alisten_to("kytos/mef_eline.undeployed")
    async def on_evc_undeployed(self, event: KytosEvent) -> None:
        """On EVC undeployed.

        Only a disabled EVC whose telemetry metadata is enabled is handled:
        its INT flows get removed with force=True and the telemetry metadata
        is passed along with status "DOWN" and reason "undeployed".
        """
        content = event.content
        if (
            not content["enabled"]
            and "metadata" in content
            and "telemetry" in content["metadata"]
            and content["metadata"]["telemetry"]["enabled"]
        ):
            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "DOWN",
                    "status_reason": ["undeployed"],
                    # Aware UTC "now"; datetime.utcnow() is deprecated and the
                    # formatted string is the same.
                    "status_updated_at": datetime.now(timezone.utc).strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            evc_id = content["id"]
            evcs = {evc_id: content}
            log.info(f"Handling mef_eline.undeployed on EVC id: {evc_id}")
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
382
383 1
    @alisten_to("kytos/mef_eline.(redeployed_link_down|redeployed_link_up)")
    async def on_evc_redeployed_link(self, event: KytosEvent) -> None:
        """React to redeployed_link_down and redeployed_link_up of an EVC.

        An enabled EVC whose telemetry metadata is enabled gets INT
        redeployed; an EVCError raised while redeploying is logged instead of
        propagated.
        """
        content = event.content
        if not content["enabled"]:
            return
        if "metadata" not in content or "telemetry" not in content["metadata"]:
            return
        if not content["metadata"]["telemetry"]["enabled"]:
            return

        evc_id = content["id"]
        evcs = {evc_id: content}
        log.info(f"Handling {event.name}, EVC id: {evc_id}")
        try:
            await self.int_manager.redeploy_int(evcs)
        except EVCError as exc:
            log.error(
                f"Failed to redeploy: {exc}. "
                f"Analyze the error and you'll need to redeploy EVC {evc_id} later"
            )
403
404 1 View Code Duplication
    @alisten_to("kytos/mef_eline.error_redeploy_link_down")
    async def on_evc_error_redeployed_link_down(self, event: KytosEvent) -> None:
        """On EVC error_redeploy_link_down, this is supposed to happen when
        a path isn't found while mef_eline handles a link down.

        Only an enabled EVC whose telemetry metadata is enabled is handled:
        its INT flows get removed with force=True and the telemetry metadata
        is passed along with status "DOWN" and reason
        "redeployed_link_down_no_path".
        """
        content = event.content
        if (
            content["enabled"]
            and "metadata" in content
            and "telemetry" in content["metadata"]
            and content["metadata"]["telemetry"]["enabled"]
        ):
            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "DOWN",
                    "status_reason": ["redeployed_link_down_no_path"],
                    # Aware UTC "now"; datetime.utcnow() is deprecated and the
                    # formatted string is the same.
                    "status_updated_at": datetime.now(timezone.utc).strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            evc_id = content["id"]
            evcs = {evc_id: content}
            log.info(
                f"Handling mef_eline.redeployed_link_down_no_path on EVC id: {evc_id}"
            )
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
431
432 1
    @alisten_to("kytos/mef_eline.failover_link_down")
    async def on_failover_link_down(self, event: KytosEvent):
        """React to kytos/mef_eline.failover_link_down.

        Delegates the event content to ``INTManager.handle_failover_flows``.
        """
        handler = self.int_manager.handle_failover_flows
        await handler(event.content, event_name="failover_link_down")
438
439 1
    @alisten_to("kytos/mef_eline.failover_old_path")
    async def on_failover_old_path(self, event: KytosEvent):
        """React to kytos/mef_eline.failover_old_path.

        Delegates the event content to ``INTManager.handle_failover_flows``.
        """
        handler = self.int_manager.handle_failover_flows
        await handler(event.content, event_name="failover_old_path")
445
446 1
    @alisten_to("kytos/mef_eline.failover_deployed")
    async def on_failover_deployed(self, event: KytosEvent):
        """React to kytos/mef_eline.failover_deployed.

        Delegates the event content to ``INTManager.handle_failover_flows``.
        """
        handler = self.int_manager.handle_failover_flows
        await handler(event.content, event_name="failover_deployed")
452
453 1
    @alisten_to("kytos/topology.link_down")
    async def on_link_down(self, event):
        """React to topology.link_down.

        The "link" entry of the event content is handed to
        ``INTManager.handle_pp_link_down``.
        """
        handler = self.int_manager.handle_pp_link_down
        await handler(event.content["link"])
457
458 1
    @alisten_to("kytos/topology.link_up")
    async def on_link_up(self, event):
        """React to topology.link_up.

        The "link" entry of the event content is handed to
        ``INTManager.handle_pp_link_up``.
        """
        handler = self.int_manager.handle_pp_link_up
        await handler(event.content["link"])
462
463 1
    @alisten_to("kytos/mef_eline.uni_active_updated")
    async def on_uni_active_updated(self, event: KytosEvent) -> None:
        """On mef_eline UNI active updated.

        For an EVC whose telemetry metadata is enabled, the telemetry status
        metadata is updated through the API helper: "UP" with no reason when
        the EVC is active, otherwise "DOWN" with reason "uni_down".
        """
        content = event.content
        if (
            "metadata" in content
            and "telemetry" in content["metadata"]
            and content["metadata"]["telemetry"]["enabled"]
        ):
            evc_id, active = content["id"], content["active"]
            log.info(
                f"Handling mef_eline.uni_active_updated active {active} "
                f"on EVC id: {evc_id}"
            )

            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "UP" if active else "DOWN",
                    "status_reason": [] if active else ["uni_down"],
                    # Aware UTC "now"; datetime.utcnow() is deprecated and the
                    # formatted string is the same.
                    "status_updated_at": datetime.now(timezone.utc).strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            await api.add_evcs_metadata({evc_id: content}, metadata)
489
490 1
    @alisten_to("kytos/flow_manager.flow.error")
    async def on_flow_mod_error(self, event: KytosEvent):
        """On flow mod errors.

        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true

        The event is ignored when it carries an "error_exception", when its
        "error_command" is not "add", or when the flow cookie prefix
        (cookie >> 56) is not ``settings.INT_COOKIE_PREFIX``. Otherwise, if
        the EVC derived from the cookie still has telemetry enabled, its INT
        flows get removed with force=True and the telemetry metadata is
        passed along as disabled, "DOWN", with reason "ofpt_error".
        """
        flow = event.content["flow"]
        if any(
            (
                event.content.get("error_exception"),
                event.content.get("error_command") != "add",
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
            )
        ):
            return

        # The EVC lookup and the flow removal run under the same lock.
        async with self._ofpt_error_lock:
            evc_id = utils.get_id_from_cookie(flow.cookie)
            evc = await api.get_evc(evc_id, exclude_archived=False)
            if (
                not evc
                or "telemetry" not in evc[evc_id]["metadata"]
                or "enabled" not in evc[evc_id]["metadata"]["telemetry"]
                or not evc[evc_id]["metadata"]["telemetry"]["enabled"]
            ):
                return

            metadata = {
                "telemetry": {
                    "enabled": False,
                    "status": "DOWN",
                    "status_reason": ["ofpt_error"],
                    # Aware UTC "now"; datetime.utcnow() is deprecated and the
                    # formatted string is the same.
                    "status_updated_at": datetime.now(timezone.utc).strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            log.error(
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
                f"error_type: {event.content.get('error_type')}, "
                f"error_code: {event.content.get('error_code')}, "
                f"flow: {flow.as_dict()} "
            )

            # NOTE(review): the value is a placeholder dict rather than the
            # fetched EVC; presumably remove_int_flows only needs the keys
            # here - confirm against INTManager before changing it.
            evcs = {evc_id: {evc_id: evc_id}}
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
536
537 1
    @alisten_to("kytos/topology.interfaces.metadata.removed")
    async def on_intf_metadata_removed(self, event: KytosEvent) -> None:
        """React to metadata being removed from an interface.

        The "interface" entry of the event content is handed to
        ``INTManager.handle_pp_metadata_removed``.
        """
        handler = self.int_manager.handle_pp_metadata_removed
        await handler(event.content["interface"])
541
542 1
    @alisten_to("kytos/topology.interfaces.metadata.added")
    async def on_intf_metadata_added(self, event: KytosEvent) -> None:
        """React to metadata being added to an interface.

        The "interface" entry of the event content is handed to
        ``INTManager.handle_pp_metadata_added``.
        """
        handler = self.int_manager.handle_pp_metadata_added
        await handler(event.content["interface"])
546
547
    # Event-driven methods: future
548 1
    def listen_for_new_evcs(self):
        """Placeholder (future): turn a newly created EVC into an INT-enabled
        EVC based on the metadata field.

        Not implemented yet; calling it does nothing.
        """
552
553 1
    def listen_for_evc_change(self):
        """Placeholder (future): turn a newly created EVC into an INT-enabled
        EVC based on the metadata field.

        Not implemented yet; calling it does nothing.
        """
        # NOTE(review): this description matches listen_for_new_evcs word for
        # word; judging by the method name it presumably should cover changes
        # to an existing EVC - confirm the intent.
557
558 1
    def listen_for_path_changes(self):
        """Placeholder (future): when an EVC path changes, make the new path
        INT-enabled based on the metadata field.

        Not implemented yet; calling it does nothing.
        """
562
563 1
    def listen_for_topology_changes(self):
        """Placeholder: on a topology change, make sure it is not the loop
        ports; if so, update the proxy ports.

        Not implemented yet; calling it does nothing.
        """
567