Passed
Pull Request — master (#51)
by Vinicius
09:57 queued 07:16
created

build.main   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 289
Duplicated Lines 32.18 %

Test Coverage

Coverage 35.38%

Importance

Changes 0
Metric Value
eloc 170
dl 93
loc 289
ccs 46
cts 130
cp 0.3538
rs 8.8798
c 0
b 0
f 0
wmc 44
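
The two percentages reported above appear consistent with simple ratios of values in this table. The sketch below checks that arithmetic. Reading ccs/cts as covered/total statements and dl/loc as duplicated/total lines is an assumption here, not something the report states.

```python
# Values copied from the metric table above.
ccs, cts = 46, 130   # assumed: covered statements / total statements
dl, loc = 93, 289    # assumed: duplicated lines / total lines of code

coverage_ratio = round(ccs / cts, 4)            # compare with "cp 0.3538"
duplication_pct = round(dl / loc * 100, 2)      # compare with "Duplicated Lines 32.18 %"

print(f"coverage ratio:   {coverage_ratio}")
print(f"duplicated lines: {duplication_pct} %")
```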

15 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.listen_for_evc_change() 0 4 1
A Main.listen_for_path_changes() 0 4 1
C Main.disable_telemetry() 46 50 11
A Main.sync_flows() 0 8 1
A Main.listen_for_new_evcs() 0 4 1
A Main.shutdown() 0 2 1
C Main.enable_telemetry() 47 51 11
A Main.setup() 0 11 1
A Main.execute() 0 2 1
A Main.listen_for_evcs_removed() 0 3 1
A Main.update_evc() 0 5 1
A Main.get_evcs() 0 4 1
B Main.on_flow_mod_error() 0 54 7
A Main.on_evc_deleted() 0 18 4
A Main.listen_for_topology_changes() 0 6 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:
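
One solution that may fit the two methods flagged in this report, Main.enable_telemetry() and Main.disable_telemetry(), is Extract Method: pull the shared payload validation and EVC selection into helpers that both handlers call. The following is a minimal sketch under stated assumptions, not the project's implementation. The names parse_payload, select_evcs and InvalidPayload are hypothetical, and has_int_enabled is a stand-in for utils.has_int_enabled so that the sketch runs without kytos installed.

```python
class InvalidPayload(Exception):
    """Hypothetical stand-in for HTTPException(400, ...)."""


def has_int_enabled(evc):
    """Hypothetical stand-in for utils.has_int_enabled."""
    return bool(evc.get("metadata", {}).get("telemetry", {}).get("enabled"))


def parse_payload(content):
    """Validate the request body shared by the enable and disable endpoints."""
    try:
        evc_ids = content["evc_ids"]
        force = content.get("force", False)
        if not isinstance(force, bool):
            raise TypeError(f"'force' wrong type: {type(force)} expected bool")
    except (TypeError, KeyError) as exc:
        raise InvalidPayload(f"Invalid payload: {content}") from exc
    return evc_ids, force


def select_evcs(evcs, evc_ids, int_enabled):
    """Pick the EVCs to act on.

    With explicit ids, return exactly those ids (unknown ids map to {}).
    With an empty list, return every EVC whose INT state equals int_enabled.
    """
    if evc_ids:
        return {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
    return {k: v for k, v in evcs.items() if has_int_enabled(v) is int_enabled}
```

With helpers like these, the enable handler would call select_evcs(evcs, evc_ids, int_enabled=False) and the disable handler select_evcs(evcs, evc_ids, int_enabled=True), leaving only the differing exception mapping in each method.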

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
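
The prefix heuristic can be tried directly on the method names listed in this report. The sketch below groups names by their first underscore-separated token and keeps only the groups with more than one member. The helper name group_by_prefix is hypothetical, and the grouping rule is just one simple way to apply the heuristic.

```python
from collections import defaultdict

# Method names of Main, as listed in the report's method table.
METHOD_NAMES = [
    "listen_for_evc_change", "listen_for_path_changes", "disable_telemetry",
    "sync_flows", "listen_for_new_evcs", "shutdown", "enable_telemetry",
    "setup", "execute", "listen_for_evcs_removed", "update_evc", "get_evcs",
    "on_flow_mod_error", "on_evc_deleted", "listen_for_topology_changes",
]


def group_by_prefix(names, depth=1):
    """Group names by their first `depth` underscore-separated tokens.

    Only prefixes shared by two or more names are returned, since a
    single method does not suggest a component worth extracting.
    """
    groups = defaultdict(list)
    for name in names:
        prefix = "_".join(name.split("_")[:depth])
        groups[prefix].append(name)
    return {prefix: members for prefix, members in groups.items() if len(members) > 1}


candidates = group_by_prefix(METHOD_NAMES)
for prefix, members in candidates.items():
    print(prefix, len(members), members)
```

Groups found this way are only candidates. Whether they form a cohesive component still depends on the fields and collaborators the methods share.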

"""Main module of kytos/telemetry Network Application.

NApp to deploy In-band Network Telemetry over Ethernet Virtual Circuits

"""

import asyncio
from datetime import datetime

import napps.kytos.telemetry_int.kytos_api_helper as api
from napps.kytos.telemetry_int import settings, utils
from tenacity import RetryError

from kytos.core import KytosEvent, KytosNApp, log, rest
from kytos.core.helpers import alisten_to
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400

from .exceptions import (
    EVCHasINT,
    EVCHasNoINT,
    EVCNotFound,
    FlowsNotFound,
    ProxyPortNotFound,
    ProxyPortStatusNotUP,
    UnrecoverableError,
)
from .managers.int import INTManager

# pylint: disable=fixme


class Main(KytosNApp):
    """Main class of kytos/telemetry NApp.

    This class is the entry point for this NApp.
    """

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """

        self.int_manager = INTManager(self.controller)
        self._ofpt_error_lock = asyncio.Lock()

    def execute(self):
        """Run after the setup method execution.

        You can also use this method in loop mode if you add to the above setup
        method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """Run when your NApp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("v1/evc/enable", methods=["POST"])
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST to enable INT flows on EVCs.

        If the list of evc_ids is empty, INT is enabled on all non-INT EVCs.
        """

        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError):
            raise HTTPException(400, detail=f"Invalid payload: {content}")

        try:
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)

        if evc_ids:
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
            if not evcs:
                # There are no non-INT EVCs to enable.
                return JSONResponse({})

        try:
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc))
        except (EVCHasINT, ProxyPortStatusNotUP) as exc:
            raise HTTPException(400, detail=str(exc))
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error)

        return JSONResponse({}, status_code=201)

    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST to disable/remove INT flows on EVCs.

        If the list of evc_ids is empty, INT is disabled on all INT EVCs.
        """
        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError):
            raise HTTPException(400, detail=f"Invalid payload: {content}")

        try:
            evcs = (
                await api.get_evcs()
                if len(evc_ids) != 1
                else await api.get_evc(evc_ids[0])
            )
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)

        if evc_ids:
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                # There are no INT EVCs to disable.
                return JSONResponse({})

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc))
        except EVCHasNoINT as exc:
            raise HTTPException(400, detail=str(exc))
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error)
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error)

        return JSONResponse({})

    @rest("v1/evc")
    def get_evcs(self, _request: Request) -> JSONResponse:
        """REST to return the list of EVCs with INT enabled"""
        return JSONResponse(utils.get_evc_with_telemetry())

    @rest("v1/sync")
    def sync_flows(self, _request: Request) -> JSONResponse:
        """Endpoint to force the telemetry NApp to search for INT flows and delete them
        according to the EVC metadata."""

        # TODO
        # for evc_id in get_evcs_ids():
        return JSONResponse("TBD")

    @rest("v1/evc/update")
    def update_evc(self, _request: Request) -> JSONResponse:
        """If an EVC changed from unidirectional to bidirectional telemetry,
        make the change."""
        return JSONResponse({})

    @alisten_to("kytos/mef_eline.deleted")
    async def on_evc_deleted(self, event: KytosEvent) -> None:
        """On EVC deleted."""
        content = event.content
        if (
            "metadata" in content
            and "telemetry" in content["metadata"]
            and content["metadata"]["telemetry"]["enabled"]
        ):
            evc_id = content["evc_id"]
            log.info(
                f"EVC({evc_id}, {content.get('name', '')}) got deleted, "
                "INT flows will be removed too"
            )
            stored_flows = await api.get_stored_flows(
                [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)]
            )
            await self.int_manager.remove_int_flows(stored_flows)

    @alisten_to("kytos/flow_manager.flow.error")
    async def on_flow_mod_error(self, event: KytosEvent):
        """On flow mod errors.

        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true
        """
        flow = event.content["flow"]
        if any(
            (
                event.content.get("error_exception"),
                event.content.get("error_command") != "add",
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
            )
        ):
            return

        async with self._ofpt_error_lock:
            evc_id = utils.get_id_from_cookie(flow.cookie)
            evc = await api.get_evc(evc_id, exclude_archived=False)
            if (
                not evc
                or "telemetry" not in evc[evc_id]["metadata"]
                or "enabled" not in evc[evc_id]["metadata"]["telemetry"]
                or not evc[evc_id]["metadata"]["telemetry"]["enabled"]
            ):
                return

            metadata = {
                "telemetry": {
                    "enabled": False,
                    "status": "DOWN",
                    "status_reason": ["ofpt_error"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            log.error(
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
                f"error_type: {event.content.get('error_type')}, "
                f"error_code: {event.content.get('error_code')}, "
                f"flow: {flow.as_dict()} "
            )

            evcs = {evc_id: {evc_id: evc_id}}
            stored_flows = await api.get_stored_flows(
                [
                    utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
                    for evc_id in evcs
                ]
            )
            await asyncio.gather(
                self.int_manager.remove_int_flows(stored_flows),
                api.add_evcs_metadata(evcs, metadata, force=True),
            )

    # Event-driven methods: future
    def listen_for_new_evcs(self):
        """Change newly created EVC to INT-enabled EVC based on the metadata field
        (future)"""
        pass

    def listen_for_evc_change(self):
        """Change newly created EVC to INT-enabled EVC based on the
        metadata field (future)"""
        pass

    def listen_for_path_changes(self):
        """Change EVC's new path to INT-enabled EVC based on the metadata field
        when there is a path change. (future)"""
        pass

    def listen_for_evcs_removed(self):
        """Remove all INT flows belonging to the just-removed EVC (future)"""
        pass

    def listen_for_topology_changes(self):
        """If the topology changes, make sure it is not the loop ports.
        If so, update proxy ports"""
        # TODO:
        # self.proxy_ports = create_proxy_ports(self.proxy_ports)
        pass