Passed
Pull Request — master (#60)
by Vinicius
03:21
created

build.main   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 305
Duplicated Lines 34.1 %

Test Coverage

Coverage 53.46%

Importance

Changes 0
Metric Value
eloc 184
dl 104
loc 305
ccs 77
cts 144
cp 0.5346
rs 8.72
c 0
b 0
f 0
wmc 46

15 Methods

Rating   Name   Duplication   Size   Complexity  
C Main.disable_telemetry() 52 52 11
A Main.shutdown() 0 2 1
C Main.enable_telemetry() 52 52 11
A Main.setup() 0 11 1
A Main.execute() 0 2 1
A Main.listen_for_evc_change() 0 4 1
A Main.listen_for_path_changes() 0 4 1
A Main.sync_flows() 0 8 1
A Main.listen_for_new_evcs() 0 4 1
A Main.listen_for_evcs_removed() 0 3 1
A Main.update_evc() 0 5 1
A Main.get_evcs() 0 14 3
B Main.on_flow_mod_error() 0 54 7
A Main.on_evc_deleted() 0 18 4
A Main.listen_for_topology_changes() 0 6 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
7 1
import asyncio
8 1
import pathlib
9 1
from datetime import datetime
10
11 1
import napps.kytos.telemetry_int.kytos_api_helper as api
12 1
from napps.kytos.telemetry_int import settings, utils
13 1
from tenacity import RetryError
14
15 1
from kytos.core import KytosEvent, KytosNApp, log, rest
16 1
from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec
17 1
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400
18
19 1
from .exceptions import (
20
    EVCHasINT,
21
    EVCHasNoINT,
22
    EVCNotFound,
23
    FlowsNotFound,
24
    ProxyPortNotFound,
25
    ProxyPortStatusNotUP,
26
    UnrecoverableError,
27
)
28 1
from .managers.int import INTManager
29
30
# pylint: disable=fixme
31
32
33 1
class Main(KytosNApp):
34
    """Main class of kytos/telemetry NApp.
35
36
    This class is the entry point for this NApp.
37
    """
38
39 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
40
41 1
    def setup(self):
42
        """Replace the '__init__' method for the KytosNApp subclass.
43
44
        The setup method is automatically called by the controller when your
45
        application is loaded.
46
47
        So, if you have any setup routine, insert it here.
48
        """
49
50 1
        self.int_manager = INTManager(self.controller)
51 1
        self._ofpt_error_lock = asyncio.Lock()
52
53 1
    def execute(self):
54
        """Run after the setup method execution.
55
56
        You can also use this method in loop mode if you add to the above setup
57
        method a line like the following example:
58
59
            self.execute_as_loop(30)  # 30-second interval.
60
        """
61
62 1
    def shutdown(self):
63
        """Run when your NApp is unloaded.
64
65
        If you have some cleanup procedure, insert it here.
66
        """
67
68 1 View Code Duplication
    @rest("v1/evc/enable", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
69 1
    async def enable_telemetry(self, request: Request) -> JSONResponse:
70
        """REST to enable INT flows on EVCs.
71
72
        If a list of evc_ids is empty, it'll enable on non-INT EVCs.
73
        """
74 1
        await avalidate_openapi_request(self.spec, request)
75
76 1
        try:
77 1
            content = await aget_json_or_400(request)
78 1
            evc_ids = content["evc_ids"]
79 1
            force = content.get("force", False)
80 1
            if not isinstance(force, bool):
81
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
82
        except (TypeError, KeyError):
83
            raise HTTPException(400, detail=f"Invalid payload: {content}")
84
85 1
        try:
86 1
            evcs = (
87
                await api.get_evcs()
88
                if len(evc_ids) != 1
89
                else await api.get_evc(evc_ids[0])
90
            )
91
        except RetryError as exc:
92
            exc_error = str(exc.last_attempt.exception())
93
            log.error(exc_error)
94
            raise HTTPException(503, detail=exc_error)
95
96 1
        if evc_ids:
97 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
98
        else:
99
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
100
            if not evcs:
101
                # There's no non-INT EVCs to get enabled.
102
                return JSONResponse(list(evcs.keys()))
103
104 1
        try:
105 1
            await self.int_manager.enable_int(evcs, force)
106
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
107
            raise HTTPException(404, detail=str(exc))
108
        except (EVCHasINT, ProxyPortStatusNotUP) as exc:
109
            raise HTTPException(409, detail=str(exc))
110
        except RetryError as exc:
111
            exc_error = str(exc.last_attempt.exception())
112
            log.error(exc_error)
113
            raise HTTPException(503, detail=exc_error)
114
        except UnrecoverableError as exc:
115
            exc_error = str(exc)
116
            log.error(exc_error)
117
            raise HTTPException(500, detail=exc_error)
118
119 1
        return JSONResponse(list(evcs.keys()), status_code=201)
120
121 1 View Code Duplication
    @rest("v1/evc/disable", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
122 1
    async def disable_telemetry(self, request: Request) -> JSONResponse:
123
        """REST to disable/remove INT flows for an EVC_ID
124
125
        If a list of evc_ids is empty, it'll disable on all INT EVCs.
126
        """
127 1
        await avalidate_openapi_request(self.spec, request)
128
129 1
        try:
130 1
            content = await aget_json_or_400(request)
131 1
            evc_ids = content["evc_ids"]
132 1
            force = content.get("force", False)
133 1
            if not isinstance(force, bool):
134
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
135
        except (TypeError, KeyError):
136
            raise HTTPException(400, detail=f"Invalid payload: {content}")
137
138 1
        try:
139 1
            evcs = (
140
                await api.get_evcs()
141
                if len(evc_ids) != 1
142
                else await api.get_evc(evc_ids[0])
143
            )
144
        except RetryError as exc:
145
            exc_error = str(exc.last_attempt.exception())
146
            log.error(exc_error)
147
            raise HTTPException(503, detail=exc_error)
148
149 1
        if evc_ids:
150 1
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
151
        else:
152
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
153
            if not evcs:
154
                # There's no INT EVCs to get disabled.
155
                return JSONResponse(list(evcs.keys()))
156
157 1
        try:
158 1
            await self.int_manager.disable_int(evcs, force)
159
        except EVCNotFound as exc:
160
            raise HTTPException(404, detail=str(exc))
161
        except EVCHasNoINT as exc:
162
            raise HTTPException(409, detail=str(exc))
163
        except RetryError as exc:
164
            exc_error = str(exc.last_attempt.exception())
165
            log.error(exc_error)
166
            raise HTTPException(503, detail=exc_error)
167
        except UnrecoverableError as exc:
168
            exc_error = str(exc)
169
            log.error(exc_error)
170
            raise HTTPException(500, detail=exc_error)
171
172 1
        return JSONResponse(list(evcs.keys()))
173
174 1
    @rest("v1/evc")
175 1
    async def get_evcs(self, _request: Request) -> JSONResponse:
176
        """REST to return the list of EVCs with INT enabled"""
177 1
        try:
178 1
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
179 1
            return JSONResponse(evcs)
180
        except RetryError as exc:
181
            exc_error = str(exc.last_attempt.exception())
182
            log.error(exc_error)
183
            raise HTTPException(503, detail=exc_error)
184
        except UnrecoverableError as exc:
185
            exc_error = str(exc)
186
            log.error(exc_error)
187
            raise HTTPException(500, detail=exc_error)
188
189 1
    @rest("v1/sync")
190 1
    def sync_flows(self, _request: Request) -> JSONResponse:
191
        """Endpoint to force the telemetry napp to search for INT flows and delete them
192
        accordingly to the evc metadata."""
193
194
        # TODO
195
        # for evc_id in get_evcs_ids():
196
        return JSONResponse("TBD")
197
198 1
    @rest("v1/evc/update")
199 1
    def update_evc(self, _request: Request) -> JSONResponse:
200
        """If an EVC changed from unidirectional to bidirectional telemetry,
201
        make the change."""
202
        return JSONResponse({})
203
204 1
    @alisten_to("kytos/mef_eline.deleted")
205 1
    async def on_evc_deleted(self, event: KytosEvent) -> None:
206
        """On EVC deleted."""
207
        content = event.content
208
        if (
209
            "metadata" in content
210
            and "telemetry" in content["metadata"]
211
            and content["metadata"]["telemetry"]["enabled"]
212
        ):
213
            evc_id = content["evc_id"]
214
            log.info(
215
                f"EVC({evc_id}, {content.get('name', '')}) got deleted, "
216
                "INT flows will be removed too"
217
            )
218
            stored_flows = await api.get_stored_flows(
219
                [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)]
220
            )
221
            await self.int_manager.remove_int_flows(stored_flows)
222
223 1
    @alisten_to("kytos/flow_manager.flow.error")
224 1
    async def on_flow_mod_error(self, event: KytosEvent):
225
        """On flow mod errors.
226
227
        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true
228
        """
229 1
        flow = event.content["flow"]
230 1
        if any(
231
            (
232
                event.content.get("error_exception"),
233
                event.content.get("error_command") != "add",
234
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
235
            )
236
        ):
237
            return
238
239 1
        async with self._ofpt_error_lock:
240 1
            evc_id = utils.get_id_from_cookie(flow.cookie)
241 1
            evc = await api.get_evc(evc_id, exclude_archived=False)
242 1
            if (
243
                not evc
244
                or "telemetry" not in evc[evc_id]["metadata"]
245
                or "enabled" not in evc[evc_id]["metadata"]["telemetry"]
246
                or not evc[evc_id]["metadata"]["telemetry"]["enabled"]
247
            ):
248
                return
249
250 1
            metadata = {
251
                "telemetry": {
252
                    "enabled": False,
253
                    "status": "DOWN",
254
                    "status_reason": ["ofpt_error"],
255
                    "status_updated_at": datetime.utcnow().strftime(
256
                        "%Y-%m-%dT%H:%M:%S"
257
                    ),
258
                }
259
            }
260 1
            log.error(
261
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
262
                f"error_type: {event.content.get('error_type')}, "
263
                f"error_code: {event.content.get('error_code')}, "
264
                f"flow: {flow.as_dict()} "
265
            )
266
267 1
            evcs = {evc_id: {evc_id: evc_id}}
268 1
            stored_flows = await api.get_stored_flows(
269
                [
270
                    utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
271
                    for evc_id in evcs
272
                ]
273
            )
274 1
            await asyncio.gather(
275
                self.int_manager.remove_int_flows(stored_flows),
276
                api.add_evcs_metadata(evcs, metadata, force=True),
277
            )
278
279
    # Event-driven methods: future
280 1
    def listen_for_new_evcs(self):
281
        """Change newly created EVC to INT-enabled EVC based on the metadata field
282
        (future)"""
283
        pass
284
285 1
    def listen_for_evc_change(self):
286
        """Change newly created EVC to INT-enabled EVC based on the
287
        metadata field (future)"""
288
        pass
289
290 1
    def listen_for_path_changes(self):
291
        """Change EVC's new path to INT-enabled EVC based on the metadata field
292
        when there is a path change. (future)"""
293
        pass
294
295 1
    def listen_for_evcs_removed(self):
296
        """Remove all INT flows belonging the just removed EVC (future)"""
297
        pass
298
299 1
    def listen_for_topology_changes(self):
300
        """If the topology changes, make sure it is not the loop ports.
301
        If so, update proxy ports"""
302
        # TODO:
303
        # self.proxy_ports = create_proxy_ports(self.proxy_ports)
304
        pass
305