| Total Complexity | 87 |
| Total Lines | 520 |
| Duplicated Lines | 9.81 % |
| Coverage | 64.4% |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """Main module of kytos/telemetry Network Application. |
||
| 2 | |||
| 3 | Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits |
||
| 4 | |||
| 5 | """ |
||
| 6 | |||
| 7 | 1 | import asyncio |
|
| 8 | 1 | import pathlib |
|
| 9 | 1 | from datetime import datetime |
|
| 10 | |||
| 11 | 1 | import napps.kytos.telemetry_int.kytos_api_helper as api |
|
| 12 | 1 | from napps.kytos.telemetry_int import settings, utils |
|
| 13 | 1 | from tenacity import RetryError |
|
| 14 | |||
| 15 | 1 | from kytos.core import KytosEvent, KytosNApp, log, rest |
|
| 16 | 1 | from kytos.core.helpers import alisten_to, avalidate_openapi_request, load_spec |
|
| 17 | 1 | from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400 |
|
| 18 | |||
| 19 | 1 | from .exceptions import ( |
|
| 20 | EVCError, |
||
| 21 | EVCHasINT, |
||
| 22 | EVCHasNoINT, |
||
| 23 | EVCNotFound, |
||
| 24 | FlowsNotFound, |
||
| 25 | ProxyPortNotFound, |
||
| 26 | ProxyPortSameSourceIntraEVC, |
||
| 27 | ProxyPortStatusNotUP, |
||
| 28 | UnrecoverableError, |
||
| 29 | ) |
||
| 30 | 1 | from .managers.int import INTManager |
|
| 31 | |||
| 32 | # pylint: disable=fixme |
||
| 33 | |||
| 34 | |||
class Main(KytosNApp):
    """Main class of kytos/telemetry NApp.

    This class is the entry point for this NApp. It exposes the REST
    endpoints that enable, disable, redeploy, list and compare INT on EVCs,
    and it listens to mef_eline, of_multi_table, topology and flow_manager
    events, delegating the actual work to ``INTManager``.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        self.int_manager = INTManager(self.controller)
        # Serializes the handling of flow mod errors, see on_flow_mod_error.
        self._ofpt_error_lock = asyncio.Lock()

    def execute(self):
        """Run after the setup method execution.

        You can also use this method in loop mode if you add to the above setup
        method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """Run when your NApp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    # ------------------------------------------------------------------
    # Private helpers shared by the REST endpoints and event handlers
    # ------------------------------------------------------------------

    @staticmethod
    def _has_int_enabled(evc: dict) -> bool:
        """Tell whether ``evc`` carries a truthy telemetry "enabled" flag.

        A missing "metadata", "telemetry" or "enabled" key counts as
        disabled instead of raising KeyError, so an event whose content is
        only partially filled is skipped rather than crashing the handler.
        """
        telemetry = (evc.get("metadata") or {}).get("telemetry")
        return isinstance(telemetry, dict) and bool(telemetry.get("enabled"))

    @staticmethod
    def _telemetry_metadata(
        enabled: bool, status: str, status_reason: list
    ) -> dict:
        """Build the ``{"telemetry": {...}}`` metadata stored on an EVC.

        "status_updated_at" is the current UTC time formatted as
        ``%Y-%m-%dT%H:%M:%S``.
        """
        # Local import: datetime.utcnow() is deprecated since Python 3.12,
        # datetime.now(timezone.utc) renders the same timestamp string.
        from datetime import timezone

        return {
            "telemetry": {
                "enabled": enabled,
                "status": status,
                "status_reason": status_reason,
                "status_updated_at": datetime.now(timezone.utc).strftime(
                    "%Y-%m-%dT%H:%M:%S"
                ),
            }
        }

    @staticmethod
    def _service_unavailable(exc: RetryError) -> HTTPException:
        """Log the last failed retry attempt and return the matching 503."""
        exc_error = str(exc.last_attempt.exception())
        log.error(exc_error)
        return HTTPException(503, detail=exc_error)

    @staticmethod
    def _internal_error(exc: UnrecoverableError) -> HTTPException:
        """Log an unrecoverable error and return the matching 500."""
        exc_error = str(exc)
        log.error(exc_error)
        return HTTPException(500, detail=exc_error)

    @staticmethod
    async def _get_payload(request: Request, parse_force: bool = True) -> tuple:
        """Extract ``(evc_ids, force)`` from the JSON body of ``request``.

        ``force`` defaults to False and is only read (and type checked) when
        ``parse_force`` is set.

        Raises:
            HTTPException: 400 when "evc_ids" is missing, the payload can't
                be indexed by key, or "force" isn't a bool.
        """
        content = None  # keeps the 400 message safe if parsing itself fails
        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
            force = content.get("force", False) if parse_force else False
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc
        return evc_ids, force

    async def _load_target_evcs(self, evc_ids: list, int_enabled: bool) -> dict:
        """Fetch the EVCs a REST request targets, keyed by EVC id.

        A single id is fetched on its own, anything else loads every EVC.
        When ``evc_ids`` is non-empty the result has one entry per requested
        id (an empty dict for ids that weren't returned). When it's empty
        the result holds all EVCs whose ``utils.has_int_enabled`` value
        matches ``int_enabled``.

        Raises:
            HTTPException: 503 when the EVCs can't be fetched after retries.
        """
        try:
            if len(evc_ids) == 1:
                evcs = await api.get_evc(evc_ids[0])
            else:
                evcs = await api.get_evcs()
        except RetryError as exc:
            raise self._service_unavailable(exc) from exc

        if evc_ids:
            return {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        return {
            evc_id: evc
            for evc_id, evc in evcs.items()
            if bool(utils.has_int_enabled(evc)) is int_enabled
        }

    @staticmethod
    def _cookie_range(prefix: int) -> tuple:
        """Return the (lowest, highest) cookie sharing the top-byte prefix."""
        lowest = prefix << 56
        return (lowest, lowest | 0xFFFFFFFFFFFFFF)

    # ------------------------------------------------------------------
    # REST endpoints
    # ------------------------------------------------------------------

    @rest("v1/evc/enable", methods=["POST"])
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST to enable INT flows on EVCs.

        If a list of evc_ids is empty, it'll enable on non-INT EVCs.

        Responds 201 with the list of handled EVC ids, or 200 with an empty
        list when there was nothing to enable. Errors are mapped to 400
        (payload), 404 (EVC, flows or proxy port not found), 409 (conflicting
        state), 503 (retries exhausted) and 500 (unrecoverable).
        """
        await avalidate_openapi_request(self.spec, request)
        evc_ids, force = await self._get_payload(request)
        evcs = await self._load_target_evcs(evc_ids, int_enabled=False)
        if not evcs:
            # There's no non-INT EVCs to get enabled.
            return JSONResponse([])

        try:
            # First, it tries to get and remove the existing INT flows like mef_eline
            cookies = [
                utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
                for evc_id in evcs
            ]
            stored_flows = await api.get_stored_flows(cookies)
            await self.int_manager._remove_int_flows(stored_flows)
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasINT, ProxyPortStatusNotUP, ProxyPortSameSourceIntraEVC) as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            raise self._service_unavailable(exc) from exc
        except UnrecoverableError as exc:
            raise self._internal_error(exc) from exc

        return JSONResponse(list(evcs.keys()), status_code=201)

    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST to disable/remove INT flows for an EVC_ID

        If a list of evc_ids is empty, it'll disable on all INT EVCs.

        Responds 200 with the list of handled EVC ids (empty when there was
        nothing to disable). Errors are mapped to 400 (payload), 404 (EVC
        not found), 409 (EVC has no INT), 503 (retries exhausted) and 500
        (unrecoverable).
        """
        await avalidate_openapi_request(self.spec, request)
        evc_ids, force = await self._get_payload(request)
        evcs = await self._load_target_evcs(evc_ids, int_enabled=True)
        if not evcs:
            # There's no INT EVCs to get disabled.
            return JSONResponse([])

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except EVCHasNoINT as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            raise self._service_unavailable(exc) from exc
        except UnrecoverableError as exc:
            raise self._internal_error(exc) from exc

        return JSONResponse(list(evcs.keys()))

    @rest("v1/evc")
    async def get_evcs(self, _request: Request) -> JSONResponse:
        """REST to return the list of EVCs with INT enabled"""
        try:
            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
        except RetryError as exc:
            raise self._service_unavailable(exc) from exc
        except UnrecoverableError as exc:
            raise self._internal_error(exc) from exc
        return JSONResponse(evcs)

    @rest("v1/evc/redeploy", methods=["PATCH"])
    async def redeploy_telemetry(self, request: Request) -> JSONResponse:
        """REST to redeploy INT on EVCs.

        If a list of evc_ids is empty, it'll redeploy on all INT EVCs.

        Responds 201 with the list of handled EVC ids. Errors are mapped to
        400 (payload), 404 (no INT EVCs, or EVC, flows or proxy port not
        found), 409 (proxy port conflict), 503 (retries exhausted) and 500
        (unrecoverable).
        """
        await avalidate_openapi_request(self.spec, request)
        # This endpoint takes no "force" option, so it isn't validated here.
        evc_ids, _ = await self._get_payload(request, parse_force=False)
        evcs = await self._load_target_evcs(evc_ids, int_enabled=True)
        if not evcs:
            raise HTTPException(404, detail="There aren't INT EVCs to redeploy")

        try:
            await self.int_manager.redeploy_int(evcs)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except ProxyPortSameSourceIntraEVC as exc:
            raise HTTPException(409, detail=str(exc)) from exc
        except RetryError as exc:
            raise self._service_unavailable(exc) from exc
        except UnrecoverableError as exc:
            raise self._internal_error(exc) from exc

        return JSONResponse(list(evcs.keys()), status_code=201)

    @rest("v1/evc/compare")
    async def evc_compare(self, _request: Request) -> JSONResponse:
        """List and compare which INT EVCs have flows installed comparing with
        mef_eline flows and telemetry metadata. You should use this endpoint
        to confirm if both the telemetry metadata is still coherent and also
        the minimum expected number of flows. A list of EVCs will get returned
        with the inconsistent INT EVCs. If you encounter any inconsistent
        EVC you need to analyze the situation and then decide if you'd
        like to force enable or disable INT.
        """
        try:
            # The three lookups are independent, so they run concurrently.
            int_flows, mef_flows, evcs = await asyncio.gather(
                api.get_stored_flows(
                    [self._cookie_range(settings.INT_COOKIE_PREFIX)]
                ),
                api.get_stored_flows(
                    [self._cookie_range(settings.MEF_COOKIE_PREFIX)]
                ),
                api.get_evcs(),
            )
        except RetryError as exc:
            raise self._service_unavailable(exc) from exc
        except UnrecoverableError as exc:
            raise self._internal_error(exc) from exc

        inconsistencies = self.int_manager.evc_compare(int_flows, mef_flows, evcs)
        response = [
            {"id": evc_id, "name": evcs[evc_id]["name"], "compare_reason": reason}
            for evc_id, reason in inconsistencies.items()
        ]
        return JSONResponse(response)

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------

    @alisten_to("kytos/mef_eline.evcs_loaded")
    async def on_mef_eline_evcs_loaded(self, event: KytosEvent) -> None:
        """Handle kytos/mef_eline.evcs_loaded."""
        self.int_manager.load_uni_src_proxy_ports(event.content)

    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle of_multi_table.enable_table.

        Every group under the "telemetry_int" key must be listed in
        ``settings.TABLE_GROUP_ALLOWED``, otherwise the error is logged and
        nothing is applied. On success the flow builder table group is
        updated and kytos/telemetry_int.enable_table is published.
        """
        table_group = event.content.get("telemetry_int", {})
        if not table_group:
            return
        for group in table_group:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(
                    f'The table group "{group}" is not allowed for '
                    f"telemetry_int. Allowed table groups are "
                    f"{settings.TABLE_GROUP_ALLOWED}"
                )
                return
        self.int_manager.flow_builder.table_group.update(table_group)
        content = {"group_table": self.int_manager.flow_builder.table_group}
        event_out = KytosEvent(name="kytos/telemetry_int.enable_table", content=content)
        await self.controller.buffers.app.aput(event_out)

    @alisten_to("kytos/mef_eline.deleted")
    async def on_evc_deleted(self, event: KytosEvent) -> None:
        """On EVC deleted, force disable INT if the EVC had it enabled."""
        content = event.content
        if not self._has_int_enabled(content):
            return
        evc_id = content["evc_id"]
        log.info(f"Handling mef_eline.deleted on EVC id: {evc_id}")
        await self.int_manager.disable_int({evc_id: content}, force=True)

    @alisten_to("kytos/mef_eline.undeployed")
    async def on_evc_undeployed(self, event: KytosEvent) -> None:
        """On EVC undeployed.

        Only a disabled EVC with INT enabled is handled: its INT flows are
        removed and its telemetry status is set to DOWN ("undeployed").
        """
        content = event.content
        if content["enabled"] or not self._has_int_enabled(content):
            return
        metadata = self._telemetry_metadata(True, "DOWN", ["undeployed"])
        evc_id = content["evc_id"]
        evcs = {evc_id: content}
        log.info(f"Handling mef_eline.undeployed on EVC id: {evc_id}")
        await self.int_manager.remove_int_flows(evcs, metadata, force=True)

    @alisten_to("kytos/mef_eline.(redeployed_link_down|redeployed_link_up)")
    async def on_evc_redeployed_link(self, event: KytosEvent) -> None:
        """On EVC redeployed_link_down|redeployed_link_up.

        Redeploys INT on an enabled EVC that has INT enabled. A failed
        redeploy is logged and not propagated.
        """
        content = event.content
        if not content["enabled"] or not self._has_int_enabled(content):
            return
        evc_id = content["evc_id"]
        # The event content carries "evc_id"; it's mirrored into "id" before
        # the content is handed over to redeploy_int.
        content["id"] = evc_id
        evcs = {evc_id: content}
        log.info(f"Handling {event.name}, EVC id: {evc_id}")
        try:
            await self.int_manager.redeploy_int(evcs)
        except EVCError as exc:
            log.error(
                f"Failed to redeploy: {exc}. "
                f"Analyze the error and you'll need to redeploy EVC {evc_id} later"
            )

    @alisten_to("kytos/mef_eline.error_redeploy_link_down")
    async def on_evc_error_redeployed_link_down(self, event: KytosEvent) -> None:
        """On EVC error_redeploy_link_down, this is supposed to happen when
        a path isn't found when mef_eline handles a link down.

        The INT flows of an enabled EVC with INT enabled are removed and its
        telemetry status is set to DOWN ("redeployed_link_down_no_path").
        """
        content = event.content
        if not content["enabled"] or not self._has_int_enabled(content):
            return
        metadata = self._telemetry_metadata(
            True, "DOWN", ["redeployed_link_down_no_path"]
        )
        evc_id = content["evc_id"]
        evcs = {evc_id: content}
        log.info(
            f"Handling mef_eline.redeployed_link_down_no_path on EVC id: {evc_id}"
        )
        await self.int_manager.remove_int_flows(evcs, metadata, force=True)

    @alisten_to("kytos/topology.link_down")
    async def on_link_down(self, event):
        """Handle topology.link_down."""
        await self.int_manager.handle_pp_link_down(event.content["link"])

    @alisten_to("kytos/topology.link_up")
    async def on_link_up(self, event):
        """Handle topology.link_up."""
        await self.int_manager.handle_pp_link_up(event.content["link"])

    @alisten_to("kytos/mef_eline.uni_active_updated")
    async def on_uni_active_updated(self, event: KytosEvent) -> None:
        """On mef_eline UNI active updated.

        Mirrors the UNI state on the telemetry metadata of an INT EVC: UP
        when active, otherwise DOWN with the "uni_down" reason.
        """
        content = event.content
        if not self._has_int_enabled(content):
            return
        evc_id, active = content["evc_id"], content["active"]
        log.info(
            f"Handling mef_eline.uni_active_updated active {active} "
            f"on EVC id: {evc_id}"
        )
        if active:
            metadata = self._telemetry_metadata(True, "UP", [])
        else:
            metadata = self._telemetry_metadata(True, "DOWN", ["uni_down"])
        await api.add_evcs_metadata({evc_id: content}, metadata)

    @alisten_to("kytos/flow_manager.flow.error")
    async def on_flow_mod_error(self, event: KytosEvent):
        """On flow mod errors.

        Only OFPT_ERRORs will be handled, telemetry_int already uses force: true

        Events that carry an "error_exception", whose "error_command" isn't
        "add", or whose flow cookie doesn't have the INT prefix are ignored.
        Otherwise INT gets disabled on the EVC that owns the flow.
        """
        flow = event.content["flow"]
        if any(
            (
                event.content.get("error_exception"),
                event.content.get("error_command") != "add",
                flow.cookie >> 56 != settings.INT_COOKIE_PREFIX,
            )
        ):
            return

        async with self._ofpt_error_lock:
            evc_id = utils.get_id_from_cookie(flow.cookie)
            evc = await api.get_evc(evc_id, exclude_archived=False)
            # Nothing to do when the EVC is gone or INT is already disabled.
            if not evc or not self._has_int_enabled(evc.get(evc_id) or {}):
                return

            metadata = self._telemetry_metadata(False, "DOWN", ["ofpt_error"])
            log.error(
                f"Disabling EVC({evc_id}) due to OFPT_ERROR, "
                f"error_type: {event.content.get('error_type')}, "
                f"error_code: {event.content.get('error_code')}, "
                f"flow: {flow.as_dict()} "
            )

            # NOTE(review): the inner dict is keyed by the EVC id itself;
            # presumably only the outer keys are consumed downstream - confirm
            # against INTManager.remove_int_flows before changing its shape.
            evcs = {evc_id: {evc_id: evc_id}}
            await self.int_manager.remove_int_flows(evcs, metadata, force=True)

    @alisten_to("kytos/topology.interfaces.metadata.removed")
    async def on_intf_metadata_removed(self, event: KytosEvent) -> None:
        """On interface metadata removed."""
        await self.int_manager.handle_pp_metadata_removed(event.content["interface"])

    @alisten_to("kytos/topology.interfaces.metadata.added")
    async def on_intf_metadata_added(self, event: KytosEvent) -> None:
        """On interface metadata added."""
        await self.int_manager.handle_pp_metadata_added(event.content["interface"])

    # Event-driven methods: future
    def listen_for_new_evcs(self):
        """Change newly created EVC to INT-enabled EVC based on the metadata field
        (future)"""

    def listen_for_evc_change(self):
        """Change newly created EVC to INT-enabled EVC based on the
        metadata field (future)"""

    def listen_for_path_changes(self):
        """Change EVC's new path to INT-enabled EVC based on the metadata field
        when there is a path change. (future)"""

    def listen_for_topology_changes(self):
        """If the topology changes, make sure it is not the loop ports.
        If so, update proxy ports"""
| 520 |