Passed
Push — master ( fb285f...124bde )
by Vinicius
06:32 queued 04:09
created

build.main.Main.evc_compare()   A

Complexity

Conditions 3

Size

Total Lines 45
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 4.679

Importance

Changes 0
Metric Value
cc 3
eloc 27
nop 2
dl 0
loc 45
ccs 6
cts 14
cp 0.4286
crap 4.679
rs 9.232
c 0
b 0
f 0
1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
7 1
import asyncio
8 1
import pathlib
9 1
from datetime import datetime
10
11 1
import napps.kytos.telemetry_int.kytos_api_helper as api
12 1
from napps.kytos.telemetry_int import settings, utils
13 1
from tenacity import RetryError
14
15 1
from kytos.core import KytosEvent, KytosNApp, log, rest
16 1
from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec
17 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400
18
19 1
from .exceptions import (
20
    EVCError,
21
    EVCHasINT,
22
    EVCHasNoINT,
23
    EVCNotFound,
24
    FlowsNotFound,
25
    ProxyPortNotFound,
26
    ProxyPortSameSourceIntraEVC,
27
    ProxyPortStatusNotUP,
28
    UnrecoverableError,
29
)
30 1
from .managers.int import INTManager
31
32
# pylint: disable=fixme
33
34
35 1
class Main(KytosNApp):
36
    """Main class of kytos/telemetry NApp.
37
38
    This class is the entry point for this NApp.
39
    """
40
41 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
42
43 1
    def setup(self):
44
        """Replace the '__init__' method for the KytosNApp subclass.
45
46
        The setup method is automatically called by the controller when your
47
        application is loaded.
48
49
        So, if you have any setup routine, insert it here.
50
        """
51
52 1
        self.int_manager = INTManager(self.controller)
53 1
        self._ofpt_error_lock = asyncio.Lock()
54
55 1
    def execute(self):
56
        """Run after the setup method execution.
57
58
        You can also use this method in loop mode if you add to the above setup
59
        method a line like the following example:
60
61
            self.execute_as_loop(30)  # 30-second interval.
62
        """
63
64 1
    def shutdown(self):
65
        """Run when your NApp is unloaded.
66
67
        If you have some cleanup procedure, insert it here.
68
        """
69
70 1
    @rest("v1/evc/enable", methods=["POST"])
71 1
    async def enable_telemetry(self, request: Request) -> JSONResponse:
72
        """REST to enable INT flows on EVCs.
73
74
        If a list of evc_ids is empty, it'll enable on non-INT EVCs.
75
        """
76 1
        await avalidate_openapi_request(self.spec, request)
77
78 1
        try:
79 1
            content = await aget_json_or_400(request)
80 1
            evc_ids = content["evc_ids"]
81 1
            force = content.get("force", False)
82 1
            if not isinstance(force, bool):
83
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
84
        except (TypeError, KeyError):
85
            raise HTTPException(400, detail=f"Invalid payload: {content}")
86
87 1
        try:
88 1
            evcs = (
89
                await api.get_evcs()
90
                if len(evc_ids) != 1
91
                else await api.get_evc(evc_ids[0])
92
            )
93
        except RetryError as exc:
94
            exc_error = str(exc.last_attempt.exception())
95
            log.error(exc_error)
96
            raise HTTPException(503, detail=exc_error)
97
98 1
        if evc_ids:
99 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
100
        else:
101
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
102
            if not evcs:
103
                # There's no non-INT EVCs to get enabled.
104
                return JSONResponse(list(evcs.keys()))
105
106 1
        try:
107
            # First, it tries to get and remove the existing INT flows like mef_eline
108 1
            stored_flows = await api.get_stored_flows(
109
                [
110
                    utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
111
                    for evc_id in evcs
112
                ]
113
            )
114 1
            await self.int_manager._remove_int_flows(stored_flows)
115 1
            await self.int_manager.enable_int(evcs, force)
116
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
117
            raise HTTPException(404, detail=str(exc))
118
        except (EVCHasINT, ProxyPortStatusNotUP, ProxyPortSameSourceIntraEVC) as exc:
