| Total Complexity | 46 |
| Total Lines | 305 |
| Duplicated Lines | 34.1 % |
| Coverage | 53.46% |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """Main module of kytos/telemetry Network Application. |
||
| 2 | |||
| 3 | Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits |
||
| 4 | |||
| 5 | """ |
||
| 6 | |||
| 7 | 1 | import asyncio |
|
| 8 | 1 | import pathlib |
|
| 9 | 1 | from datetime import datetime |
|
| 10 | |||
| 11 | 1 | import napps.kytos.telemetry_int.kytos_api_helper as api |
|
| 12 | 1 | from napps.kytos.telemetry_int import settings, utils |
|
| 13 | 1 | from tenacity import RetryError |
|
| 14 | |||
| 15 | 1 | from kytos.core import KytosEvent, KytosNApp, log, rest |
|
| 16 | 1 | from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec |
|
| 17 | 1 | from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400 |
|
| 18 | |||
| 19 | 1 | from .exceptions import ( |
|
| 20 | EVCHasINT, |
||
| 21 | EVCHasNoINT, |
||
| 22 | EVCNotFound, |
||
| 23 | FlowsNotFound, |
||
| 24 | ProxyPortNotFound, |
||
| 25 | ProxyPortStatusNotUP, |
||
| 26 | UnrecoverableError, |
||
| 27 | ) |
||
| 28 | 1 | from .managers.int import INTManager |
|
| 29 | |||
| 30 | # pylint: disable=fixme |
||
| 31 | |||
| 32 | |||
class Main(KytosNApp):
    """Main class of kytos/telemetry NApp.

    This class is the entry point for this NApp. It exposes REST endpoints
    to enable, disable and list INT (In-band Network Telemetry) on EVCs, and
    it listens to ``kytos/mef_eline.deleted`` and
    ``kytos/flow_manager.flow.error`` events.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        self.int_manager = INTManager(self.controller)
        # Held by on_flow_mod_error so flow error events are handled one
        # at a time.
        self._ofpt_error_lock = asyncio.Lock()

    def execute(self):
        """Run after the setup method execution.

        You can also use this method in loop mode if you add to the above setup
        method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """Run when your NApp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @staticmethod
    def _logged_http_error(status_code: int, detail: str) -> HTTPException:
        """Log ``detail`` as an error and return the matching HTTPException.

        The exception is returned (not raised) so callers can chain it with
        ``raise ... from exc`` and keep the original cause.
        """
        log.error(detail)
        return HTTPException(status_code, detail=detail)

    async def _parse_payload(self, request: Request) -> tuple[list, bool]:
        """Validate the request and return its ``(evc_ids, force)`` values.

        Raises:
            HTTPException: 400 if ``evc_ids`` is missing, the payload cannot
                be indexed by key, or ``force`` is not a bool.
        """
        await avalidate_openapi_request(self.spec, request)

        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc
        return evc_ids, force

    async def _fetch_evcs(self, evc_ids: list) -> dict:
        """Fetch a single EVC when exactly one id is given, else all EVCs.

        Raises:
            HTTPException: 503 if the request still fails after retries.
        """
        try:
            if len(evc_ids) == 1:
                return await api.get_evc(evc_ids[0])
            return await api.get_evcs()
        except RetryError as exc:
            raise self._logged_http_error(
                503, str(exc.last_attempt.exception())
            ) from exc

    @staticmethod
    def _select_evcs(evcs: dict, evc_ids: list, *, with_int: bool) -> dict:
        """Pick the EVCs an enable/disable request applies to.

        When ``evc_ids`` is non-empty, every requested id is kept; ids that
        were not found map to ``{}``. When ``evc_ids`` is empty, the EVCs are
        filtered by whether ``utils.has_int_enabled`` matches ``with_int``.
        """
        if evc_ids:
            return {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        return {
            evc_id: evc
            for evc_id, evc in evcs.items()
            if bool(utils.has_int_enabled(evc)) is with_int
        }

    @rest("v1/evc/enable", methods=["POST"])
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST to enable INT flows on EVCs.

        If a list of evc_ids is empty, it'll enable on non-INT EVCs.

        Returns the list of affected EVC ids with status 201, or an empty
        list with the default status when there is nothing to enable.

        Raises:
            HTTPException: 400 on an invalid payload; 404 on EVCNotFound,
                FlowsNotFound or ProxyPortNotFound; 409 on EVCHasINT or
                ProxyPortStatusNotUP; 503 on RetryError; 500 on
                UnrecoverableError.
        """
        evc_ids, force = await self._parse_payload(request)
        evcs = self._select_evcs(
            await self._fetch_evcs(evc_ids), evc_ids, with_int=False
        )
        if not evcs:
            # There's no non-INT EVCs to get enabled.
            return JSONResponse([])

        try:
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasINT, ProxyPortStatusNotUP) as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            raise self._logged_http_error(
                503, str(exc.last_attempt.exception())
            ) from exc
        except UnrecoverableError as exc:
            raise self._logged_http_error(500, str(exc)) from exc

        return JSONResponse(list(evcs.keys()), status_code=201)

    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST to disable/remove INT flows for an EVC_ID

        If a list of evc_ids is empty, it'll disable on all INT EVCs.

        Returns the list of affected EVC ids (empty when there is nothing to
        disable).

        Raises:
            HTTPException: 400 on an invalid payload; 404 on EVCNotFound;
                409 on EVCHasNoINT; 503 on RetryError; 500 on
                UnrecoverableError.
        """
        evc_ids, force = await self._parse_payload(request)
        evcs = self._select_evcs(
            await self._fetch_evcs(evc_ids), evc_ids, with_int=True
        )
        if not evcs:
            # There's no INT EVCs to get disabled.
            return JSONResponse([])

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except EVCHasNoINT as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            raise self._logged_http_error(
                503, str(exc.last_attempt.exception())
            ) from exc
        except UnrecoverableError as exc:
            raise self._logged_http_error(500, str(exc)) from exc

        return JSONResponse(list(evcs.keys()))

    @rest("v1/evc")
    async def get_evcs(self, _request: Request) -> JSONResponse:
        """REST to return the list of EVCs with INT enabled

        Raises:
            HTTPException: 503 on RetryError; 500 on UnrecoverableError.
        """
        try:
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
            return JSONResponse(evcs)
        except RetryError as exc:
            raise self._logged_http_error(
                503, str(exc.last_attempt.exception())
            ) from exc
        except UnrecoverableError as exc:
            raise self._logged_http_error(500, str(exc)) from exc

    @rest("v1/sync")
    def sync_flows(self, _request: Request) -> JSONResponse:
        """Endpoint to force the telemetry napp to search for INT flows and delete them
        accordingly to the evc metadata.

        Not implemented yet: currently always responds with "TBD".
        """
        # TODO
        # for evc_id in get_evcs_ids():
        return JSONResponse("TBD")

    @rest("v1/evc/update")
    def update_evc(self, _request: Request) -> JSONResponse:
        """If an EVC changed from unidirectional to bidirectional telemetry,
        make the change.

        Not implemented yet: currently always responds with an empty object.
        """
        return JSONResponse({})

    @alisten_to("kytos/mef_eline.deleted")
    async def on_evc_deleted(self, event: KytosEvent) -> None:
        """On EVC deleted.

        When the deleted EVC had telemetry enabled in its metadata, look up
        its stored INT flows by cookie and remove them.
        """
        content = event.content
        # .get() on every level: a telemetry dict without "enabled" is
        # treated as disabled instead of raising KeyError.
        telemetry = content.get("metadata", {}).get("telemetry", {})
        if not telemetry.get("enabled"):
            return

        evc_id = content["evc_id"]
        log.info(
            f"EVC({evc_id}, {content.get('name', '')}) got deleted, "
            "INT flows will be removed too"
        )
        stored_flows = await api.get_stored_flows(
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)]
        )
        await self.int_manager.remove_int_flows(stored_flows)

    @alisten_to("kytos/flow_manager.flow.error")
    async def on_flow_mod_error(self, event: KytosEvent):
        """On flow mod errors.

        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true

        The event is ignored when it carries an ``error_exception``, when the
        failed command is not ``add``, or when the flow cookie prefix is not
        the INT one. Otherwise, if the EVC has telemetry enabled, its INT
        flows are removed and its telemetry metadata is set to disabled/DOWN.
        """
        flow = event.content["flow"]
        if any(
            (
                event.content.get("error_exception"),
                event.content.get("error_command") != "add",
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
            )
        ):
            return

        async with self._ofpt_error_lock:
            evc_id = utils.get_id_from_cookie(flow.cookie)
            evc = await api.get_evc(evc_id, exclude_archived=False)
            if not evc:
                return
            telemetry = evc[evc_id]["metadata"].get("telemetry", {})
            if not telemetry.get("enabled"):
                # Telemetry is already off for this EVC; nothing to disable.
                return

            metadata = {
                "telemetry": {
                    "enabled": False,
                    "status": "DOWN",
                    "status_reason": ["ofpt_error"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            log.error(
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
                f"error_type: {event.content.get('error_type')}, "
                f"error_code: {event.content.get('error_code')}, "
                f"flow: {flow.as_dict()} "
            )

            # NOTE(review): the value is a placeholder mapping, not the EVC
            # itself; presumably add_evcs_metadata only needs the keys
            # - confirm against kytos_api_helper.
            evcs = {evc_id: {evc_id: evc_id}}
            stored_flows = await api.get_stored_flows(
                [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)]
            )
            await asyncio.gather(
                self.int_manager.remove_int_flows(stored_flows),
                api.add_evcs_metadata(evcs, metadata, force=True),
            )

    # Event-driven methods: future
    def listen_for_new_evcs(self):
        """Change newly created EVC to INT-enabled EVC based on the metadata field
        (future)"""

    def listen_for_evc_change(self):
        """Change newly created EVC to INT-enabled EVC based on the
        metadata field (future)"""
        # NOTE(review): this docstring matches listen_for_new_evcs; the
        # method name suggests it should describe EVC changes - confirm.

    def listen_for_path_changes(self):
        """Change EVC's new path to INT-enabled EVC based on the metadata field
        when there is a path change. (future)"""

    def listen_for_evcs_removed(self):
        """Remove all INT flows belonging the just removed EVC (future)"""

    def listen_for_topology_changes(self):
        """If the topology changes, make sure it is not the loop ports.
        If so, update proxy ports"""
        # TODO:
        #  self.proxy_ports = create_proxy_ports(self.proxy_ports)