| Total Complexity | 47 |
| Total Lines | 457 |
| Duplicated Lines | 17.51 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like kytos.core.helpers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """Utilities functions used in Kytos.""" |
||
| 2 | import functools |
||
| 3 | import logging |
||
| 4 | import traceback |
||
| 5 | from asyncio import AbstractEventLoop |
||
| 6 | from collections import defaultdict |
||
| 7 | from concurrent.futures import ThreadPoolExecutor |
||
| 8 | from datetime import datetime, timezone |
||
| 9 | from pathlib import Path |
||
| 10 | from threading import Thread |
||
| 11 | |||
| 12 | from openapi_core import OpenAPI |
||
| 13 | from openapi_core.contrib.starlette import StarletteOpenAPIRequest |
||
| 14 | from openapi_core.unmarshalling.request.datatypes import RequestUnmarshalResult |
||
| 15 | |||
| 16 | from kytos.core.apm import ElasticAPM |
||
| 17 | from kytos.core.config import KytosConfig |
||
| 18 | from kytos.core.rest_api import (HTTPException, Request, |
||
| 19 | content_type_json_or_415, get_body) |
||
| 20 | |||
| 21 | __all__ = ['listen_to', 'now', 'run_on_thread', 'get_time'] |
||
| 22 | |||
| 23 | LOG = logging.getLogger(__name__) |
||
| 24 | |||
| 25 | |||
def get_thread_pool_max_workers():
    """Return the ``thread_pool_max_workers`` value of the daemon options."""
    daemon_options = KytosConfig().options["daemon"]
    return daemon_options.thread_pool_max_workers
||
| 29 | |||
| 30 | |||
def get_apm_name():
    """Return the ``apm`` value (backend name) of the daemon options."""
    daemon_options = KytosConfig().options["daemon"]
    return daemon_options.apm
||
| 34 | |||
| 35 | |||
# pylint: disable=invalid-name
# Named thread pools, built once when this module is imported: one
# ThreadPoolExecutor per entry of the mapping returned by
# get_thread_pool_max_workers(), keyed by the pool name and sized by the
# configured number of max workers.
executors = {name: ThreadPoolExecutor(max_workers=max_workers,
                                      thread_name_prefix=f"thread_pool_{name}")
             for name, max_workers in get_thread_pool_max_workers().items()}

# Single-worker executors created lazily the first time a key is looked up.
# listen_to indexes this mapping by handler when pool == "dynamic_single".
ds_executors = defaultdict(lambda: ThreadPoolExecutor(max_workers=1,
                           thread_name_prefix="dynamic_single"))