119
            raise HTTPException(409, detail=str(exc))
120
        except RetryError as exc:
121
            exc_error = str(exc.last_attempt.exception())
122
            log.error(exc_error)
123
            raise HTTPException(503, detail=exc_error)
124
        except UnrecoverableError as exc:
125
            exc_error = str(exc)
126
            log.error(exc_error)
127
            raise HTTPException(500, detail=exc_error)
128
129 1
        return JSONResponse(list(evcs.keys()), status_code=201)
130
131 1
    @rest("v1/evc/disable", methods=["POST"])
132 1
    async def disable_telemetry(self, request: Request) -> JSONResponse:
133
        """REST to disable/remove INT flows for an EVC_ID
134
135
        If a list of evc_ids is empty, it'll disable on all INT EVCs.
136
        """
137 1
        await avalidate_openapi_request(self.spec, request)
138
139 1
        try:
140 1
            content = await aget_json_or_400(request)
141 1
            evc_ids = content["evc_ids"]
142 1
            force = content.get("force", False)
143 1
            if not isinstance(force, bool):
144
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
145
        except (TypeError, KeyError):
146
            raise HTTPException(400, detail=f"Invalid payload: {content}")
147
148 1
        try:
149 1
            evcs = (
150
                await api.get_evcs()
151
                if len(evc_ids) != 1
152
                else await api.get_evc(evc_ids[0])
153
            )
154
        except RetryError as exc:
155
            exc_error = str(exc.last_attempt.exception())
156
            log.error(exc_error)
157
            raise HTTPException(503, detail=exc_error)
158
159 1
        if evc_ids:
160 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
161
        else:
162
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
163
            if not evcs:
164
                # There's no INT EVCs to get disabled.
165
                return JSONResponse(list(evcs.keys()))
166
167 1
        try:
168 1
            await self.int_manager.disable_int(evcs, force)
169
        except EVCNotFound as exc:
170
            raise HTTPException(404, detail=str(exc))
171
        except EVCHasNoINT as exc:
172
            raise HTTPException(409, detail=str(exc))
173
        except RetryError as exc:
174
            exc_error = str(exc.last_attempt.exception())
175
            log.error(exc_error)
176
            raise HTTPException(503, detail=exc_error)
177
        except UnrecoverableError as exc:
178
            exc_error = str(exc)
179
            log.error(exc_error)
180
            raise HTTPException(500, detail=exc_error)
181
182 1
        return JSONResponse(list(evcs.keys()))
183
184 1
    @rest("v1/evc")
185 1
    async def get_evcs(self, _request: Request) -> JSONResponse:
186
        """REST to return the list of EVCs with INT enabled"""
187 1
        try:
188 1
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
189 1
            return JSONResponse(evcs)
190
        except RetryError as exc:
191
            exc_error = str(exc.last_attempt.exception())
192
            log.error(exc_error)
193
            raise HTTPException(503, detail=exc_error)
194
        except UnrecoverableError as exc:
195
            exc_error = str(exc)
196
            log.error(exc_error)
197
            raise HTTPException(500, detail=exc_error)
198
199 1
    @rest("v1/evc/redeploy", methods=["PATCH"])
200 1
    async def redeploy_telemetry(self, request: Request) -> JSONResponse:
201
        """REST to redeploy INT on EVCs.
202
203
        If a list of evc_ids is empty, it'll redeploy on all INT EVCs.
204
        """
205 1
        await avalidate_openapi_request(self.spec, request)
206
207 1
        try:
208 1
            content = await aget_json_or_400(request)
209 1
            evc_ids = content["evc_ids"]
210
        except (TypeError, KeyError):
211
            raise HTTPException(400, detail=f"Invalid payload: {content}")
212
213 1
        try:
214 1
            evcs = (
215
                await api.get_evcs()
216
                if len(evc_ids) != 1
217
                else await api.get_evc(evc_ids[0])
218
            )
219
        except RetryError as exc:
220
            exc_error = str(exc.last_attempt.exception())
221
            log.error(exc_error)
222
            raise HTTPException(503, detail=exc_error)
223
224 1
        if evc_ids:
