Total Complexity | 46 |
Total Lines | 305 |
Duplicated Lines | 34.1 % |
Coverage | 53.46% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """Main module of kytos/telemetry Network Application. |
||
2 | |||
3 | Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits |
||
4 | |||
5 | """ |
||
6 | |||
7 | 1 | import asyncio |
|
8 | 1 | import pathlib |
|
9 | 1 | from datetime import datetime |
|
10 | |||
11 | 1 | import napps.kytos.telemetry_int.kytos_api_helper as api |
|
12 | 1 | from napps.kytos.telemetry_int import settings, utils |
|
13 | 1 | from tenacity import RetryError |
|
14 | |||
15 | 1 | from kytos.core import KytosEvent, KytosNApp, log, rest |
|
16 | 1 | from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec |
|
17 | 1 | from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400 |
|
18 | |||
19 | 1 | from .exceptions import ( |
|
20 | EVCHasINT, |
||
21 | EVCHasNoINT, |
||
22 | EVCNotFound, |
||
23 | FlowsNotFound, |
||
24 | ProxyPortNotFound, |
||
25 | ProxyPortStatusNotUP, |
||
26 | UnrecoverableError, |
||
27 | ) |
||
28 | 1 | from .managers.int import INTManager |
|
29 | |||
30 | # pylint: disable=fixme |
||
31 | |||
32 | |||
33 | 1 | class Main(KytosNApp): |
|
34 | """Main class of kytos/telemetry NApp. |
||
35 | |||
36 | This class is the entry point for this NApp. |
||
37 | """ |
||
38 | |||
39 | 1 | spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml") |
|
40 | |||
41 | 1 | def setup(self): |
|
42 | """Replace the '__init__' method for the KytosNApp subclass. |
||
43 | |||
44 | The setup method is automatically called by the controller when your |
||
45 | application is loaded. |
||
46 | |||
47 | So, if you have any setup routine, insert it here. |
||
48 | """ |
||
49 | |||
50 | 1 | self.int_manager = INTManager(self.controller) |
|
51 | 1 | self._ofpt_error_lock = asyncio.Lock() |
|
52 | |||
53 | 1 | def execute(self): |
|
54 | """Run after the setup method execution. |
||
55 | |||
56 | You can also use this method in loop mode if you add to the above setup |
||
57 | method a line like the following example: |
||
58 | |||
59 | self.execute_as_loop(30) # 30-second interval. |
||
60 | """ |
||
61 | |||
62 | 1 | def shutdown(self): |
|
63 | """Run when your NApp is unloaded. |
||
64 | |||
65 | If you have some cleanup procedure, insert it here. |
||
66 | """ |
||
67 | |||
68 | 1 | View Code Duplication | @rest("v1/evc/enable", methods=["POST"]) |
|
|||
69 | 1 | async def enable_telemetry(self, request: Request) -> JSONResponse: |
|
70 | """REST to enable INT flows on EVCs. |
||
71 | |||
72 | If a list of evc_ids is empty, it'll enable on non-INT EVCs. |
||
73 | """ |
||
74 | 1 | await avalidate_openapi_request(self.spec, request) |
|
75 | |||
76 | 1 | try: |
|
77 | 1 | content = await aget_json_or_400(request) |
|
78 | 1 | evc_ids = content["evc_ids"] |
|
79 | 1 | force = content.get("force", False) |
|
80 | 1 | if not isinstance(force, bool): |
|
81 | raise TypeError(f"'force' wrong type: {type(force)} expected bool") |
||
82 | except (TypeError, KeyError): |
||
83 | raise HTTPException(400, detail=f"Invalid payload: {content}") |
||
84 | |||
85 | 1 | try: |
|
86 | 1 | evcs = ( |
|
87 | await api.get_evcs() |
||
88 | if len(evc_ids) != 1 |
||
89 | else await api.get_evc(evc_ids[0]) |
||
90 | ) |
||
91 | except RetryError as exc: |
||
92 | exc_error = str(exc.last_attempt.exception()) |
||
93 | log.error(exc_error) |
||
94 | raise HTTPException(503, detail=exc_error) |
||
95 | |||
96 | 1 | if evc_ids: |
|
97 | 1 | evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids} |
|
98 | else: |
||
99 | evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)} |
||
100 | if not evcs: |
||
101 | # There's no non-INT EVCs to get enabled. |
||
102 | return JSONResponse(list(evcs.keys())) |
||
103 | |||
104 | 1 | try: |
|
105 | 1 | await self.int_manager.enable_int(evcs, force) |
|
106 | except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc: |
||
107 | raise HTTPException(404, detail=str(exc)) |
||
108 | except (EVCHasINT, ProxyPortStatusNotUP) as exc: |
||
109 | raise HTTPException(409, detail=str(exc)) |
||
110 | except RetryError as exc: |
||
111 | exc_error = str(exc.last_attempt.exception()) |
||
112 | log.error(exc_error) |
||
113 | raise HTTPException(503, detail=exc_error) |
||
114 | except UnrecoverableError as exc: |
||
115 | exc_error = str(exc) |
||
116 | log.error(exc_error) |
||
117 | raise HTTPException(500, detail=exc_error) |
||
118 | |||
119 | 1 | return JSONResponse(list(evcs.keys()), status_code=201) |
|
120 | |||
121 | 1 | View Code Duplication | @rest("v1/evc/disable", methods=["POST"]) |
122 | 1 | async def disable_telemetry(self, request: Request) -> JSONResponse: |
|
123 | """REST to disable/remove INT flows for an EVC_ID |
||
124 | |||
125 | If a list of evc_ids is empty, it'll disable on all INT EVCs. |
||
126 | """ |
||
127 | 1 | await avalidate_openapi_request(self.spec, request) |
|
128 | |||
129 | 1 | try: |
|
130 | 1 | content = await aget_json_or_400(request) |
|
131 | 1 | evc_ids = content["evc_ids"] |
|
132 | 1 | force = content.get("force", False) |
|
133 | 1 | if not isinstance(force, bool): |
|
134 | raise TypeError(f"'force' wrong type: {type(force)} expected bool") |
||
135 | except (TypeError, KeyError): |
||
136 | raise HTTPException(400, detail=f"Invalid payload: {content}") |
||
137 | |||
138 | 1 | try: |
|
139 | 1 | evcs = ( |
|
140 | await api.get_evcs() |
||
141 | if len(evc_ids) != 1 |
||
142 | else await api.get_evc(evc_ids[0]) |
||
143 | ) |
||
144 | except RetryError as exc: |
||
145 | exc_error = str(exc.last_attempt.exception()) |
||
146 | log.error(exc_error) |
||
147 | raise HTTPException(503, detail=exc_error) |
||
148 | |||
149 | 1 | if evc_ids: |
|
150 | 1 | evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids} |
|
151 | else: |
||
152 | evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)} |
||
153 | if not evcs: |
||
154 | # There's no INT EVCs to get disabled. |
||
155 | return JSONResponse(list(evcs.keys())) |
||
156 | |||
157 | 1 | try: |
|
158 | 1 | await self.int_manager.disable_int(evcs, force) |
|
159 | except EVCNotFound as exc: |
||
160 | raise HTTPException(404, detail=str(exc)) |
||
161 | except EVCHasNoINT as exc: |
||
162 | raise HTTPException(409, detail=str(exc)) |
||
163 | except RetryError as exc: |
||
164 | exc_error = str(exc.last_attempt.exception()) |
||
165 | log.error(exc_error) |
||
166 | raise HTTPException(503, detail=exc_error) |
||
167 | except UnrecoverableError as exc: |
||
168 | exc_error = str(exc) |
||
169 | log.error(exc_error) |
||
170 | raise HTTPException(500, detail=exc_error) |
||
171 | |||
172 | 1 | return JSONResponse(list(evcs.keys())) |
|
173 | |||
174 | 1 | @rest("v1/evc") |
|
175 | 1 | async def get_evcs(self, _request: Request) -> JSONResponse: |
|
176 | """REST to return the list of EVCs with INT enabled""" |
||
177 | 1 | try: |
|
178 | 1 | evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"}) |
|
179 | 1 | return JSONResponse(evcs) |
|
180 | except RetryError as exc: |
||
181 | exc_error = str(exc.last_attempt.exception()) |
||
182 | log.error(exc_error) |
||
183 | raise HTTPException(503, detail=exc_error) |
||
184 | except UnrecoverableError as exc: |
||
185 | exc_error = str(exc) |
||
186 | log.error(exc_error) |
||
187 | raise HTTPException(500, detail=exc_error) |
||
188 | |||
189 | 1 | @rest("v1/sync") |
|
190 | 1 | def sync_flows(self, _request: Request) -> JSONResponse: |
|
191 | """Endpoint to force the telemetry napp to search for INT flows and delete them |
||
192 | accordingly to the evc metadata.""" |
||
193 | |||
194 | # TODO |
||
195 | # for evc_id in get_evcs_ids(): |
||
196 | return JSONResponse("TBD") |
||
197 | |||
198 | 1 | @rest("v1/evc/update") |
|
199 | 1 | def update_evc(self, _request: Request) -> JSONResponse: |
|
200 | """If an EVC changed from unidirectional to bidirectional telemetry, |
||
201 | make the change.""" |
||
202 | return JSONResponse({}) |
||
203 | |||
204 | 1 | @alisten_to("kytos/mef_eline.deleted") |
|
205 | 1 | async def on_evc_deleted(self, event: KytosEvent) -> None: |
|
206 | """On EVC deleted.""" |
||
207 | content = event.content |
||
208 | if ( |
||
209 | "metadata" in content |
||
210 | and "telemetry" in content["metadata"] |
||
211 | and content["metadata"]["telemetry"]["enabled"] |
||
212 | ): |
||
213 | evc_id = content["evc_id"] |
||
214 | log.info( |
||
215 | f"EVC({evc_id}, {content.get('name', '')}) got deleted, " |
||
216 | "INT flows will be removed too" |
||
217 | ) |
||
218 | stored_flows = await api.get_stored_flows( |
||
219 | [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)] |
||
220 | ) |
||
221 | await self.int_manager.remove_int_flows(stored_flows) |
||
222 | |||
223 | 1 | @alisten_to("kytos/flow_manager.flow.error") |
|
224 | 1 | async def on_flow_mod_error(self, event: KytosEvent): |
|
225 | """On flow mod errors. |
||
226 | |||
227 | Only OFPT_ERRORs will be handled, telemetry_int already uses force: true |
||
228 | """ |
||
229 | 1 | flow = event.content["flow"] |
|
230 | 1 | if any( |
|
231 | ( |
||
232 | event.content.get("error_exception"), |
||
233 | event.content.get("error_command") != "add", |
||
234 | flow.cookie >> 56 != settings.INT_COOKIE_PREFIX, |
||
235 | ) |
||
236 | ): |
||
237 | return |
||
238 | |||
239 | 1 | async with self._ofpt_error_lock: |
|
240 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
241 | 1 | evc = await api.get_evc(evc_id, exclude_archived=False) |
|
242 | 1 | if ( |
|
243 | not evc |
||
244 | or "telemetry" not in evc[evc_id]["metadata"] |
||
245 | or "enabled" not in evc[evc_id]["metadata"]["telemetry"] |
||
246 | or not evc[evc_id]["metadata"]["telemetry"]["enabled"] |
||
247 | ): |
||
248 | return |
||
249 | |||
250 | 1 | metadata = { |
|
251 | "telemetry": { |
||
252 | "enabled": False, |
||
253 | "status": "DOWN", |
||
254 | "status_reason": ["ofpt_error"], |
||
255 | "status_updated_at": datetime.utcnow().strftime( |
||
256 | "%Y-%m-%dT%H:%M:%S" |
||
257 | ), |
||
258 | } |
||
259 | } |
||
260 | 1 | log.error( |
|
261 | f"Disabling EVC({evc_id}) due to OFPT_ERROR, " |
||
262 | f"error_type: {event.content.get('error_type')}, " |
||
263 | f"error_code: {event.content.get('error_code')}, " |
||
264 | f"flow: {flow.as_dict()} " |
||
265 | ) |
||
266 | |||
267 | 1 | evcs = {evc_id: {evc_id: evc_id}} |
|
268 | 1 | stored_flows = await api.get_stored_flows( |
|
269 | [ |
||
270 | utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) |
||
271 | for evc_id in evcs |
||
272 | ] |
||
273 | ) |
||
274 | 1 | await asyncio.gather( |
|
275 | self.int_manager.remove_int_flows(stored_flows), |
||
276 | api.add_evcs_metadata(evcs, metadata, force=True), |
||
277 | ) |
||
278 | |||
279 | # Event-driven methods: future |
||
280 | 1 | def listen_for_new_evcs(self): |
|
281 | """Change newly created EVC to INT-enabled EVC based on the metadata field |
||
282 | (future)""" |
||
283 | pass |
||
284 | |||
285 | 1 | def listen_for_evc_change(self): |
|
286 | """Change newly created EVC to INT-enabled EVC based on the |
||
287 | metadata field (future)""" |
||
288 | pass |
||
289 | |||
290 | 1 | def listen_for_path_changes(self): |
|
291 | """Change EVC's new path to INT-enabled EVC based on the metadata field |
||
292 | when there is a path change. (future)""" |
||
293 | pass |
||
294 | |||
295 | 1 | def listen_for_evcs_removed(self): |
|
296 | """Remove all INT flows belonging the just removed EVC (future)""" |
||
297 | pass |
||
298 | |||
299 | 1 | def listen_for_topology_changes(self): |
|
300 | """If the topology changes, make sure it is not the loop ports. |
||
301 | If so, update proxy ports""" |
||
302 | # TODO: |
||
303 | # self.proxy_ports = create_proxy_ports(self.proxy_ports) |
||
304 | pass |
||
305 |