||
| 43 | |||
| 44 | |||
def listen_to(event, *events, pool=None):
    """Decorate Event Listener methods.

    This decorator was built to be used on NAPPs methods to define which
    type of event the method will handle. With this, we will be able to
    'schedule' the app/method to receive an event when a new event is
    registered on the controller buffers.

    The handler never runs in its caller's thread: when thread pools are
    configured (``executors`` is not empty) the handler is submitted to one
    of them, otherwise it is started on a new daemon thread through the
    run_on_thread decorator.

    The decorator will add an attribute to the method called 'events', that
    will be a list of the events that the method will handle.

    The event that will be listened to is always a string, but it can represent
    a regular expression to match against multiple Event Types. All listened
    events are documented in :doc:`/developer/listened_events` section.

    The pool option gives you control on which ThreadPoolExecutor that will
    execute the decorated handler. This knob is meant for prioritization,
    allowing executions on different pools depending on the handler primary
    responsibility and importance to avoid potential scheduling starvation.
    The executor is picked as follows:

    - ``pool="dynamic_single"``: a single-worker executor dedicated to the
      handler;
    - ``pool`` naming a configured pool: that pool;
    - otherwise, events whose name starts with ``kytos/of_core`` or
      ``kytos/core.openflow`` use the ``sb`` pool when it is configured;
    - everything else uses the ``app`` pool.

    Example of usage:

    .. code-block:: python3

        class MyAppClass(KytosApp):
            @listen_to('kytos/of_core.messages.in')
            def my_handler_of_message_in(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.out')
            def my_handler_of_message_out(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.in.ofpt_hello',
                       'kytos/of_core.messages.out.ofpt_hello')
            def my_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*.hello')
            def my_other_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*')
            def my_stats_handler_of_any_message(self, event):
                # Do stuff here...

            @listen_to("some_db_oriented_event", pool="db")
            def db_update(self, event):
                # Do stuff here...

    Args:
        event (str): Event name (or regular expression) to listen to.
        *events (str): Additional event names handled by the same method.
        pool (str): Optional name of the thread pool that runs the handler.

    Returns:
        function: Decorator that wraps the handler and sets its ``events``
        attribute.
    """
    def thread_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run on a new thread.

        """
        @run_on_thread
        def threaded_handler(*args):
            """Decorate the handler to run from a new thread."""
            handler(*args)

        threaded_handler.events = [event, *events]
        return threaded_handler

    # pylint: disable=broad-except
    def thread_pool_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run in the thread pool.

        """
        def done_callback(future):
            """Log errors that escaped the handler's execution context.

            Exceptions raised by the handler itself are already handled by
            the execution context; anything that still reaches the future
            (e.g. a call with too few positional arguments) would otherwise
            be lost, since nobody reads the future's result.
            """
            if future.cancelled():
                return
            exc = future.exception()
            if exc is None:
                return
            traceback_str = "".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ).replace("\n", ", ")
            LOG.error(f"listen_to handler: {handler}, "
                      f"unexpected error, traceback: {traceback_str}")

        def report_failure(cls, kytos_event, args):
            """Log the exception being handled and dead-letter the event.

            Must be called from inside an ``except`` block so that
            traceback.format_exc() can see the active exception.
            """
            traceback_str = traceback.format_exc().replace("\n", ", ")
            LOG.error(f"listen_to handler: {handler}, "
                      f"args: {args} traceback: {traceback_str}")
            if hasattr(cls, "controller"):
                cls.controller.dead_letter.add_event(kytos_event)

        # pylint: disable=unused-argument
        def handler_context(*args, **kwargs):
            """Handler's context for ThreadPool."""
            cls, kytos_event = args[0], args[1]
            try:
                result = handler(*args)
            except Exception:
                result = None
                report_failure(cls, kytos_event, args)
            return result

        def handler_context_apm(*args, apm_client=None):
            """Handler's context for ThreadPool APM instrumentation."""
            cls, kytos_event = args[0], args[1]
            trace_parent = kytos_event.trace_parent
            tx_type = "kytos_event"
            tx = apm_client.begin_transaction(transaction_type=tx_type,
                                              trace_parent=trace_parent)
            # Handlers that publish further events inherit this transaction
            # as their trace parent.
            kytos_event.trace_parent = tx.trace_parent
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
            try:
                result = handler(*args)
                tx.result = result
            except Exception as exc:
                result = None
                report_failure(cls, kytos_event, args)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            tx.end()
            apm_client.tracer.queue_func("transaction", tx.to_dict())
            return result

        handler_func, kwargs = handler_context, {}
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = {"apm_client": ElasticAPM.get_client()}

        def get_executor(pool, event, default_pool="app", handler=handler):
            """Get executor."""
            if pool == "dynamic_single":
                return ds_executors[handler]
            if pool and pool in executors:
                return executors[pool]
            if not event:
                return executors[default_pool]

            southbound_prefixes = ("kytos/of_core", "kytos/core.openflow")
            if event.name.startswith(southbound_prefixes) \
                    and "sb" in executors:
                return executors["sb"]
            return executors[default_pool]

        def inner(*args):
            """Decorate the handler to run in the thread pool."""
            kytos_event = args[1] if len(args) > 1 else None
            executor = get_executor(pool, kytos_event)
            future = executor.submit(handler_func, *args, **kwargs)
            future.add_done_callback(done_callback)

        inner.events = [event, *events]
        return inner

    if executors:
        return thread_pool_decorator
    return thread_decorator
