Total Complexity | 50 |
Total Lines | 325 |
Duplicated Lines | 32 % |
Coverage | 56.69% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """Main module of kytos/telemetry Network Application. |
||
2 | |||
3 | Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits |
||
4 | |||
5 | """ |
||
6 | |||
7 | 1 | import asyncio |
|
8 | 1 | import pathlib |
|
9 | 1 | from datetime import datetime |
|
10 | |||
11 | 1 | import napps.kytos.telemetry_int.kytos_api_helper as api |
|
12 | 1 | from napps.kytos.telemetry_int import settings, utils |
|
13 | 1 | from tenacity import RetryError |
|
14 | |||
15 | 1 | from kytos.core import KytosEvent, KytosNApp, log, rest |
|
16 | 1 | from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec |
|
17 | 1 | from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400 |
|
18 | |||
19 | 1 | from .exceptions import ( |
|
20 | EVCHasINT, |
||
21 | EVCHasNoINT, |
||
22 | EVCNotFound, |
||
23 | FlowsNotFound, |
||
24 | ProxyPortNotFound, |
||
25 | ProxyPortSameSourceIntraEVC, |
||
26 | ProxyPortStatusNotUP, |
||
27 | UnrecoverableError, |
||
28 | ) |
||
29 | 1 | from .managers.int import INTManager |
|
30 | |||
31 | # pylint: disable=fixme |
||
32 | |||
33 | |||
class Main(KytosNApp):
    """Main class of kytos/telemetry NApp.

    This class is the entry point for this NApp. It exposes the REST
    endpoints used to enable/disable/list INT on EVCs and reacts to
    of_multi_table, mef_eline and flow_manager events.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """

        self.int_manager = INTManager(self.controller)
        # Serializes the handling of flow mod errors (see on_flow_mod_error).
        self._ofpt_error_lock = asyncio.Lock()

    def execute(self):
        """Run after the setup method execution.

        You can also use this method in loop mode if you add to the above setup
        method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """Run when your NApp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    # ------------------------------------------------------------------
    # Private helpers shared by the REST endpoints
    # ------------------------------------------------------------------

    @staticmethod
    def _unavailable(exc: RetryError) -> HTTPException:
        """Log the last attempt's exception of exc and build a 503 error."""
        exc_error = str(exc.last_attempt.exception())
        log.error(exc_error)
        return HTTPException(503, detail=exc_error)

    @staticmethod
    def _internal_error(exc: Exception) -> HTTPException:
        """Log exc and build a 500 error carrying its message."""
        exc_error = str(exc)
        log.error(exc_error)
        return HTTPException(500, detail=exc_error)

    @staticmethod
    async def _parse_payload(request: Request) -> tuple:
        """Return (evc_ids, force) read from the JSON body of request.

        Raises HTTPException 400 if 'evc_ids' is missing, if the body is not
        subscriptable by key, or if 'force' is not a bool.
        """
        # Read the body outside of the try block, so that 'content' is always
        # bound when the error message below is built.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc
        return evc_ids, force

    @staticmethod
    async def _fetch_evcs(evc_ids: list) -> dict:
        """Fetch EVCs via the API helper.

        A single EVC is requested when exactly one id is given, otherwise
        all EVCs are requested. Raises HTTPException 503 on RetryError.
        """
        try:
            if len(evc_ids) == 1:
                return await api.get_evc(evc_ids[0])
            return await api.get_evcs()
        except RetryError as exc:
            raise Main._unavailable(exc) from exc

    @staticmethod
    def _select_evcs(evcs: dict, evc_ids: list, int_enabled: bool) -> dict:
        """Select the EVCs an enable/disable request applies to.

        If evc_ids is non-empty, every requested id is kept (mapped to an
        empty dict when it was not found). Otherwise, only the EVCs whose
        utils.has_int_enabled() result matches int_enabled are kept.
        """
        if evc_ids:
            return {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        return {
            evc_id: evc
            for evc_id, evc in evcs.items()
            if bool(utils.has_int_enabled(evc)) == int_enabled
        }

    # ------------------------------------------------------------------
    # REST endpoints
    # ------------------------------------------------------------------

    @rest("v1/evc/enable", methods=["POST"])
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST to enable INT flows on EVCs.

        If a list of evc_ids is empty, it'll enable on non-INT EVCs.

        Responds 201 with the list of EVC ids, or 200 with an empty list when
        there is nothing to enable. Errors are mapped to 400 (invalid
        payload), 404, 409, 500 or 503 depending on the exception raised.
        """
        await avalidate_openapi_request(self.spec, request)

        evc_ids, force = await self._parse_payload(request)
        fetched = await self._fetch_evcs(evc_ids)
        evcs = self._select_evcs(fetched, evc_ids, int_enabled=False)
        if not evcs:
            # There's no non-INT EVCs to get enabled.
            return JSONResponse([])

        try:
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasINT, ProxyPortStatusNotUP, ProxyPortSameSourceIntraEVC) as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            raise self._unavailable(exc) from exc
        except UnrecoverableError as exc:
            raise self._internal_error(exc) from exc

        return JSONResponse(list(evcs.keys()), status_code=201)

    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST to disable/remove INT flows for an EVC_ID

        If a list of evc_ids is empty, it'll disable on all INT EVCs.

        Responds 200 with the list of EVC ids (empty when there is nothing
        to disable). Errors are mapped to 400 (invalid payload), 404, 409,
        500 or 503 depending on the exception raised.
        """
        await avalidate_openapi_request(self.spec, request)

        evc_ids, force = await self._parse_payload(request)
        fetched = await self._fetch_evcs(evc_ids)
        evcs = self._select_evcs(fetched, evc_ids, int_enabled=True)
        if not evcs:
            # There's no INT EVCs to get disabled.
            return JSONResponse([])

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except EVCHasNoINT as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            raise self._unavailable(exc) from exc
        except UnrecoverableError as exc:
            raise self._internal_error(exc) from exc

        return JSONResponse(list(evcs.keys()))

    @rest("v1/evc")
    async def get_evcs(self, _request: Request) -> JSONResponse:
        """REST to return the list of EVCs with INT enabled"""
        try:
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
        except RetryError as exc:
            raise self._unavailable(exc) from exc
        except UnrecoverableError as exc:
            raise self._internal_error(exc) from exc
        return JSONResponse(evcs)

    @rest("v1/sync")
    def sync_flows(self, _request: Request) -> JSONResponse:
        """Endpoint to force the telemetry napp to search for INT flows and delete them
        accordingly to the evc metadata."""

        # TODO
        # for evc_id in get_evcs_ids():
        return JSONResponse("TBD")

    @rest("v1/evc/update")
    def update_evc(self, _request: Request) -> JSONResponse:
        """If an EVC changed from unidirectional to bidirectional telemetry,
        make the change."""
        return JSONResponse({})

    # ------------------------------------------------------------------
    # Event listeners
    # ------------------------------------------------------------------

    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle of_multi_table.enable_table.

        Updates the flow builder's table group with the "telemetry_int"
        entry of the event and publishes kytos/telemetry_int.enable_table.
        Nothing is applied if any group is not in settings.TABLE_GROUP_ALLOWED.
        """
        table_group = event.content.get("telemetry_int", {})
        if not table_group:
            return
        # Validate every group before applying any of them.
        for group in table_group:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(
                    f'The table group "{group}" is not allowed for '
                    f"telemetry_int. Allowed table groups are "
                    f"{settings.TABLE_GROUP_ALLOWED}"
                )
                return
        self.int_manager.flow_builder.table_group.update(table_group)
        content = {"group_table": self.int_manager.flow_builder.table_group}
        event_out = KytosEvent(name="kytos/telemetry_int.enable_table", content=content)
        await self.controller.buffers.app.aput(event_out)

    @alisten_to("kytos/mef_eline.deleted")
    async def on_evc_deleted(self, event: KytosEvent) -> None:
        """On EVC deleted.

        If the deleted EVC had telemetry enabled in its metadata, its stored
        INT flows are fetched and removed.
        """
        content = event.content
        metadata = content.get("metadata") or {}
        telemetry = metadata.get("telemetry") or {}
        # A missing "enabled" key is treated as not enabled instead of
        # raising KeyError.
        if not telemetry.get("enabled"):
            return

        evc_id = content["evc_id"]
        log.info(
            f"EVC({evc_id}, {content.get('name', '')}) got deleted, "
            "INT flows will be removed too"
        )
        stored_flows = await api.get_stored_flows(
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)]
        )
        await self.int_manager.remove_int_flows(stored_flows)

    @alisten_to("kytos/flow_manager.flow.error")
    async def on_flow_mod_error(self, event: KytosEvent):
        """On flow mod errors.

        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true

        Events are ignored when they carry an "error_exception", when the
        "error_command" is not "add", or when the flow cookie's top byte
        does not match settings.INT_COOKIE_PREFIX. Otherwise, if the EVC
        still has telemetry enabled, its INT flows are removed and its
        telemetry metadata is set as disabled/DOWN.
        """
        flow = event.content["flow"]
        if (
            event.content.get("error_exception")
            or event.content.get("error_command") != "add"
            or flow.cookie >> 56 != settings.INT_COOKIE_PREFIX
        ):
            return

        async with self._ofpt_error_lock:
            evc_id = utils.get_id_from_cookie(flow.cookie)
            evc = await api.get_evc(evc_id, exclude_archived=False)
            # Tolerate a missing EVC, "metadata", "telemetry" or "enabled"
            # entry: in all of those cases there is nothing to disable.
            evc_data = (evc or {}).get(evc_id) or {}
            telemetry = (evc_data.get("metadata") or {}).get("telemetry") or {}
            if not telemetry.get("enabled"):
                return

            metadata = {
                "telemetry": {
                    "enabled": False,
                    "status": "DOWN",
                    "status_reason": ["ofpt_error"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            log.error(
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
                f"error_type: {event.content.get('error_type')}, "
                f"error_code: {event.content.get('error_code')}, "
                f"flow: {flow.as_dict()} "
            )

            evcs = {evc_id: {evc_id: evc_id}}
            stored_flows = await api.get_stored_flows(
                [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)]
            )
            await asyncio.gather(
                self.int_manager.remove_int_flows(stored_flows),
                api.add_evcs_metadata(evcs, metadata, force=True),
            )

    # Event-driven methods: future
    def listen_for_new_evcs(self):
        """Change newly created EVC to INT-enabled EVC based on the metadata field
        (future)"""

    def listen_for_evc_change(self):
        """Placeholder to react to changes on an EVC based on the
        metadata field (future)

        NOTE(review): the previous docstring was a copy of
        listen_for_new_evcs; confirm the intended behavior.
        """

    def listen_for_path_changes(self):
        """Change EVC's new path to INT-enabled EVC based on the metadata field
        when there is a path change. (future)"""

    def listen_for_evcs_removed(self):
        """Remove all INT flows belonging the just removed EVC (future)"""

    def listen_for_topology_changes(self):
        """If the topology changes, make sure it is not the loop ports.
        If so, update proxy ports"""
        # TODO:
        # self.proxy_ports = create_proxy_ports(self.proxy_ports)
||
325 |