225 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
226
        else:
227
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
228
            if not evcs:
229
                raise HTTPException(404, detail="There aren't INT EVCs to redeploy")
230
231 1
        try:
232 1
            await self.int_manager.redeploy_int(evcs)
233
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
234
            raise HTTPException(404, detail=str(exc))
235
        except ProxyPortSameSourceIntraEVC as exc:
236
            raise HTTPException(409, detail=str(exc))
237
        except RetryError as exc:
238
            exc_error = str(exc.last_attempt.exception())
239
            log.error(exc_error)
240
            raise HTTPException(503, detail=exc_error)
241
        except UnrecoverableError as exc:
242
            exc_error = str(exc)
243
            log.error(exc_error)
244
            raise HTTPException(500, detail=exc_error)
245
246 1
        return JSONResponse(list(evcs.keys()), status_code=201)
247
248 1
    @rest("v1/evc/compare")
249 1
    async def evc_compare(self, _request: Request) -> JSONResponse:
250
        """List and compare which INT EVCs have flows installed comparing with
251
        mef_eline flows and telemetry metadata. You should use this endpoint
252
        to confirm if both the telemetry metadata is still coherent and also
253
        the minimum expected number of flows. A list of EVCs will get returned
254
        with the inconsistent INT EVCs. If you encounter any inconsistent
255
        EVC you need to analyze the situation and then decide if you'd
256
        like to force enable or disable INT.
257
        """
258
259 1
        try:
260 1
            int_flows, mef_flows, evcs = await asyncio.gather(
261
                api.get_stored_flows(
262
                    [
263
                        (
264
                            settings.INT_COOKIE_PREFIX << 56,
265
                            settings.INT_COOKIE_PREFIX << 56 | 0xFFFFFFFFFFFFFF,
266
                        ),
267
                    ]
268
                ),
269
                api.get_stored_flows(
270
                    [
271
                        (
272
                            settings.MEF_COOKIE_PREFIX << 56,
273
                            settings.MEF_COOKIE_PREFIX << 56 | 0xFFFFFFFFFFFFFF,
274
                        ),
275
                    ]
276
                ),
277
                api.get_evcs(),
278
            )
279
        except RetryError as exc:
280
            exc_error = str(exc.last_attempt.exception())
281
            log.error(exc_error)
282
            raise HTTPException(503, detail=exc_error)
283
        except UnrecoverableError as exc:
284
            exc_error = str(exc)
285
            log.error(exc_error)
286
            raise HTTPException(500, detail=exc_error)
287
288 1
        response = [
289
            {"id": k, "name": evcs[k]["name"], "compare_reason": v}
290
            for k, v in self.int_manager.evc_compare(int_flows, mef_flows, evcs).items()
291
        ]
292 1
        return JSONResponse(response)
293
294 1
    @alisten_to("kytos/mef_eline.evcs_loaded")
295 1
    async def on_mef_eline_evcs_loaded(self, event: KytosEvent) -> None:
296
        """Handle kytos/mef_eline.evcs_loaded."""
297 1
        self.int_manager.load_uni_src_proxy_ports(event.content)
298
299 1
    @alisten_to("kytos/of_multi_table.enable_table")
300 1
    async def on_table_enabled(self, event):
301
        """Handle of_multi_table.enable_table."""
302 1
        table_group = event.content.get("telemetry_int", {})
303 1
        if not table_group:
304 1
            return
305 1
        for group in table_group:
306 1
            if group not in settings.TABLE_GROUP_ALLOWED:
307 1
                log.error(
308
                    f'The table group "{group}" is not allowed for '
309
                    f"telemetry_int. Allowed table groups are "
310
                    f"{settings.TABLE_GROUP_ALLOWED}"
311
                )
312 1
                return
313 1
        self.int_manager.flow_builder.table_group.update(table_group)
314 1
        content = {"group_table": self.int_manager.flow_builder.table_group}
315 1
        event_out = KytosEvent(name="kytos/telemetry_int.enable_table", content=content)
316 1
        await self.controller.buffers.app.aput(event_out)
317
318 1
    @alisten_to("kytos/mef_eline.deleted")
