Total Complexity | 47
Total Lines      | 462
Duplicated Lines | 17.32 %
Changes          | 0
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it has been duplicated in three or more places. In this file, the blocks flagged as duplicated are the handler_context and handler_context_apm closures, which listen_to and alisten_to repeat almost verbatim; the usual remedy is to extract the shared logic into a single helper that all of them call, as sketched below.
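A minimal sketch of that extraction, assuming a shared module-level helper is acceptable here (the name _handle_listener_error is hypothetical and does not exist in kytos.core.helpers):

    import logging
    import traceback

    LOG = logging.getLogger(__name__)


    def _handle_listener_error(handler, args, prefix="listen_to"):
        """Log the failing handler's traceback and dead-letter the event."""
        # Must be called from an except block so format_exc() sees the error.
        cls, kytos_event = args[0], args[1]
        traceback_str = traceback.format_exc().replace("\n", ", ")
        LOG.error(f"{prefix} handler: {handler}, "
                  f"args: {args} traceback: {traceback_str}")
        if hasattr(cls, "controller"):
            cls.controller.dead_letter.add_event(kytos_event)

Each of the four flagged closures (handler_context and handler_context_apm, in both listen_to and alisten_to) would then call this helper from its except block instead of repeating the logging and dead-letter logic.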
Complex classes like kytos.core.helpers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.
Once you have determined which fields belong together, you can apply the Extract Class refactoring, as sketched below. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
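A generic sketch of Extract Class driven by that shared-prefix heuristic; the names below are hypothetical and unrelated to this module:

    # Before: the settings_* fields form a cohesive component of their own.
    class Daemon:
        def __init__(self):
            self.settings_path = "/etc/app.conf"
            self.settings_reload_interval = 30
            self.workers = 4


    # After: Extract Class moves that component into a dedicated class.
    class Settings:
        def __init__(self, path="/etc/app.conf", reload_interval=30):
            self.path = path
            self.reload_interval = reload_interval


    class Daemon:
        def __init__(self):
            self.settings = Settings()
            self.workers = 4

The file under review, kytos/core/helpers.py, follows in full.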
1 | """Utilities functions used in Kytos.""" |
||
2 | import functools |
||
3 | import logging |
||
4 | import traceback |
||
5 | from asyncio import AbstractEventLoop |
||
6 | from concurrent.futures import ThreadPoolExecutor |
||
7 | from datetime import datetime, timezone |
||
8 | from pathlib import Path |
||
9 | from threading import Thread |
||
10 | |||
11 | from openapi_core.spec import Spec |
||
12 | from openapi_core.spec.shortcuts import create_spec |
||
13 | from openapi_core.validation.request import openapi_request_validator |
||
14 | from openapi_core.validation.request.datatypes import RequestValidationResult |
||
15 | from openapi_spec_validator import validate_spec |
||
16 | from openapi_spec_validator.readers import read_from_filename |
||
17 | |||
18 | from kytos.core.apm import ElasticAPM |
||
19 | from kytos.core.config import KytosConfig |
||
20 | from kytos.core.rest_api import (AStarletteOpenAPIRequest, HTTPException, |
||
21 | Request, StarletteOpenAPIRequest, |
||
22 | content_type_json_or_415, get_body) |
||
23 | |||
24 | __all__ = ['listen_to', 'now', 'run_on_thread', 'get_time'] |
||
25 | |||
26 | LOG = logging.getLogger(__name__) |
||
27 | |||
28 | |||
29 | def get_thread_pool_max_workers(): |
||
30 | """Get the number of thread pool max workers.""" |
||
31 | return KytosConfig().options["daemon"].thread_pool_max_workers |
||
32 | |||
33 | |||
34 | def get_apm_name(): |
||
35 | """Get apm backend name.""" |
||
36 | return KytosConfig().options["daemon"].apm |
||
37 | |||
38 | |||
39 | # pylint: disable=invalid-name |
||
40 | executors = {name: ThreadPoolExecutor(max_workers=max_workers, |
||
41 | thread_name_prefix=f"thread_pool_{name}") |
||
42 | for name, max_workers in get_thread_pool_max_workers().items()} |
||
43 | |||
44 | |||
def listen_to(event, *events, pool=None):
    """Decorate Event Listener methods.

    This decorator was built to be used on NAPPs methods to define which
    type of event the method will handle. With this, we will be able to
    'schedule' the app/method to receive an event when a new event is
    registered on the controller buffers.
    By using the run_on_thread decorator, we also guarantee that the method
    (handler) will be called from inside a new thread, so that it does not
    block its caller.

    The decorator will add an attribute to the method called 'events', that
    will be a list of the events that the method will handle.

    The event that will be listened to is always a string, but it can
    represent a regular expression to match against multiple Event Types.
    All listened events are documented in the :doc:`/developer/listened_events`
    section.

    The pool option gives you control over which ThreadPoolExecutor will
    execute the decorated handler. This knob is meant for prioritization,
    allowing executions on different pools depending on the handler's primary
    responsibility and importance, to avoid potential scheduling starvation.

    Example of usage:

    .. code-block:: python3

        class MyAppClass(KytosApp):
            @listen_to('kytos/of_core.messages.in')
            def my_handler_of_message_in(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.out')
            def my_handler_of_message_out(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.in.ofpt_hello',
                       'kytos/of_core.messages.out.ofpt_hello')
            def my_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*.hello')
            def my_other_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*.hello')
            def my_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*')
            def my_stats_handler_of_any_message(self, event):
                # Do stuff here...

            @listen_to("some_db_oriented_event", pool="db")
            def db_update(self, event):
                # Do stuff here...
    """
    def thread_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run on a new thread.

        """
        @run_on_thread
        def threaded_handler(*args):
            """Decorate the handler to run from a new thread."""
            handler(*args)

        threaded_handler.events = [event]
        threaded_handler.events.extend(events)
        return threaded_handler

    # pylint: disable=broad-except
    def thread_pool_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run in the thread pool.

        """
        def done_callback(future):
            """Done callback."""
            if not future.exception():
                _ = future.result()
                return

        # pylint: disable=unused-argument
        def handler_context(*args, **kwargs):
            """Handler's context for ThreadPool."""
            cls, kytos_event = args[0], args[1]
            try:
                result = handler(*args)
            except Exception:
                result = None
                traceback_str = traceback.format_exc().replace("\n", ", ")
                LOG.error(f"listen_to handler: {handler}, "
                          f"args: {args} traceback: {traceback_str}")
                if hasattr(cls, "controller"):
                    cls.controller.dead_letter.add_event(kytos_event)
            return result

        def handler_context_apm(*args, apm_client=None):
            """Handler's context for ThreadPool APM instrumentation."""
            cls, kytos_event = args[0], args[1]
            trace_parent = kytos_event.trace_parent
            tx_type = "kytos_event"
            tx = apm_client.begin_transaction(transaction_type=tx_type,
                                              trace_parent=trace_parent)
            kytos_event.trace_parent = tx.trace_parent
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
            try:
                result = handler(*args)
                tx.result = result
            except Exception as exc:
                result = None
                traceback_str = traceback.format_exc().replace("\n", ", ")
                LOG.error(f"listen_to handler: {handler}, "
                          f"args: {args} traceback: {traceback_str}")
                if hasattr(cls, "controller"):
                    cls.controller.dead_letter.add_event(kytos_event)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            tx.end()
            apm_client.tracer.queue_func("transaction", tx.to_dict())
            return result

        handler_func, kwargs = handler_context, {}
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = dict(apm_client=ElasticAPM.get_client())

        def get_executor(pool, event, default_pool="app"):
            """Get executor."""
            if pool:
                return executors[pool]
            if not event:
                return executors[default_pool]

            if event.name.startswith("kytos/of_core") and "sb" in executors:
                return executors["sb"]
            core_of = "kytos/core.openflow"
            if event.name.startswith(core_of) and "sb" in executors:
                return executors["sb"]
            if event.name.startswith("kytos.storehouse") and "db" in executors:
                return executors["db"]
            return executors[default_pool]

        def inner(*args):
            """Decorate the handler to run in the thread pool."""
            event = args[1] if len(args) > 1 else None
            executor = get_executor(pool, event)
            future = executor.submit(handler_func, *args, **kwargs)
            future.add_done_callback(done_callback)

        inner.events = [event]
        inner.events.extend(events)
        return inner

    if executors:
        return thread_pool_decorator
    return thread_decorator


def alisten_to(event, *events):
    """Decorate async subscribing methods."""

    def decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated as an asyncio task.
        """
        # pylint: disable=unused-argument,broad-except
        async def handler_context(*args, **kwargs):
            """Async handler's execution context."""
            cls, kytos_event = args[0], args[1]
            try:
                result = await handler(*args)
            except Exception:
                result = None
                traceback_str = traceback.format_exc().replace("\n", ", ")
                LOG.error(f"alisten_to handler: {handler}, "
                          f"args: {args} traceback: {traceback_str}")
                if hasattr(cls, "controller"):
                    cls.controller.dead_letter.add_event(kytos_event)
            return result

        async def handler_context_apm(*args, apm_client=None):
            """Async handler's execution context with APM instrumentation."""
            cls, kytos_event = args[0], args[1]
            trace_parent = kytos_event.trace_parent
            tx_type = "kytos_event"
            tx = apm_client.begin_transaction(transaction_type=tx_type,
                                              trace_parent=trace_parent)
            kytos_event.trace_parent = tx.trace_parent
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
            try:
                result = await handler(*args)
                tx.result = result
            except Exception as exc:
                result = None
                traceback_str = traceback.format_exc().replace("\n", ", ")
                LOG.error(f"alisten_to handler: {handler}, "
                          f"args: {args} traceback: {traceback_str}")
                if hasattr(cls, "controller"):
                    cls.controller.dead_letter.add_event(kytos_event)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            tx.end()
            apm_client.tracer.queue_func("transaction", tx.to_dict())
            return result

        handler_func, kwargs = handler_context, {}
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = dict(apm_client=ElasticAPM.get_client())

        async def inner(*args):
            """Inner decorated with events attribute."""
            return await handler_func(*args, **kwargs)
        inner.events = [event]
        inner.events.extend(events)
        return inner

    return decorator
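
# Illustrative usage (hypothetical NApp, not part of this module): alisten_to
# plays the same role as listen_to for coroutine handlers, e.g.:
#
#     class MyNApp(KytosNApp):
#         @alisten_to("kytos/topology.updated")
#         async def handle_topology_update(self, event):
#             ...  # await async work here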


def now(tzone=timezone.utc):
    """Return the current datetime (default to UTC).

    Args:
        tzone (datetime.timezone): Specific time zone used in datetime.

    Returns:
        datetime.datetime.now: Date time with specific time zone.

    """
    return datetime.now(tzone)


def run_on_thread(method):
    """Decorate to run the decorated method inside a new thread.

    Args:
        method (function): function used to run as a new thread.

    Returns:
        Decorated method that will run inside a new thread.
        When the decorated method is called, it will not return the created
        thread to the caller.

    """
    def threaded_method(*args):
        """Ensure the handler method runs inside a new thread."""
        thread = Thread(target=method, args=args)

        # Set daemon mode so that we don't have to wait for these threads
        # to finish when exiting Kytos
        thread.daemon = True
        thread.start()
    return threaded_method


def get_time(data=None):
    """Receive a dictionary or a string and return a datetime instance.

    data = {"year": 2006,
            "month": 11,
            "day": 21,
            "hour": 16,
            "minute": 30,
            "second": 00}

    or

    data = "21/11/06 16:30:00"

    2018-04-17T17:13:50Z

    Args:
        data (str, dict): python dict or string to be converted to datetime

    Returns:
        datetime: datetime instance.

    """
    if isinstance(data, str):
        date = datetime.strptime(data, "%Y-%m-%dT%H:%M:%S")
    elif isinstance(data, dict):
        date = datetime(**data)
    else:
        return None
    return date.replace(tzinfo=timezone.utc)
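
# Illustrative calls (not part of the module): the string form must match
# "%Y-%m-%dT%H:%M:%S", and the dict form feeds datetime(**data), e.g.:
#
#     get_time("2018-04-17T17:13:50")
#     get_time({"year": 2006, "month": 11, "day": 21,
#               "hour": 16, "minute": 30, "second": 0})
#
# Both return timezone-aware UTC datetimes; any other input returns None.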


def _read_from_filename(yml_file_path: Path) -> dict:
    """Read from yml filename."""
    spec_dict, _ = read_from_filename(yml_file_path)
    return spec_dict


def load_spec(yml_file_path: Path):
    """Load and validate spec object given a yml file path."""
    spec_dict = _read_from_filename(yml_file_path)
    validate_spec(spec_dict)
    return create_spec(spec_dict)


def _request_validation_result_or_400(result: RequestValidationResult) -> None:
    """Request validation result or raise HTTP 400."""
    if not result.errors:
        return
    error_response = (
        "The request body contains invalid API data."
    )
    errors = result.errors[0]
    if hasattr(errors, "schema_errors"):
        schema_errors = errors.schema_errors[0]
        error_log = {
            "error_message": schema_errors.message,
            "error_validator": schema_errors.validator,
            "error_validator_value": schema_errors.validator_value,
            "error_path": list(schema_errors.path),
            "error_schema": schema_errors.schema,
            "error_schema_path": list(schema_errors.schema_path),
        }
        LOG.debug(f"Invalid request (API schema): {error_log}")
        error_response += f" {schema_errors.message} for field"
        error_response += (
            f" {'/'.join(map(str, schema_errors.path))}."
        )
    else:
        error_response = str(errors)
    raise HTTPException(400, detail=error_response)


def validate_openapi_request(
    spec: Spec, request: Request, loop: AbstractEventLoop
) -> bytes:
    """Validate a Request given an OpenAPI spec.

    This function is meant to be called from a synchronous context
    since StarletteOpenAPIRequest internally uses `asgiref.sync.AsyncToSync`,
    which forces not using the current running event loop.
    """
    body = get_body(request, loop)
    if body:
        content_type_json_or_415(request)
    openapi_request = StarletteOpenAPIRequest(request, body)
    result = openapi_request_validator.validate(spec, openapi_request)
    _request_validation_result_or_400(result)
    return body


async def avalidate_openapi_request(
    spec: Spec,
    request: Request,
) -> bytes:
    """Async validate_openapi_request.

    This function is for async routes. It also returns the request body bytes.
    It does not try to assume that it'll have a loadable json body, so it
    works with as many types of endpoints as possible with minimal friction.
    You can use await aget_json_or_400(request) to get the request body.

    Example:

        await avalidate_openapi_request(self.spec, request)
        body = await aget_json_or_400(request)
    """
    body = await request.body()
    if body:
        content_type_json_or_415(request)
    openapi_request = AStarletteOpenAPIRequest(request, body)
    result = openapi_request_validator.validate(spec, openapi_request)
    _request_validation_result_or_400(result)
    return body


def validate_openapi(spec):
    """Decorator to validate a REST endpoint input.

    Uses the schema defined in the openapi.yml file
    to validate.
    """
    def validate_decorator(func):
        @functools.wraps(func)
        def wrapper_validate(*args, **kwargs):
            request: Request = None
            napp = None
            for arg in args:
                if isinstance(arg, Request):
                    request = arg
                if hasattr(arg, "controller"):
                    napp = arg
                if request and napp:
                    break
            if not request:
                err = f"{func.__name__} args doesn't have a Request argument"
                raise RuntimeError(err)
            if not napp:
                err = f"{func.__name__} should be a NApp method to get ev_loop"
                raise RuntimeError(err)
            validate_openapi_request(spec, request, napp.controller.loop)
            return func(*args, **kwargs)
        return wrapper_validate
    return validate_decorator
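
For reference, a minimal sketch of how a NApp route typically combines these helpers, assuming the usual KytosNApp base class and the kytos.core.rest_api helpers; Main, its route, and the openapi.yml path are hypothetical, and aget_json_or_400 is the helper the avalidate_openapi_request docstring refers to:

    from pathlib import Path

    from kytos.core.helpers import avalidate_openapi_request, load_spec
    from kytos.core.napps import KytosNApp, rest
    from kytos.core.rest_api import JSONResponse, Request, aget_json_or_400


    class Main(KytosNApp):
        """Hypothetical NApp showing the validation flow.

        setup(), execute() and shutdown() are omitted for brevity.
        """

        # Load and validate the NApp's OpenAPI spec once, at class definition.
        spec = load_spec(Path(__file__).parent / "openapi.yml")

        @rest("v1/things", methods=["POST"])
        async def create_thing(self, request: Request) -> JSONResponse:
            # Validate the body against the spec, then load it as JSON.
            await avalidate_openapi_request(self.spec, request)
            body = await aget_json_or_400(request)
            return JSONResponse(body, status_code=201)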