||
| 212 | |||
| 213 | |||
def alisten_to(event, *events):
    """Decorate async subscribing methods.

    Args:
        event (str): Event name to subscribe to.
        *events (str): Additional event names handled by the same method.

    Returns:
        function: Decorator that wraps an async handler and sets its
        ``events`` attribute.
    """

    def decorator(handler):
        """Wrap the async handler in an error-handling execution context.

        Returns:
            A coroutine function with an `events` attribute (list of events
            to be listened).
        """
        # pylint: disable=unused-argument,broad-except
        def report_failure(cls, kytos_event, args):
            """Log the exception being handled and dead-letter the event."""
            traceback_str = traceback.format_exc().replace("\n", ", ")
            LOG.error(f"alisten_to handler: {handler}, "
                      f"args: {args} traceback: {traceback_str}")
            if hasattr(cls, "controller"):
                cls.controller.dead_letter.add_event(kytos_event)

        async def handler_context(*args, **kwargs):
            """Await the handler, reporting any exception it raises."""
            cls, kytos_event = args[0], args[1]
            try:
                return await handler(*args)
            except Exception:
                report_failure(cls, kytos_event, args)
                return None

        async def handler_context_apm(*args, apm_client=None):
            """Await the handler inside an APM transaction."""
            cls, kytos_event = args[0], args[1]
            tx = apm_client.begin_transaction(
                transaction_type="kytos_event",
                trace_parent=kytos_event.trace_parent,
            )
            kytos_event.trace_parent = tx.trace_parent
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
            result = None
            try:
                result = await handler(*args)
                tx.result = result
            except Exception as exc:
                result = None
                report_failure(cls, kytos_event, args)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            tx.end()
            apm_client.tracer.queue_func("transaction", tx.to_dict())
            return result

        # The execution context is chosen once, when the handler is decorated.
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = {"apm_client": ElasticAPM.get_client()}
        else:
            handler_func, kwargs = handler_context, {}

        async def inner(*args):
            """Inner decorated with events attribute."""
            return await handler_func(*args, **kwargs)

        inner.events = [event, *events]
        return inner

    return decorator
||
| 280 | |||
| 281 | |||
def now(tzone=timezone.utc):
    """Return the current datetime in the given time zone (UTC by default).

    Args:
        tzone (datetime.timezone): Time zone attached to the result.

    Returns:
        datetime.datetime: Current date and time in ``tzone``.

    """
    current = datetime.now(tz=tzone)
    return current
||
| 293 | |||
| 294 | |||
def run_on_thread(method):
    """Decorate to run the decorated method inside a new thread.

    Args:
        method (function): function used to run as a new thread.

    Returns:
        Decorated method that will run inside a new thread. Positional and
        keyword arguments are forwarded to ``method``.
        When the decorated method is called, it will not return the created
        thread to the caller (it returns None).

    """
    @functools.wraps(method)
    def threaded_method(*args, **kwargs):
        """Ensure the handler method runs inside a new thread."""
        # Set daemon mode so that we don't have to wait for these threads
        # to finish when exiting Kytos
        thread = Thread(target=method, args=args, kwargs=kwargs, daemon=True)
        thread.start()
    return threaded_method
||
| 316 | |||
| 317 | |||
def get_time(data=None):
    """Receive a dictionary or a string and return a datetime instance.

    Accepted inputs:

    - a string formatted as ``YYYY-MM-DDTHH:MM:SS``, optionally followed by
      the ``Z`` UTC designator, e.g. ``"2018-04-17T17:13:50"`` or
      ``"2018-04-17T17:13:50Z"``;
    - a dict with the keyword arguments of ``datetime``, e.g.::

        data = {"year": 2006,
                "month": 11,
                "day": 21,
                "hour": 16,
                "minute": 30,
                "second": 0}

    Args:
        data (str, dict): python dict or string to be converted to datetime

    Returns:
        datetime: datetime instance with its time zone set to UTC, or None
        when ``data`` is neither a string nor a dict.

    Raises:
        ValueError: if the string does not match the expected format.

    """
    if isinstance(data, str):
        # The result is always tagged as UTC below, so a trailing "Z"
        # carries no extra information and is dropped before parsing.
        text = data[:-1] if data.endswith("Z") else data
        date = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S")
    elif isinstance(data, dict):
        date = datetime(**data)
    else:
        return None
    return date.replace(tzinfo=timezone.utc)
||
| 348 | |||
| 349 | |||
def load_spec(yml_file_path: Path):
    """Build the OpenAPI spec object from the given yml file path."""
    spec = OpenAPI.from_file_path(yml_file_path)
    return spec
