Passed
Push — master ( a932fa...68b174 )
by Vinicius
01:56 queued 20s
created

build.main.Main.evc_compare()   A

Complexity

Conditions 3

Size

Total Lines 45
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 4.679

Importance

Changes 0
Metric Value
cc 3
eloc 27
nop 2
dl 0
loc 45
ccs 6
cts 14
cp 0.4286
crap 4.679
rs 9.232
c 0
b 0
f 0
1
"""Main module of the kytos/telemetry_int Network Application (NApp).
2
3
NApp to deploy In-band Network Telemetry (INT) over Ethernet Virtual Circuits (EVCs).
4
5
"""
6
7 1
import asyncio
8 1
import pathlib
9 1
from datetime import datetime
10
11 1
import napps.kytos.telemetry_int.kytos_api_helper as api
12 1
from napps.kytos.telemetry_int import settings, utils
13 1
from tenacity import RetryError
14
15 1
from kytos.core import KytosEvent, KytosNApp, log, rest
16 1
from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec
17 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400
18
19 1
from .exceptions import (
20
    EVCHasINT,
21
    EVCHasNoINT,
22
    EVCNotFound,
23
    FlowsNotFound,
24
    ProxyPortNotFound,
25
    ProxyPortSameSourceIntraEVC,
26
    ProxyPortStatusNotUP,
27
    UnrecoverableError,
28
)
29 1
from .managers.int import INTManager
30
31
# pylint: disable=fixme
32
33
34 1
class Main(KytosNApp):
35
    """Main class of the kytos/telemetry_int NApp.
36
37
    This class is the entry point for this NApp.
38
    """
39
40 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
41
42 1
    def setup(self):
        """Initialize the NApp state; used in place of ``__init__``.

        The controller calls this method automatically when the application
        is loaded. It creates the INT manager bound to this NApp's controller
        and the lock that ``on_flow_mod_error`` holds while it handles an
        OFPT_ERROR.
        """
        controller = self.controller
        self.int_manager = INTManager(controller)
        self._ofpt_error_lock = asyncio.Lock()
53
54 1
    def execute(self):
        """Run once after ``setup``; this NApp has nothing to do here.

        To have this method called periodically instead, add a line such as
        the following to ``setup``:

            self.execute_as_loop(30)  # 30-second interval.
        """