319 1
    async def on_evc_deleted(self, event: KytosEvent) -> None:
320
        """On EVC deleted."""
321 1
        content = event.content
322 1
        if (
323
            "metadata" in content
324
            and "telemetry" in content["metadata"]
325
            and content["metadata"]["telemetry"]["enabled"]
326
        ):
327 1
            evc_id = content["evc_id"]
328 1
            log.info(f"Handling mef_eline.deleted on EVC id: {evc_id}")
329 1
            await self.int_manager.disable_int({evc_id: content}, force=True)
330
331 1 View Code Duplication
    @alisten_to("kytos/mef_eline.undeployed")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
332 1
    async def on_evc_undeployed(self, event: KytosEvent) -> None:
333
        """On EVC undeployed."""
334 1
        content = event.content
335 1
        if (
336
            not content["enabled"]
337
            and "metadata" in content
338
            and "telemetry" in content["metadata"]
339
            and content["metadata"]["telemetry"]["enabled"]
340
        ):
341 1
            metadata = {
342
                "telemetry": {
343
                    "enabled": True,
344
                    "status": "DOWN",
345
                    "status_reason": ["undeployed"],
346
                    "status_updated_at": datetime.utcnow().strftime(
347
                        "%Y-%m-%dT%H:%M:%S"
348
                    ),
349
                }
350
            }
351 1
            evc_id = content["evc_id"]
352 1
            evcs = {evc_id: content}
353 1
            log.info(f"Handling mef_eline.undeployed on EVC id: {evc_id}")
354 1
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
355
356 1
    @alisten_to("kytos/mef_eline.(redeployed_link_down|redeployed_link_up)")
357 1
    async def on_evc_redeployed_link(self, event: KytosEvent) -> None:
358
        """On EVC redeployed_link_down|redeployed_link_up."""
359 1
        content = event.content
360 1
        if (
361
            content["enabled"]
362
            and "metadata" in content
363
            and "telemetry" in content["metadata"]
364
            and content["metadata"]["telemetry"]["enabled"]
365
        ):
366 1
            evc_id = content["evc_id"]
367 1
            content["id"] = evc_id
368 1
            evcs = {evc_id: content}
369 1
            log.info(f"Handling {event.name}, EVC id: {evc_id}")
370 1
            try:
371 1
                await self.int_manager.redeploy_int(evcs)
372
            except EVCError as exc:
373
                log.error(
374
                    f"Failed to redeploy: {exc}. "
375
                    f"Analyze the error and you'll need to redeploy EVC {evc_id} later"
376
                )
377
378 1 View Code Duplication
    @alisten_to("kytos/mef_eline.error_redeploy_link_down")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
379 1
    async def on_evc_error_redeployed_link_down(self, event: KytosEvent) -> None:
380
        """On EVC error_redeploy_link_down, this is supposed to happen when
381
        a path isn't when mef_eline handles a link down."""
382 1
        content = event.content
383 1
        if (
384
            content["enabled"]
385
            and "metadata" in content
386
            and "telemetry" in content["metadata"]
387
            and content["metadata"]["telemetry"]["enabled"]
388
        ):
389 1
            metadata = {
390
                "telemetry": {
391
                    "enabled": True,
392
                    "status": "DOWN",
393
                    "status_reason": ["redeployed_link_down_no_path"],
394
                    "status_updated_at": datetime.utcnow().strftime(
395
                        "%Y-%m-%dT%H:%M:%S"
396
                    ),
397
                }
398
            }
399 1
            evc_id = content["evc_id"]
400 1
            evcs = {evc_id: content}
401 1
            log.info(
402
                f"Handling mef_eline.redeployed_link_down_no_path on EVC id: {evc_id}"
403
            )
404 1
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
405
406 1
    @alisten_to("kytos/topology.link_down")
407 1
    async def on_link_down(self, event):
408
        """Handle topology.link_down."""
409 1
        await self.int_manager.handle_pp_link_down(event.content["link"])
410
411 1
    @alisten_to("kytos/topology.link_up")
412 1
    async def on_link_up(self, event):