||
| 353 | |||
| 354 | |||
def _request_validation_result_or_400(result: RequestUnmarshalResult) -> None:
    """Raise HTTP 400 if the unmarshal result carries validation errors.

    Only the first error is reported. The response detail is, in order of
    preference: the error itself when it has no ``__cause__``; a message
    built from the first schema error when the cause exposes
    ``schema_errors``; otherwise the cause converted to a string.
    """
    if not result.errors:
        return

    first_error = result.errors[0]
    cause = first_error.__cause__
    if not cause:
        detail = str(first_error)
    elif not hasattr(cause, "schema_errors"):
        detail = str(cause)
    else:
        schema_errors = cause.schema_errors[0]
        error_log = {
            "error_message": schema_errors.message,
            "error_validator": schema_errors.validator,
            "error_validator_value": schema_errors.validator_value,
            "error_path": list(schema_errors.path),
            "error_schema": schema_errors.schema,
            "error_schema_path": list(schema_errors.schema_path),
        }
        LOG.debug(f"Invalid request (API schema): {error_log}")
        field_path = "/".join(map(str, schema_errors.path))
        detail = (
            "The request body contains invalid API data."
            f" {schema_errors.message} for field"
            f" {field_path}."
        )
    raise HTTPException(400, detail=detail)
||
| 384 | |||
| 385 | |||
def validate_openapi_request(
    spec: OpenAPI, request: Request, loop: AbstractEventLoop
) -> bytes:
    """Validate a Request against an OpenAPI spec and return its body.

    This function is meant to be called from a synchronous context
    since StarletteOpenAPIRequest internally uses `asgiref.sync.AsyncToSync`
    and its forcing not to use the current running event loop.

    The content type check (content_type_json_or_415) only runs when the
    request body is not empty.
    """
    raw_body = get_body(request, loop)
    if raw_body:
        content_type_json_or_415(request)
    unmarshal_result = spec.unmarshal_request(
        StarletteOpenAPIRequest(request, raw_body)
    )
    _request_validation_result_or_400(unmarshal_result)
    return raw_body
||
| 402 | |||
| 403 | |||
async def avalidate_openapi_request(
    spec: OpenAPI,
    request: Request,
) -> bytes:
    """Async counterpart of validate_openapi_request, for async routes.

    It returns the request body bytes and does not assume the body is
    loadable json, so it can be used with as many types of endpoints as
    possible. Use await aget_json_or_400(request) to get the parsed body.

    The content type check (content_type_json_or_415) only runs when the
    request body is not empty.

    Example:

        await avalidate_openapi_request(self.spec, request)
        body = await aget_json_or_400(request)
    """
    raw_body = await request.body()
    if raw_body:
        content_type_json_or_415(request)
    unmarshal_result = spec.unmarshal_request(
        StarletteOpenAPIRequest(request, raw_body)
    )
    _request_validation_result_or_400(unmarshal_result)
    return raw_body
||
| 427 | |||
| 428 | |||
def validate_openapi(spec):
    """Decorator to validate a REST endpoint input.

    The request found among the endpoint's positional arguments is checked
    with validate_openapi_request against ``spec`` before the endpoint runs.
    The endpoint must be a method of an object exposing ``controller``,
    whose ``loop`` is used for the validation.
    """
    def validate_decorator(func):
        def find_request_and_napp(call_args):
            """Scan positional args for the Request and the NApp instance."""
            found_request: Request = None
            found_napp = None
            for candidate in call_args:
                if isinstance(candidate, Request):
                    found_request = candidate
                if hasattr(candidate, "controller"):
                    found_napp = candidate
                if found_request and found_napp:
                    break
            return found_request, found_napp

        @functools.wraps(func)
        def wrapper_validate(*args, **kwargs):
            request, napp = find_request_and_napp(args)
            if not request:
                err = f"{func.__name__} args doesn't have a Request argument"
                raise RuntimeError(err)
            if not napp:
                err = f"{func.__name__} should be a NApp method to get ev_loop"
                raise RuntimeError(err)
            validate_openapi_request(spec, request, napp.controller.loop)
            return func(*args, **kwargs)
        return wrapper_validate
    return validate_decorator
||
| 457 |