62
63 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        No cleanup is performed at the moment; any cleanup procedure belongs
        here.
        """
68
69 1
    @rest("v1/evc/enable", methods=["POST"])
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST endpoint to enable INT flows on EVCs.

        The JSON body must carry ``evc_ids`` and may carry a boolean
        ``force`` (defaults to ``False``). If the list of evc_ids is empty,
        INT gets enabled on every EVC that doesn't have INT enabled yet.

        Returns:
            A 201 response listing the EVC ids that were processed, or a
            response with an empty list (default status) when ``evc_ids`` is
            empty and there are no non-INT EVCs.

        Raises:
            HTTPException: 400 on an invalid payload; 404 on EVCNotFound,
                FlowsNotFound or ProxyPortNotFound; 409 on EVCHasINT,
                ProxyPortStatusNotUP or ProxyPortSameSourceIntraEVC; 503 when
                retries were exhausted (RetryError); 500 on
                UnrecoverableError.
        """
        await avalidate_openapi_request(self.spec, request)

        # Read the body outside of the ``try`` below: its handler formats
        # ``content``, so ``content`` must already be bound when it runs.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is looked up directly; otherwise all EVCs are
            # fetched and filtered below.
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            # Same handling the ``get_evcs`` endpoint applies to this call.
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        if evc_ids:
            # Unknown ids map to an empty dict.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
            if not evcs:
                # There's no non-INT EVCs to get enabled.
                return JSONResponse(list(evcs.keys()))

        try:
            # First, it tries to get and remove the existing INT flows like mef_eline
            stored_flows = await api.get_stored_flows(
                [
                    utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
                    for evc_id in evcs
                ]
            )
            await self.int_manager._remove_int_flows(stored_flows)
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasINT, ProxyPortStatusNotUP, ProxyPortSameSourceIntraEVC) as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse(list(evcs.keys()), status_code=201)
129
130 1
    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST endpoint to disable/remove INT flows for EVCs.

        The JSON body must carry ``evc_ids`` and may carry a boolean
        ``force`` (defaults to ``False``). If the list of evc_ids is empty,
        INT gets disabled on all EVCs that have INT enabled.

        Returns:
            A response (default status) listing the EVC ids that were
            processed; the list is empty when ``evc_ids`` is empty and no EVC
            has INT enabled.

        Raises:
            HTTPException: 400 on an invalid payload; 404 on EVCNotFound;
                409 on EVCHasNoINT; 503 when retries were exhausted
                (RetryError); 500 on UnrecoverableError.
        """
        await avalidate_openapi_request(self.spec, request)

        # Read the body outside of the ``try`` below: its handler formats
        # ``content``, so ``content`` must already be bound when it runs.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is looked up directly; otherwise all EVCs are
            # fetched and filtered below.
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            # Same handling the ``get_evcs`` endpoint applies to this call.
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        if evc_ids:
            # Unknown ids map to an empty dict.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                # There's no INT EVCs to get disabled.
                return JSONResponse(list(evcs.keys()))

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except EVCHasNoINT as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse(list(evcs.keys()))
182
183 1
    @rest("v1/evc")
    async def get_evcs(self, _request: Request) -> JSONResponse:
        """REST endpoint returning the EVCs that have INT enabled.

        Raises:
            HTTPException: 503 when the lookup ran out of retries
                (RetryError); 500 on UnrecoverableError.
        """
        only_int_enabled = {"metadata.telemetry.enabled": "true"}
        try:
            int_evcs = await api.get_evcs(**only_int_enabled)
        except RetryError as retry_err:
            message = str(retry_err.last_attempt.exception())
            log.error(message)
            raise HTTPException(503, detail=message)
        except UnrecoverableError as fatal_err:
            message = str(fatal_err)
            log.error(message)
            raise HTTPException(500, detail=message)
        return JSONResponse(int_evcs)
197
198 1
    @rest("v1/evc/redeploy", methods=["PATCH"])
    async def redeploy_telemetry(self, request: Request) -> JSONResponse:
        """REST endpoint to redeploy INT on EVCs.

        The JSON body must carry ``evc_ids``. If the list of evc_ids is
        empty, INT gets redeployed on all EVCs that have INT enabled.

        Returns:
            A 201 response listing the EVC ids that were processed.

        Raises:
            HTTPException: 400 on an invalid payload; 404 when ``evc_ids`` is
                empty and no EVC has INT enabled, or on EVCNotFound,
                FlowsNotFound or ProxyPortNotFound; 409 on
                ProxyPortSameSourceIntraEVC; 503 when retries were exhausted
                (RetryError); 500 on UnrecoverableError.
        """
        await avalidate_openapi_request(self.spec, request)

        # Read the body outside of the ``try`` below: its handler formats
        # ``content``, so ``content`` must already be bound when it runs.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is looked up directly; otherwise all EVCs are
            # fetched and filtered below.
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            # Same handling the ``get_evcs`` endpoint applies to this call.
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        if evc_ids:
            # Unknown ids map to an empty dict.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                raise HTTPException(404, detail="There aren't INT EVCs to redeploy")

        try:
            await self.int_manager.redeploy_int(evcs)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except ProxyPortSameSourceIntraEVC as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse(list(evcs.keys()), status_code=201)
246
247 1
    @rest("v1/evc/compare")
    async def evc_compare(self, _request: Request) -> JSONResponse:
        """REST endpoint listing the EVCs whose INT state looks inconsistent.

        The stored INT flows, the stored mef_eline flows and the EVCs are
        fetched concurrently and handed to the INT manager's ``evc_compare``.
        Use this endpoint to confirm that the telemetry metadata is still
        coherent and that the minimum expected number of flows is installed.
        Each returned entry carries the EVC ``id``, its ``name`` and the
        ``compare_reason``. If an EVC shows up here, analyze the situation
        and then decide whether to force enable or disable INT.

        Raises:
            HTTPException: 503 when retries were exhausted (RetryError);
                500 on UnrecoverableError.
        """
        # Each cookie range spans every value of the low 56 bits under the
        # given prefix, which sits in the bits above them.
        low_bits_mask = 0xFFFFFFFFFFFFFF
        int_range = (
            settings.INT_COOKIE_PREFIX << 56,
            settings.INT_COOKIE_PREFIX << 56 | low_bits_mask,
        )
        mef_range = (
            settings.MEF_COOKIE_PREFIX << 56,
            settings.MEF_COOKIE_PREFIX << 56 | low_bits_mask,
        )

        try:
            int_flows, mef_flows, evcs = await asyncio.gather(
                api.get_stored_flows([int_range]),
                api.get_stored_flows([mef_range]),
                api.get_evcs(),
            )
        except RetryError as retry_err:
            message = str(retry_err.last_attempt.exception())
            log.error(message)
            raise HTTPException(503, detail=message)
        except UnrecoverableError as fatal_err:
            message = str(fatal_err)
            log.error(message)
            raise HTTPException(500, detail=message)

        findings = self.int_manager.evc_compare(int_flows, mef_flows, evcs)
        response = []
        for evc_id, reasons in findings.items():
            response.append(
                {"id": evc_id, "name": evcs[evc_id]["name"], "compare_reason": reasons}
            )
        return JSONResponse(response)
292
293 1
    @alisten_to("kytos/mef_eline.evcs_loaded")
    async def on_mef_eline_evcs_loaded(self, event: KytosEvent) -> None:
        """Pass the content of kytos/mef_eline.evcs_loaded to the INT manager."""
        loaded_content = event.content
        self.int_manager.load_uni_src_proxy_ports(loaded_content)
297
298 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups sent by of_multi_table.enable_table.

        Only the ``telemetry_int`` entry of the event content is read. If it
        is missing or empty nothing happens. If any of its groups is not in
        ``settings.TABLE_GROUP_ALLOWED`` an error is logged and nothing is
        applied. Otherwise the flow builder's table group is updated and a
        ``kytos/telemetry_int.enable_table`` event with the resulting table
        group is put on the app buffer.
        """
        requested = event.content.get("telemetry_int", {})
        if not requested:
            return

        for name in requested:
            if name in settings.TABLE_GROUP_ALLOWED:
                continue
            # First disallowed group aborts the whole update.
            log.error(
                f'The table group "{name}" is not allowed for '
                f"telemetry_int. Allowed table groups are "
                f"{settings.TABLE_GROUP_ALLOWED}"
            )
            return

        self.int_manager.flow_builder.table_group.update(requested)
        payload = {"group_table": self.int_manager.flow_builder.table_group}
        await self.controller.buffers.app.aput(
            KytosEvent(name="kytos/telemetry_int.enable_table", content=payload)
        )