413
        """Handle topology.link_up."""
414 1
        await self.int_manager.handle_pp_link_up(event.content["link"])
415
416 1
    @alisten_to("kytos/mef_eline.uni_active_updated")
417 1
    async def on_uni_active_updated(self, event: KytosEvent) -> None:
418
        """On mef_eline UNI active updated."""
419 1
        content = event.content
420 1
        if (
421
            "metadata" in content
422
            and "telemetry" in content["metadata"]
423
            and content["metadata"]["telemetry"]["enabled"]
424
        ):
425 1
            evc_id, active = content["evc_id"], content["active"]
426 1
            log.info(
427
                f"Handling mef_eline.uni_active_updated active {active} "
428
                f"on EVC id: {evc_id}"
429
            )
430
431 1
            metadata = {
432
                "telemetry": {
433
                    "enabled": True,
434
                    "status": "UP" if active else "DOWN",
435
                    "status_reason": [] if active else ["uni_down"],
436
                    "status_updated_at": datetime.utcnow().strftime(
437
                        "%Y-%m-%dT%H:%M:%S"
438
                    ),
439
                }
440
            }
441 1
            await api.add_evcs_metadata({evc_id: content}, metadata)
442
443 1
    @alisten_to("kytos/flow_manager.flow.error")
444 1
    async def on_flow_mod_error(self, event: KytosEvent):
445
        """On flow mod errors.
446
447
        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true
448
        """
449 1
        flow = event.content["flow"]
450 1
        if any(
451
            (
452
                event.content.get("error_exception"),
453
                event.content.get("error_command") != "add",
454
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
455
            )
456
        ):
457
            return
458
459 1
        async with self._ofpt_error_lock:
460 1
            evc_id = utils.get_id_from_cookie(flow.cookie)
461 1
            evc = await api.get_evc(evc_id, exclude_archived=False)
462 1
            if (
463
                not evc
464
                or "telemetry" not in evc[evc_id]["metadata"]
465
                or "enabled" not in evc[evc_id]["metadata"]["telemetry"]
466
                or not evc[evc_id]["metadata"]["telemetry"]["enabled"]
467
            ):
468
                return
469
470 1
            metadata = {
471
                "telemetry": {
472
                    "enabled": False,
473
                    "status": "DOWN",
474
                    "status_reason": ["ofpt_error"],
475
                    "status_updated_at": datetime.utcnow().strftime(
476
                        "%Y-%m-%dT%H:%M:%S"
477
                    ),
478
                }
479
            }
480 1
            log.error(
481
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
482
                f"error_type: {event.content.get('error_type')}, "
483
                f"error_code: {event.content.get('error_code')}, "
484
                f"flow: {flow.as_dict()} "
485
            )
486
487 1
            evcs = {evc_id: {evc_id: evc_id}}
488 1
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
489
490 1
    @alisten_to("kytos/topology.interfaces.metadata.removed")
491 1
    async def on_intf_metadata_removed(self, event: KytosEvent) -> None:
492
        """On interface metadata removed."""
493 1
        await self.int_manager.handle_pp_metadata_removed(event.content["interface"])
494
495 1
    @alisten_to("kytos/topology.interfaces.metadata.added")
496 1
    async def on_intf_metadata_added(self, event: KytosEvent) -> None:
497
        """On interface metadata added."""
498 1
        await self.int_manager.handle_pp_metadata_added(event.content["interface"])
499
500
    # Event-driven methods: future
501 1
    def listen_for_new_evcs(self):
502
        """Change newly created EVC to INT-enabled EVC based on the metadata field
503
        (future)"""
504
        pass
505
506 1
    def listen_for_evc_change(self):
507
        """Change newly created EVC to INT-enabled EVC based on the
508
        metadata field (future)"""
509
        pass
510
511 1
    def listen_for_path_changes(self):
512
        """Change EVC's new path to INT-enabled EVC based on the metadata field
513
        when there is a path change. (future)"""
514
        pass
515
516 1
    def listen_for_topology_changes(self):
517
        """If the topology changes, make sure it is not the loop ports.
518
        If so, update proxy ports"""
519
        pass
520