Passed
Push — master ( 925753...cbc6a7 )
by Vinicius
05:16 queued 03:28
created

build.main.Main.get_evcs()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
ccs 2
cts 3
cp 0.6667
crap 1.037
rs 10
c 0
b 0
f 0
1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
7 1
import asyncio
8 1
import pathlib
9 1
from datetime import datetime
10
11 1
import napps.kytos.telemetry_int.kytos_api_helper as api
12 1
from napps.kytos.telemetry_int import settings, utils
13 1
from tenacity import RetryError
14
15 1
from kytos.core import KytosEvent, KytosNApp, log, rest
16 1
from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec
17 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400
18
19 1
from .exceptions import (
20
    EVCHasINT,
21
    EVCHasNoINT,
22
    EVCNotFound,
23
    FlowsNotFound,
24
    ProxyPortNotFound,
25
    ProxyPortStatusNotUP,
26
    UnrecoverableError,
27
)
28 1
from .managers.int import INTManager
29
30
# pylint: disable=fixme
31
32
33 1
class Main(KytosNApp):
34
    """Main class of kytos/telemetry NApp.
35
36
    This class is the entry point for this NApp.
37
    """
38
39 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
40
41 1
    def setup(self):
42
        """Replace the '__init__' method for the KytosNApp subclass.
43
44
        The setup method is automatically called by the controller when your
45
        application is loaded.
46
47
        So, if you have any setup routine, insert it here.
48
        """
49
50 1
        self.int_manager = INTManager(self.controller)
51 1
        self._ofpt_error_lock = asyncio.Lock()
52
53 1
    def execute(self):
54
        """Run after the setup method execution.
55
56
        You can also use this method in loop mode if you add to the above setup
57
        method a line like the following example:
58
59
            self.execute_as_loop(30)  # 30-second interval.
60
        """
61
62 1
    def shutdown(self):
63
        """Run when your NApp is unloaded.
64
65
        If you have some cleanup procedure, insert it here.
66
        """