316
317 1
    @alisten_to("kytos/mef_eline.deleted")
    async def on_evc_deleted(self, event: KytosEvent) -> None:
        """Disable INT on an EVC reported by kytos/mef_eline.deleted.

        Nothing is done unless the event content carries a truthy
        ``metadata.telemetry.enabled``. When it does, INT is disabled for the
        EVC with ``force=True``, passing the event content as the EVC data.
        """
        content = event.content
        metadata = content.get("metadata") or {}
        telemetry = metadata.get("telemetry") or {}
        # "enabled" may be absent from the telemetry metadata; indexing it
        # directly would raise KeyError, so treat a missing key as disabled.
        if not telemetry.get("enabled"):
            return

        evc_id = content["evc_id"]
        log.info(f"Event mef_eline.deleted on EVC id: {evc_id}")
        await self.int_manager.disable_int({evc_id: content}, force=True)
329
330 1
    @alisten_to("kytos/topology.link_down")
    async def on_link_down(self, event):
        """Pass the link of a topology.link_down event to the INT manager."""
        manager = self.int_manager
        await manager.handle_pp_link_down(event.content["link"])
334
335 1
    @alisten_to("kytos/topology.link_up")
    async def on_link_up(self, event):
        """Pass the link of a topology.link_up event to the INT manager."""
        manager = self.int_manager
        await manager.handle_pp_link_up(event.content["link"])
