Total Complexity | 44 |
Total Lines | 289 |
Duplicated Lines | 32.18 % |
Coverage | 35.38% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """Main module of kytos/telemetry Network Application. |
||
2 | |||
3 | Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits |
||
4 | |||
5 | """ |
||
6 | |||
7 | 1 | import asyncio |
|
8 | 1 | from datetime import datetime |
|
9 | |||
10 | 1 | import napps.kytos.telemetry_int.kytos_api_helper as api |
|
11 | 1 | from napps.kytos.telemetry_int import settings, utils |
|
12 | 1 | from tenacity import RetryError |
|
13 | |||
14 | 1 | from kytos.core import KytosEvent, KytosNApp, log, rest |
|
15 | 1 | from kytos.core.helpers import alisten_to |
|
16 | 1 | from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400 |
|
17 | |||
18 | 1 | from .exceptions import ( |
|
19 | EVCHasINT, |
||
20 | EVCHasNoINT, |
||
21 | EVCNotFound, |
||
22 | FlowsNotFound, |
||
23 | ProxyPortNotFound, |
||
24 | ProxyPortStatusNotUP, |
||
25 | UnrecoverableError, |
||
26 | ) |
||
27 | 1 | from .managers.int import INTManager |
|
28 | |||
29 | # pylint: disable=fixme |
||
30 | |||
31 | |||
32 | 1 | class Main(KytosNApp): |
|
33 | """Main class of kytos/telemetry NApp. |
||
34 | |||
35 | This class is the entry point for this NApp. |
||
36 | """ |
||
37 | |||
38 | 1 | def setup(self): |
|
39 | """Replace the '__init__' method for the KytosNApp subclass. |
||
40 | |||
41 | The setup method is automatically called by the controller when your |
||
42 | application is loaded. |
||
43 | |||
44 | So, if you have any setup routine, insert it here. |
||
45 | """ |
||
46 | |||
47 | 1 | self.int_manager = INTManager(self.controller) |
|
48 | 1 | self._ofpt_error_lock = asyncio.Lock() |
|
49 | |||
50 | 1 | def execute(self): |
|
51 | """Run after the setup method execution. |
||
52 | |||
53 | You can also use this method in loop mode if you add to the above setup |
||
54 | method a line like the following example: |
||
55 | |||
56 | self.execute_as_loop(30) # 30-second interval. |
||
57 | """ |
||
58 | |||
59 | 1 | def shutdown(self): |
|
60 | """Run when your NApp is unloaded. |
||
61 | |||
62 | If you have some cleanup procedure, insert it here. |
||
63 | """ |
||
64 | |||
65 | 1 | View Code Duplication | @rest("v1/evc/enable", methods=["POST"]) |
|
|||
66 | 1 | async def enable_telemetry(self, request: Request) -> JSONResponse: |
|
67 | """REST to enable INT flows on EVCs. |
||
68 | |||
69 | If a list of evc_ids is empty, it'll enable on non-INT EVCs. |
||
70 | """ |
||
71 | |||
72 | try: |
||
73 | content = await aget_json_or_400(request) |
||
74 | evc_ids = content["evc_ids"] |
||
75 | force = content.get("force", False) |
||
76 | if not isinstance(force, bool): |
||
77 | raise TypeError(f"'force' wrong type: {type(force)} expected bool") |
||
78 | except (TypeError, KeyError): |
||
79 | raise HTTPException(400, detail=f"Invalid payload: {content}") |
||
80 | |||
81 | try: |
||
82 | evcs = ( |
||
83 | await api.get_evcs() |
||
84 | if len(evc_ids) != 1 |
||
85 | else await api.get_evc(evc_ids[0]) |
||
86 | ) |
||
87 | except RetryError as exc: |
||
88 | exc_error = str(exc.last_attempt.exception()) |
||
89 | log.error(exc_error) |
||
90 | raise HTTPException(503, detail=exc_error) |
||
91 | |||
92 | if evc_ids: |
||
93 | evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids} |
||
94 | else: |
||
95 | evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)} |
||
96 | if not evcs: |
||
97 | # There's no non-INT EVCs to get enabled. |
||
98 | return JSONResponse({}) |
||
99 | |||
100 | try: |
||
101 | await self.int_manager.enable_int(evcs, force) |
||
102 | except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc: |
||
103 | raise HTTPException(404, detail=str(exc)) |
||
104 | except (EVCHasINT, ProxyPortStatusNotUP) as exc: |
||
105 | raise HTTPException(400, detail=str(exc)) |
||
106 | except RetryError as exc: |
||
107 | exc_error = str(exc.last_attempt.exception()) |
||
108 | log.error(exc_error) |
||
109 | raise HTTPException(503, detail=exc_error) |
||
110 | except UnrecoverableError as exc: |
||
111 | exc_error = str(exc) |
||
112 | log.error(exc_error) |
||
113 | raise HTTPException(500, detail=exc_error) |
||
114 | |||
115 | return JSONResponse({}, status_code=201) |
||
116 | |||
117 | 1 | View Code Duplication | @rest("v1/evc/disable", methods=["POST"]) |
118 | 1 | async def disable_telemetry(self, request: Request) -> JSONResponse: |
|
119 | """REST to disable/remove INT flows for an EVC_ID |
||
120 | |||
121 | If a list of evc_ids is empty, it'll disable on all INT EVCs. |
||
122 | """ |
||
123 | try: |
||
124 | content = await aget_json_or_400(request) |
||
125 | evc_ids = content["evc_ids"] |
||
126 | force = content.get("force", False) |
||
127 | if not isinstance(force, bool): |
||
128 | raise TypeError(f"'force' wrong type: {type(force)} expected bool") |
||
129 | except (TypeError, KeyError): |
||
130 | raise HTTPException(400, detail=f"Invalid payload: {content}") |
||
131 | |||
132 | try: |
||
133 | evcs = ( |
||
134 | await api.get_evcs() |
||
135 | if len(evc_ids) != 1 |
||
136 | else await api.get_evc(evc_ids[0]) |
||
137 | ) |
||
138 | except RetryError as exc: |
||
139 | exc_error = str(exc.last_attempt.exception()) |
||
140 | log.error(exc_error) |
||
141 | raise HTTPException(503, detail=exc_error) |
||
142 | |||
143 | if evc_ids: |
||
144 | evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids} |
||
145 | else: |
||
146 | evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)} |
||
147 | if not evcs: |
||
148 | # There's no INT EVCs to get disabled. |
||
149 | return JSONResponse({}) |
||
150 | |||
151 | try: |
||
152 | await self.int_manager.disable_int(evcs, force) |
||
153 | except EVCNotFound as exc: |
||
154 | raise HTTPException(404, detail=str(exc)) |
||
155 | except EVCHasNoINT as exc: |
||
156 | raise HTTPException(400, detail=str(exc)) |
||
157 | except RetryError as exc: |
||
158 | exc_error = str(exc.last_attempt.exception()) |
||
159 | log.error(exc_error) |
||
160 | raise HTTPException(503, detail=exc_error) |
||
161 | except UnrecoverableError as exc: |
||
162 | exc_error = str(exc) |
||
163 | log.error(exc_error) |
||
164 | raise HTTPException(500, detail=exc_error) |
||
165 | |||
166 | return JSONResponse({}) |
||
167 | |||
168 | 1 | @rest("v1/evc") |
|
169 | 1 | def get_evcs(self, _request: Request) -> JSONResponse: |
|
170 | """REST to return the list of EVCs with INT enabled""" |
||
171 | return JSONResponse(utils.get_evc_with_telemetry()) |
||
172 | |||
173 | 1 | @rest("v1/sync") |
|
174 | 1 | def sync_flows(self, _request: Request) -> JSONResponse: |
|
175 | """Endpoint to force the telemetry napp to search for INT flows and delete them |
||
176 | accordingly to the evc metadata.""" |
||
177 | |||
178 | # TODO |
||
179 | # for evc_id in get_evcs_ids(): |
||
180 | return JSONResponse("TBD") |
||
181 | |||
182 | 1 | @rest("v1/evc/update") |
|
183 | 1 | def update_evc(self, _request: Request) -> JSONResponse: |
|
184 | """If an EVC changed from unidirectional to bidirectional telemetry, |
||
185 | make the change.""" |
||
186 | return JSONResponse({}) |
||
187 | |||
188 | 1 | @alisten_to("kytos/mef_eline.deleted") |
|
189 | 1 | async def on_evc_deleted(self, event: KytosEvent) -> None: |
|
190 | """On EVC deleted.""" |
||
191 | content = event.content |
||
192 | if ( |
||
193 | "metadata" in content |
||
194 | and "telemetry" in content["metadata"] |
||
195 | and content["metadata"]["telemetry"]["enabled"] |
||
196 | ): |
||
197 | evc_id = content["evc_id"] |
||
198 | log.info( |
||
199 | f"EVC({evc_id}, {content.get('name', '')}) got deleted, " |
||
200 | "INT flows will be removed too" |
||
201 | ) |
||
202 | stored_flows = await api.get_stored_flows( |
||
203 | [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)] |
||
204 | ) |
||
205 | await self.int_manager.remove_int_flows(stored_flows) |
||
206 | |||
207 | 1 | @alisten_to("kytos/flow_manager.flow.error") |
|
208 | 1 | async def on_flow_mod_error(self, event: KytosEvent): |
|
209 | """On flow mod errors. |
||
210 | |||
211 | Only OFPT_ERRORs will be handled, telemetry_int already uses force: true |
||
212 | """ |
||
213 | 1 | flow = event.content["flow"] |
|
214 | 1 | if any( |
|
215 | ( |
||
216 | event.content.get("error_exception"), |
||
217 | event.content.get("error_command") != "add", |
||
218 | flow.cookie >> 56 != settings.INT_COOKIE_PREFIX, |
||
219 | ) |
||
220 | ): |
||
221 | return |
||
222 | |||
223 | 1 | async with self._ofpt_error_lock: |
|
224 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
225 | 1 | evc = await api.get_evc(evc_id, exclude_archived=False) |
|
226 | 1 | if ( |
|
227 | not evc |
||
228 | or "telemetry" not in evc[evc_id]["metadata"] |
||
229 | or "enabled" not in evc[evc_id]["metadata"]["telemetry"] |
||
230 | or not evc[evc_id]["metadata"]["telemetry"]["enabled"] |
||
231 | ): |
||
232 | return |
||
233 | |||
234 | 1 | metadata = { |
|
235 | "telemetry": { |
||
236 | "enabled": False, |
||
237 | "status": "DOWN", |
||
238 | "status_reason": ["ofpt_error"], |
||
239 | "status_updated_at": datetime.utcnow().strftime( |
||
240 | "%Y-%m-%dT%H:%M:%S" |
||
241 | ), |
||
242 | } |
||
243 | } |
||
244 | 1 | log.error( |
|
245 | f"Disabling EVC({evc_id}) due to OFPT_ERROR, " |
||
246 | f"error_type: {event.content.get('error_type')}, " |
||
247 | f"error_code: {event.content.get('error_code')}, " |
||
248 | f"flow: {flow.as_dict()} " |
||
249 | ) |
||
250 | |||
251 | 1 | evcs = {evc_id: {evc_id: evc_id}} |
|
252 | 1 | stored_flows = await api.get_stored_flows( |
|
253 | [ |
||
254 | utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) |
||
255 | for evc_id in evcs |
||
256 | ] |
||
257 | ) |
||
258 | 1 | await asyncio.gather( |
|
259 | self.int_manager.remove_int_flows(stored_flows), |
||
260 | api.add_evcs_metadata(evcs, metadata, force=True), |
||
261 | ) |
||
262 | |||
263 | # Event-driven methods: future |
||
264 | 1 | def listen_for_new_evcs(self): |
|
265 | """Change newly created EVC to INT-enabled EVC based on the metadata field |
||
266 | (future)""" |
||
267 | pass |
||
268 | |||
269 | 1 | def listen_for_evc_change(self): |
|
270 | """Change newly created EVC to INT-enabled EVC based on the |
||
271 | metadata field (future)""" |
||
272 | pass |
||
273 | |||
274 | 1 | def listen_for_path_changes(self): |
|
275 | """Change EVC's new path to INT-enabled EVC based on the metadata field |
||
276 | when there is a path change. (future)""" |
||
277 | pass |
||
278 | |||
279 | 1 | def listen_for_evcs_removed(self): |
|
280 | """Remove all INT flows belonging the just removed EVC (future)""" |
||
281 | pass |
||
282 | |||
283 | 1 | def listen_for_topology_changes(self): |
|
284 | """If the topology changes, make sure it is not the loop ports. |
||
285 | If so, update proxy ports""" |
||
286 | # TODO: |
||
287 | # self.proxy_ports = create_proxy_ports(self.proxy_ports) |
||
288 | pass |
||
289 |