67
68 1 View Code Duplication
    @rest("v1/evc/enable", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
69 1
    async def enable_telemetry(self, request: Request) -> JSONResponse:
70
        """REST to enable INT flows on EVCs.
71
72
        If a list of evc_ids is empty, it'll enable on non-INT EVCs.
73
        """
74 1
        await avalidate_openapi_request(self.spec, request)
75
76 1
        try:
77 1
            content = await aget_json_or_400(request)
78 1
            evc_ids = content["evc_ids"]
79 1
            force = content.get("force", False)
80 1
            if not isinstance(force, bool):
81
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
82
        except (TypeError, KeyError):
83
            raise HTTPException(400, detail=f"Invalid payload: {content}")
84
85 1
        try:
86 1
            evcs = (
87
                await api.get_evcs()
88
                if len(evc_ids) != 1
89
                else await api.get_evc(evc_ids[0])
90
            )
91
        except RetryError as exc:
92
            exc_error = str(exc.last_attempt.exception())
93
            log.error(exc_error)
94
            raise HTTPException(503, detail=exc_error)
95
96 1
        if evc_ids:
97 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
98
        else:
99
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
100
            if not evcs:
101
                # There's no non-INT EVCs to get enabled.
102
                return JSONResponse(list(evcs.keys()))
103
104 1
        try:
105 1
            await self.int_manager.enable_int(evcs, force)
106
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
107
            raise HTTPException(404, detail=str(exc))
108
        except (EVCHasINT, ProxyPortStatusNotUP) as exc:
109
            raise HTTPException(409, detail=str(exc))
110
        except RetryError as exc:
111
            exc_error = str(exc.last_attempt.exception())
112
            log.error(exc_error)
113
            raise HTTPException(503, detail=exc_error)
114
        except UnrecoverableError as exc:
115
            exc_error = str(exc)
116
            log.error(exc_error)
117
            raise HTTPException(500, detail=exc_error)
118
119 1
        return JSONResponse(list(evcs.keys()), status_code=201)
120
121 1 View Code Duplication
    @rest("v1/evc/disable", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
122 1
    async def disable_telemetry(self, request: Request) -> JSONResponse:
123
        """REST to disable/remove INT flows for an EVC_ID
124
125
        If a list of evc_ids is empty, it'll disable on all INT EVCs.
126
        """
127 1
        await avalidate_openapi_request(self.spec, request)
128
129 1
        try:
130 1
            content = await aget_json_or_400(request)
131 1
            evc_ids = content["evc_ids"]
132 1
            force = content.get("force", False)
133 1
            if not isinstance(force, bool):
134
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
135
        except (TypeError, KeyError):
136
            raise HTTPException(400, detail=f"Invalid payload: {content}")
137
138 1
        try:
139 1
            evcs = (
140
                await api.get_evcs()
141
                if len(evc_ids) != 1
142
                else await api.get_evc(evc_ids[0])
143
            )
144
        except RetryError as exc:
145
            exc_error = str(exc.last_attempt.exception())
146
            log.error(exc_error)
147
            raise HTTPException(503, detail=exc_error)
148
149 1
        if evc_ids:
150 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
151
        else:
152
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
153
            if not evcs:
154
                # There's no INT EVCs to get disabled.
155
                return JSONResponse(list(evcs.keys()))
156
157 1
        try:
158 1
            await self.int_manager.disable_int(evcs, force)
159
        except EVCNotFound as exc:
160
            raise HTTPException(404, detail=str(exc))
161
        except EVCHasNoINT as exc:
162
            raise HTTPException(409, detail=str(exc))
163
        except RetryError as exc:
164
            exc_error = str(exc.last_attempt.exception())
165
            log.error(exc_error)
166
            raise HTTPException(503, detail=exc_error)
167
        except UnrecoverableError as exc:
168
            exc_error = str(exc)
169
            log.error(exc_error)
170
            raise HTTPException(500, detail=exc_error)
171
172 1
        return JSONResponse(list(evcs.keys()))
173
174 1
    @rest("v1/evc")
175 1
    def get_evcs(self, _request: Request) -> JSONResponse:
176
        """REST to return the list of EVCs with INT enabled"""
177
        return JSONResponse(utils.get_evc_with_telemetry())
178
179 1
    @rest("v1/sync")
180 1
    def sync_flows(self, _request: Request) -> JSONResponse:
181
        """Endpoint to force the telemetry napp to search for INT flows and delete them
182
        accordingly to the evc metadata."""
183
184
        # TODO
185
        # for evc_id in get_evcs_ids():
186
        return JSONResponse("TBD")
187
188 1
    @rest("v1/evc/update")
189 1
    def update_evc(self, _request: Request) -> JSONResponse:
190
        """If an EVC changed from unidirectional to bidirectional telemetry,
191
        make the change."""
192
        return JSONResponse({})
193
194 1
    @alisten_to("kytos/mef_eline.deleted")
195 1
    async def on_evc_deleted(self, event: KytosEvent) -> None:
196
        """On EVC deleted."""
197
        content = event.content
198
        if (
199
            "metadata" in content
200
            and "telemetry" in content["metadata"]
201
            and content["metadata"]["telemetry"]["enabled"]
202
        ):
203
            evc_id = content["evc_id"]
204
            log.info(
205
                f"EVC({evc_id}, {content.get('name', '')}) got deleted, "
206
                "INT flows will be removed too"
207
            )
208
            stored_flows = await api.get_stored_flows(
209
                [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)]
210
            )
211
            await self.int_manager.remove_int_flows(stored_flows)
212
213 1
    @alisten_to("kytos/flow_manager.flow.error")
214 1
    async def on_flow_mod_error(self, event: KytosEvent):
215
        """On flow mod errors.
216
217
        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true
218
        """
219 1
        flow = event.content["flow"]
220 1
        if any(
221
            (
222
                event.content.get("error_exception"),
223
                event.content.get("error_command") != "add",
224
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
225
            )
226
        ):
227
            return
228
229 1
        async with self._ofpt_error_lock:
230 1
            evc_id = utils.get_id_from_cookie(flow.cookie)
231 1
            evc = await api.get_evc(evc_id, exclude_archived=False)
232 1
            if (
233
                not evc
234
                or "telemetry" not in evc[evc_id]["metadata"]
235
                or "enabled" not in evc[evc_id]["metadata"]["telemetry"]
236
                or not evc[evc_id]["metadata"]["telemetry"]["enabled"]
237
            ):
238
                return
239
240 1
            metadata = {
241
                "telemetry": {
242
                    "enabled": False,
243
                    "status": "DOWN",
244
                    "status_reason": ["ofpt_error"],
245
                    "status_updated_at": datetime.utcnow().strftime(
246
                        "%Y-%m-%dT%H:%M:%S"
247
                    ),
248
                }
249
            }
250 1
            log.error(
251
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
252
                f"error_type: {event.content.get('error_type')}, "
253
                f"error_code: {event.content.get('error_code')}, "
254
                f"flow: {flow.as_dict()} "
255
            )
256
257 1
            evcs = {evc_id: {evc_id: evc_id}}
258 1
            stored_flows = await api.get_stored_flows(
259
                [
260
                    utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
261
                    for evc_id in evcs
262
                ]
263
            )
264 1
            await asyncio.gather(
265
                self.int_manager.remove_int_flows(stored_flows),
266
                api.add_evcs_metadata(evcs, metadata, force=True),
267
            )
268
269
    # Event-driven methods: future
270 1
    def listen_for_new_evcs(self):
271
        """Change newly created EVC to INT-enabled EVC based on the metadata field
272
        (future)"""
273
        pass
274
275 1
    def listen_for_evc_change(self):
276
        """Change newly created EVC to INT-enabled EVC based on the
277
        metadata field (future)"""
278
        pass
279
280 1
    def listen_for_path_changes(self):
281
        """Change EVC's new path to INT-enabled EVC based on the metadata field
282
        when there is a path change. (future)"""
283
        pass
284
285 1
    def listen_for_evcs_removed(self):
286
        """Remove all INT flows belonging the just removed EVC (future)"""
287
        pass
288
289 1
    def listen_for_topology_changes(self):
290
        """If the topology changes, make sure it is not the loop ports.
291
        If so, update proxy ports"""
292
        # TODO:
293
        # self.proxy_ports = create_proxy_ports(self.proxy_ports)
294
        pass
295