339
340 1
    @alisten_to("kytos/flow_manager.flow.error")
    async def on_flow_mod_error(self, event: KytosEvent):
        """Disable INT on the EVC whose INT flow caused a flow mod error.

        Only OFPT_ERRORs will be handled, telemetry_int already uses
        force: true. The event is ignored when it carries an
        ``error_exception``, when its ``error_command`` isn't ``"add"``, or
        when the flow cookie doesn't carry the INT cookie prefix. It is also
        ignored when the EVC can't be found or doesn't have telemetry
        enabled. Otherwise the EVC's INT flows are removed and its telemetry
        metadata is set to disabled/DOWN with reason ``ofpt_error``.
        """
        # Function-scope import: the module only imports ``datetime`` itself.
        from datetime import timezone  # pylint: disable=import-outside-toplevel

        flow = event.content["flow"]
        if any(
            (
                event.content.get("error_exception"),
                event.content.get("error_command") != "add",
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
            )
        ):
            return

        # The lock is held across the lookup and the removal, so concurrent
        # error events are handled one at a time.
        async with self._ofpt_error_lock:
            evc_id = utils.get_id_from_cookie(flow.cookie)
            evc = await api.get_evc(evc_id, exclude_archived=False)
            # Tolerate an empty result or missing keys instead of raising.
            evc_metadata = (evc or {}).get(evc_id, {}).get("metadata", {})
            if not evc_metadata.get("telemetry", {}).get("enabled"):
                return

            metadata = {
                "telemetry": {
                    "enabled": False,
                    "status": "DOWN",
                    "status_reason": ["ofpt_error"],
                    # ``datetime.utcnow()`` is deprecated; an aware UTC "now"
                    # formats to the very same string.
                    "status_updated_at": datetime.now(timezone.utc).strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            log.error(
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
                f"error_type: {event.content.get('error_type')}, "
                f"error_code: {event.content.get('error_code')}, "
                f"flow: {flow.as_dict()} "
            )

            evcs = {evc_id: {evc_id: evc_id}}
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
386
387 1
    @alisten_to("kytos/topology.interfaces.metadata.removed")
    async def on_intf_metadata_removed(self, event: KytosEvent) -> None:
        """Pass the interface whose metadata was removed to the INT manager."""
        manager = self.int_manager
        await manager.handle_pp_metadata_removed(event.content["interface"])
391
392
    # Event-driven methods: future
393 1
    def listen_for_new_evcs(self):
        """Placeholder (future): not implemented, does nothing for now.

        Intended to change a newly created EVC to an INT-enabled EVC based on
        the metadata field.
        """
397
398 1
    def listen_for_evc_change(self):
        """Placeholder (future): not implemented, does nothing for now.

        Intended to change an EVC to an INT-enabled EVC based on the metadata
        field; the original note reads "newly created EVC", presumably the
        trigger here is a change to an existing EVC -- TODO confirm.
        """
402
403 1
    def listen_for_path_changes(self):
        """Placeholder (future): not implemented, does nothing for now.

        Intended to change an EVC's new path to INT-enabled, based on the
        metadata field, when there is a path change.
        """
407
408 1
    def listen_for_topology_changes(self):
        """Placeholder: not implemented, does nothing for now.

        Intended to check, when the topology changes, that the change does
        not involve the loop ports and, if it does, to update the proxy ports.
        """
412