Passed
Pull Request — master (#68)
by Vinicius
06:29
created

build.main.Main.setup()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 11
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
7 1
import asyncio
8 1
import pathlib
9 1
from datetime import datetime
10
11 1
import napps.kytos.telemetry_int.kytos_api_helper as api
12 1
from napps.kytos.telemetry_int import settings, utils
13 1
from tenacity import RetryError
14
15 1
from kytos.core import KytosEvent, KytosNApp, log, rest
16 1
from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec
17 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400
18
19 1
from .exceptions import (
20
    EVCHasINT,
21
    EVCHasNoINT,
22
    EVCNotFound,
23
    FlowsNotFound,
24
    ProxyPortNotFound,
25
    ProxyPortSameSourceIntraEVC,
26
    ProxyPortStatusNotUP,
27
    UnrecoverableError,
28
)
29 1
from .managers.int import INTManager
30
31
# pylint: disable=fixme
32
33
34 1
class Main(KytosNApp):
35
    """Main class of kytos/telemetry NApp.
36
37
    This class is the entry point for this NApp.
38
    """
39
40 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
41
42 1
    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when the
        application is loaded. It creates the INTManager bound to this
        NApp's controller and the asyncio lock used by on_flow_mod_error.
        """
        self.int_manager = INTManager(self.controller)
        # Held by on_flow_mod_error while it disables an EVC.
        self._ofpt_error_lock = asyncio.Lock()
53
54 1
    def execute(self):
        """Run after the setup method execution.

        Intentionally empty. To use this method in loop mode, add a line
        like the following example to the setup method:

            self.execute_as_loop(30)  # 30-second interval.
        """
62
63 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Intentionally empty: no cleanup procedure is implemented here.
        """
68
69 1 View Code Duplication
    @rest("v1/evc/enable", methods=["POST"])
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST to enable INT flows on EVCs.

        The JSON payload carries "evc_ids" (required) and an optional boolean
        "force" (defaults to False). If the list of evc_ids is empty, INT is
        enabled on every EVC that does not have it enabled yet.

        Returns:
            A 201 response listing the EVC ids handed to the INT manager, or
            a response with an empty list (default status code) when no
            non-INT EVC was found.

        Raises:
            HTTPException: 400 on an invalid payload; 404 or 409 when the INT
                manager rejects the request; 503 when retries are exhausted;
                500 on an unrecoverable error.
        """
        await avalidate_openapi_request(self.spec, request)

        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is fetched on its own; an empty list or several ids
            # load every EVC, which is then filtered below.
            if len(evc_ids) == 1:
                evcs = await api.get_evc(evc_ids[0])
            else:
                evcs = await api.get_evcs()
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc

        if evc_ids:
            # Ids missing from the lookup map to {} and are left for the INT
            # manager to judge.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
            if not evcs:
                # There's no non-INT EVCs to get enabled.
                return JSONResponse([])

        try:
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasINT, ProxyPortStatusNotUP, ProxyPortSameSourceIntraEVC) as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse(list(evcs), status_code=201)
121
122 1 View Code Duplication
    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST to disable/remove INT flows for EVCs.

        The JSON payload carries "evc_ids" (required) and an optional boolean
        "force" (defaults to False). If the list of evc_ids is empty, INT is
        disabled on every EVC that currently has it enabled.

        Returns:
            A response listing the EVC ids handed to the INT manager; the
            list is empty when no INT EVC was found.

        Raises:
            HTTPException: 400 on an invalid payload; 404 or 409 when the INT
                manager rejects the request; 503 when retries are exhausted;
                500 on an unrecoverable error.
        """
        await avalidate_openapi_request(self.spec, request)

        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is fetched on its own; an empty list or several ids
            # load every EVC, which is then filtered below.
            if len(evc_ids) == 1:
                evcs = await api.get_evc(evc_ids[0])
            else:
                evcs = await api.get_evcs()
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc

        if evc_ids:
            # Ids missing from the lookup map to {} and are left for the INT
            # manager to judge.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                # There's no INT EVCs to get disabled.
                return JSONResponse([])

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except EVCHasNoINT as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse(list(evcs))
174
175 1
    @rest("v1/evc")
    async def get_evcs(self, _request: Request) -> JSONResponse:
        """REST to return the EVCs with INT enabled.

        Queries the EVCs filtered by "metadata.telemetry.enabled" set to
        "true" and returns the result as JSON.

        Raises:
            HTTPException: 503 when retries are exhausted; 500 on an
                unrecoverable error.
        """
        try:
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc
        return JSONResponse(evcs)
189
190 1
    @rest("v1/sync")
    def sync_flows(self, _request: Request) -> JSONResponse:
        """Endpoint to force the telemetry napp to search for INT flows and
        delete them according to the EVC metadata.

        Not implemented yet: it currently always answers "TBD".
        """
        # TODO
        # for evc_id in get_evcs_ids():
        return JSONResponse("TBD")
198
199 1
    @rest("v1/evc/update")
    def update_evc(self, _request: Request) -> JSONResponse:
        """If an EVC changed from unidirectional to bidirectional telemetry,
        make the change.

        Not implemented yet: it currently always answers an empty object.
        """
        return JSONResponse({})
204
205 1
    @alisten_to("kytos/mef_eline.evcs_loaded")
    async def on_mef_eline_evcs_loaded(self, event: KytosEvent) -> None:
        """Handle kytos/mef_eline.evcs_loaded.

        Passes the event content to the INT manager so it can load the UNI
        source proxy ports.
        """
        self.int_manager.load_uni_src_proxy_ports(event.content)
209
210 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle of_multi_table.enable_table.

        Reads the "telemetry_int" entry of the event content. When it is
        missing or empty nothing happens. When any of its table groups is not
        in settings.TABLE_GROUP_ALLOWED the whole update is rejected with an
        error log. Otherwise the flow builder's table group is updated and a
        "kytos/telemetry_int.enable_table" event is published with the
        resulting table group.
        """
        table_group = event.content.get("telemetry_int", {})
        if not table_group:
            return

        # Validate everything first so a bad entry never yields a partial
        # update.
        for group in table_group:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(
                    f'The table group "{group}" is not allowed for '
                    f"telemetry_int. Allowed table groups are "
                    f"{settings.TABLE_GROUP_ALLOWED}"
                )
                return

        flow_builder = self.int_manager.flow_builder
        flow_builder.table_group.update(table_group)
        content = {"group_table": flow_builder.table_group}
        event_out = KytosEvent(name="kytos/telemetry_int.enable_table", content=content)
        await self.controller.buffers.app.aput(event_out)
228
229 1
    @alisten_to("kytos/mef_eline.deleted")
    async def on_evc_deleted(self, event: KytosEvent) -> None:
        """On EVC deleted.

        When the deleted EVC has telemetry enabled in its metadata, its INT
        is disabled with force=True. EVCs without telemetry metadata, or with
        it disabled, are ignored.
        """
        content = event.content
        # Tolerant lookups: a telemetry entry without an "enabled" key is
        # treated as disabled instead of raising KeyError.
        telemetry = content.get("metadata", {}).get("telemetry", {})
        if not telemetry.get("enabled"):
            return

        evc_id = content["evc_id"]
        log.info(f"Event mef_eline.deleted on EVC id: {evc_id}")
        await self.int_manager.disable_int({evc_id: content}, force=True)
241
242 1
    @alisten_to("kytos/topology.link_down")
    async def on_link_down(self, event):
        """Handle topology.link_down.

        Forwards the event's "link" to the INT manager's proxy port
        link-down handler.
        """
        await self.int_manager.handle_pp_link_down(event.content["link"])
246
247 1
    @alisten_to("kytos/topology.link_up")
    async def on_link_up(self, event):
        """Handle topology.link_up.

        Forwards the event's "link" to the INT manager's proxy port link-up
        handler.
        """
        await self.int_manager.handle_pp_link_up(event.content["link"])
251
252 1
    @alisten_to("kytos/flow_manager.flow.error")
    async def on_flow_mod_error(self, event: KytosEvent):
        """On flow mod errors.

        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true

        The event is ignored when it carries an "error_exception", when its
        "error_command" is not "add", or when the flow cookie prefix is not
        settings.INT_COOKIE_PREFIX. Otherwise, if the EVC derived from the
        cookie still has telemetry enabled, its INT flows are removed and its
        telemetry metadata is set to disabled/DOWN with reason "ofpt_error".
        """
        flow = event.content["flow"]
        if any(
            (
                event.content.get("error_exception"),
                event.content.get("error_command") != "add",
                # The top byte of the 64-bit cookie carries the prefix.
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
            )
        ):
            return

        # NOTE(review): the lock plus the re-check of "enabled" below
        # presumably keeps a burst of errors for one EVC from disabling it
        # more than once - confirm.
        async with self._ofpt_error_lock:
            evc_id = utils.get_id_from_cookie(flow.cookie)
            evc = await api.get_evc(evc_id, exclude_archived=False)
            if not evc:
                return
            # Tolerant lookups: a missing evc_id, "metadata", "telemetry" or
            # "enabled" key all mean there is nothing to disable.
            telemetry = evc.get(evc_id, {}).get("metadata", {}).get("telemetry", {})
            if not telemetry.get("enabled"):
                return

            # NOTE(review): datetime.utcnow() is deprecated since Python
            # 3.12; datetime.now(timezone.utc) renders the same string with
            # this format but needs `timezone` imported at file level.
            metadata = {
                "telemetry": {
                    "enabled": False,
                    "status": "DOWN",
                    "status_reason": ["ofpt_error"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            log.error(
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
                f"error_type: {event.content.get('error_type')}, "
                f"error_code: {event.content.get('error_code')}, "
                f"flow: {flow.as_dict()} "
            )

            # NOTE(review): the value is a placeholder dict rather than the
            # fetched EVC; presumably remove_int_flows only needs the id
            # here - confirm against INTManager.
            evcs = {evc_id: {evc_id: evc_id}}
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)
298
299
    # Event-driven methods: future
300 1
    def listen_for_new_evcs(self):
        """Change newly created EVC to INT-enabled EVC based on the metadata
        field (future).

        Placeholder: not implemented yet.
        """
304
305 1
    def listen_for_evc_change(self):
        """Change a modified EVC to INT-enabled EVC based on the metadata
        field (future).

        Placeholder: not implemented yet.
        """
309
310 1
    def listen_for_path_changes(self):
        """Change EVC's new path to INT-enabled EVC based on the metadata
        field when there is a path change (future).

        Placeholder: not implemented yet.
        """
314
315 1
    def listen_for_topology_changes(self):
        """If the topology changes, make sure it is not the loop ports.

        If so, update proxy ports (future).

        Placeholder: not implemented yet.
        """
319