kytos.core.helpers   B
last analyzed

Complexity

Total Complexity 51

Size/Duplication

Total Lines 454
Duplicated Lines 17.62 %

Importance

Changes 0
Metric Value
eloc 233
dl 80
loc 454
rs 7.92
c 0
b 0
f 0
wmc 51

13 Functions

Rating   Name   Duplication   Size   Complexity  
A get_apm_name() 0 3 1
A get_thread_pool_max_workers() 0 3 1
A avalidate_openapi_request() 0 25 3
A get_time() 0 30 3
B alisten_to() 40 66 6
A load_spec() 0 5 1
A _request_validation_result_or_400() 0 12 5
A validate_openapi_request() 0 18 3
B validate_openapi() 0 28 8
A _read_from_filename() 0 4 1
A run_on_thread() 0 21 1
F listen_to() 40 167 17
A now() 0 11 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like kytos.core.helpers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Utilities functions used in Kytos."""
2
import functools
3
import logging
4
import traceback
5
from asyncio import AbstractEventLoop
6
from collections import defaultdict
7
from concurrent.futures import ThreadPoolExecutor
8
from datetime import datetime, timezone
9
from pathlib import Path
10
from threading import Thread
11
12
from openapi_core import Spec, unmarshal_request
13
from openapi_core.exceptions import OpenAPIError
14
from openapi_spec_validator import validate_spec
15
from openapi_spec_validator.readers import read_from_filename
16
17
from kytos.core.apm import ElasticAPM
18
from kytos.core.config import KytosConfig
19
from kytos.core.rest_api import (AStarletteOpenAPIRequest, HTTPException,
20
                                 Request, StarletteOpenAPIRequest,
21
                                 content_type_json_or_415, get_body)
22
23
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
24
25
LOG = logging.getLogger(__name__)
26
27
28
def get_thread_pool_max_workers():
29
    """Get the number of thread pool max workers."""
30
    return KytosConfig().options["daemon"].thread_pool_max_workers
31
32
33
def get_apm_name():
34
    """Get apm backend name."""
35
    return KytosConfig().options["daemon"].apm
36
37
38
# pylint: disable=invalid-name
39
executors = {name: ThreadPoolExecutor(max_workers=max_workers,
40
                                      thread_name_prefix=f"thread_pool_{name}")
41
             for name, max_workers in get_thread_pool_max_workers().items()}
42
43
ds_executors = defaultdict(lambda: ThreadPoolExecutor(max_workers=1,
44
                           thread_name_prefix="dynamic_single"))
45
46
47
def listen_to(event, *events, pool=None):
48
    """Decorate Event Listener methods.
49
50
    This decorator was built to be used on NAPPs methods to define which
51
    type of event the method will handle. With this, we will be able to
52
    'schedule' the app/method to receive an event when a new event is
53
    registered on the controller buffers.
54
    By using the run_on_thread decorator, we also guarantee that the method
55
    (handler) will be called from inside a new thread, avoiding this method to
56
    block its caller.
57
58
    The decorator will add an attribute to the method called 'events', that
59
    will be a list of the events that the method will handle.
60
61
    The event that will be listened to is always a string, but it can represent
62
    a regular expression to match against multiple Event Types. All listened
63
    events are documented in :doc:`/developer/listened_events` section.
64
65
    The pool option gives you control on which ThreadPoolExecutor that will
66
    execute the decorated handler. This knob is meant for prioritization,
67
    allowing executions on different pools depending on the handler primary
68
    responsibility and importance to avoid potential scheduling starvation.
69
70
    Example of usage:
71
72
    .. code-block:: python3
73
74
        class MyAppClass(KytosApp):
75
            @listen_to('kytos/of_core.messages.in')
76
            def my_handler_of_message_in(self, event):
77
                # Do stuff here...
78
79
            @listen_to('kytos/of_core.messages.out')
80
            def my_handler_of_message_out(self, event):
81
                # Do stuff here...
82
83
            @listen_to('kytos/of_core.messages.in.ofpt_hello',
84
                       'kytos/of_core.messages.out.ofpt_hello')
85
            def my_handler_of_hello_messages(self, event):
86
                # Do stuff here...
87
88
            @listen_to('kytos/of_core.message.*.hello')
89
            def my_other_handler_of_hello_messages(self, event):
90
                # Do stuff here...
91
92
            @listen_to('kytos/of_core.message.*.hello')
93
            def my_handler_of_hello_messages(self, event):
94
                # Do stuff here...
95
96
            @listen_to('kytos/of_core.message.*')
97
            def my_stats_handler_of_any_message(self, event):
98
                # Do stuff here...
99
100
            @listen_to("some_db_oriented_event", pool="db")
101
            def db_update(self, event):
102
                # Do stuff here...
103
    """
104
    def thread_decorator(handler):
105
        """Decorate the handler method.
106
107
        Returns:
108
            A method with an `events` attribute (list of events to be listened)
109
            and also decorated to run on a new thread.
110
111
        """
112
        @run_on_thread
113
        def threaded_handler(*args):
114
            """Decorate the handler to run from a new thread."""
115
            handler(*args)
116
117
        threaded_handler.events = [event]
118
        threaded_handler.events.extend(events)
119
        return threaded_handler
120
121
    # pylint: disable=broad-except
122
    def thread_pool_decorator(handler):
123
        """Decorate the handler method.
124
125
        Returns:
126
            A method with an `events` attribute (list of events to be listened)
127
            and also decorated to run on in the thread pool
128
129
        """
130
        def done_callback(future):
131
            """Done callback."""
132
            if not future.exception():
133
                _ = future.result()
134
                return
135
136
        # pylint: disable=unused-argument
137 View Code Duplication
        def handler_context(*args, **kwargs):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
138
            """Handler's context for ThreadPool."""
139
            cls, kytos_event = args[0], args[1]
140
            try:
141
                result = handler(*args)
142
            except Exception:
143
                result = None
144
                traceback_str = traceback.format_exc()
145
                LOG.error(f"listen_to handler: {handler}, "
146
                          f"args: {args} traceback: {traceback_str}")
147
                if hasattr(cls, "controller"):
148
                    cls.controller.dead_letter.add_event(kytos_event)
149
            return result
150
151 View Code Duplication
        def handler_context_apm(*args, apm_client=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
152
            """Handler's context for ThreadPool APM instrumentation."""
153
            cls, kytos_event = args[0], args[1]
154
            trace_parent = kytos_event.trace_parent
155
            tx_type = "kytos_event"
156
            tx = apm_client.begin_transaction(transaction_type=tx_type,
157
                                              trace_parent=trace_parent)
158
            kytos_event.trace_parent = tx.trace_parent
159
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
160
            try:
161
                result = handler(*args)
162
                tx.result = result
163
            except Exception as exc:
164
                result = None
165
                traceback_str = traceback.format_exc()
166
                LOG.error(f"listen_to handler: {handler}, "
167
                          f"args: {args} traceback: {traceback_str}")
168
                if hasattr(cls, "controller"):
169
                    cls.controller.dead_letter.add_event(kytos_event)
170
                apm_client.capture_exception(
171
                    exc_info=(type(exc), exc, exc.__traceback__),
172
                    context={"args": args},
173
                    handled=False,
174
                )
175
            tx.end()
176
            apm_client.tracer.queue_func("transaction", tx.to_dict())
177
            return result
178
179
        handler_func, kwargs = handler_context, {}
180
        if get_apm_name() == "es":
181
            handler_func = handler_context_apm
182
            kwargs = {"apm_client": ElasticAPM.get_client()}
183
184
        def get_executor(pool, event, default_pool="app", handler=handler):
185
            """Get executor."""
186
            if pool == "dynamic_single":
187
                return ds_executors[handler]
188
            if pool and pool in executors:
189
                return executors[pool]
190
            if not event:
191
                return executors[default_pool]
192
193
            if event.name.startswith("kytos/of_core") and "sb" in executors:
194
                return executors["sb"]
195
            core_of = "kytos/core.openflow"
196
            if event.name.startswith(core_of) and "sb" in executors:
197
                return executors["sb"]
198
            return executors[default_pool]
199
200
        def inner(*args):
201
            """Decorate the handler to run in the thread pool."""
202
            event = args[1] if len(args) > 1 else None
203
            executor = get_executor(pool, event)
204
            future = executor.submit(handler_func, *args, **kwargs)
205
            future.add_done_callback(done_callback)
206
207
        inner.events = [event]
208
        inner.events.extend(events)
209
        return inner
210
211
    if executors:
212
        return thread_pool_decorator
213
    return thread_decorator
214
215
216
def alisten_to(event, *events):
217
    """Decorate async subscribing methods."""
218
219
    def decorator(handler):
220
        """Decorate the handler method.
221
222
        Returns:
223
            A method with an `events` attribute (list of events to be listened)
224
            and also decorated as an asyncio task.
225
        """
226
        # pylint: disable=unused-argument,broad-except
227 View Code Duplication
        async def handler_context(*args, **kwargs):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
228
            """Async handler's execution context."""
229
            cls, kytos_event = args[0], args[1]
230
            try:
231
                result = await handler(*args)
232
            except Exception:
233
                result = None
234
                traceback_str = traceback.format_exc()
235
                LOG.error(f"alisten_to handler: {handler}, "
236
                          f"args: {args} traceback: {traceback_str}")
237
                if hasattr(cls, "controller"):
238
                    cls.controller.dead_letter.add_event(kytos_event)
239
            return result
240
241 View Code Duplication
        async def handler_context_apm(*args, apm_client=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
242
            """Async handler's execution context with APM instrumentation."""
243
            cls, kytos_event = args[0], args[1]
244
            trace_parent = kytos_event.trace_parent
245
            tx_type = "kytos_event"
246
            tx = apm_client.begin_transaction(transaction_type=tx_type,
247
                                              trace_parent=trace_parent)
248
            kytos_event.trace_parent = tx.trace_parent
249
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
250
            try:
251
                result = await handler(*args)
252
                tx.result = result
253
            except Exception as exc:
254
                result = None
255
                traceback_str = traceback.format_exc()
256
                LOG.error(f"alisten_to handler: {handler}, "
257
                          f"args: {args} traceback: {traceback_str}")
258
                if hasattr(cls, "controller"):
259
                    cls.controller.dead_letter.add_event(kytos_event)
260
                apm_client.capture_exception(
261
                    exc_info=(type(exc), exc, exc.__traceback__),
262
                    context={"args": args},
263
                    handled=False,
264
                )
265
            tx.end()
266
            apm_client.tracer.queue_func("transaction", tx.to_dict())
267
            return result
268
269
        handler_func, kwargs = handler_context, {}
270
        if get_apm_name() == "es":
271
            handler_func = handler_context_apm
272
            kwargs = {"apm_client": ElasticAPM.get_client()}
273
274
        async def inner(*args):
275
            """Inner decorated with events attribute."""
276
            return await handler_func(*args, **kwargs)
277
        inner.events = [event]
278
        inner.events.extend(events)
279
        return inner
280
281
    return decorator
282
283
284
def now(tzone=timezone.utc):
285
    """Return the current datetime (default to UTC).
286
287
    Args:
288
        tzone (datetime.timezone): Specific time zone used in datetime.
289
290
    Returns:
291
        datetime.datetime.now: Date time with specific time zone.
292
293
    """
294
    return datetime.now(tzone)
295
296
297
def run_on_thread(method):
298
    """Decorate to run the decorated method inside a new thread.
299
300
    Args:
301
        method (function): function used to run as a new thread.
302
303
    Returns:
304
        Decorated method that will run inside a new thread.
305
        When the decorated method is called, it will not return the created
306
        thread to the caller.
307
308
    """
309
    def threaded_method(*args):
310
        """Ensure the handler method runs inside a new thread."""
311
        thread = Thread(target=method, args=args)
312
313
        # Set daemon mode so that we don't have to wait for these threads
314
        # to finish when exiting Kytos
315
        thread.daemon = True
316
        thread.start()
317
    return threaded_method
318
319
320
def get_time(data=None):
321
    """Receive a dictionary or a string and return a datatime instance.
322
323
    data = {"year": 2006,
324
            "month": 11,
325
            "day": 21,
326
            "hour": 16,
327
            "minute": 30 ,
328
            "second": 00}
329
330
    or
331
332
    data = "21/11/06 16:30:00"
333
334
    2018-04-17T17:13:50Z
335
336
    Args:
337
        data (str, dict): python dict or string to be converted to datetime
338
339
    Returns:
340
        datetime: datetime instance.
341
342
    """
343
    if isinstance(data, str):
344
        date = datetime.strptime(data, "%Y-%m-%dT%H:%M:%S")
345
    elif isinstance(data, dict):
346
        date = datetime(**data)
347
    else:
348
        return None
349
    return date.replace(tzinfo=timezone.utc)
350
351
352
def _read_from_filename(yml_file_path: Path) -> dict:
353
    """Read from yml filename."""
354
    spec_dict, _ = read_from_filename(yml_file_path)
355
    return spec_dict
356
357
358
def load_spec(yml_file_path: Path):
359
    """Load and validate spec object given a yml file path."""
360
    spec = _read_from_filename(yml_file_path)
361
    validate_spec(spec)
362
    return Spec.from_dict(spec)
363
364
365
def _request_validation_result_or_400(errors: OpenAPIError) -> None:
366
    """Raise HTTP 400."""
367
    error_response = "The request body contains invalid API data."
368
    if not errors.__cause__:
369
        error_response = str(errors)
370
    elif (hasattr(errors.__cause__, "schema_errors") and
371
            errors.__cause__.schema_errors):
372
        schema_errors = errors.__cause__.schema_errors
373
        for error in schema_errors:
374
            error_response += f", {error.message} for field"
375
            error_response += f" {'/'.join(map(str,error.path))}."
376
    raise HTTPException(400, detail=error_response)
377
378
379
def validate_openapi_request(
380
    spec: Spec, request: Request, loop: AbstractEventLoop
381
) -> bytes:
382
    """Validate a Request given an OpenAPI spec.
383
384
    This function is meant to be called from a synchronous context
385
    since StarletteOpenAPIRequest internally uses `asgiref.sync.AsyncToSync`
386
    and its forcing not to use the current running event loop.
387
    """
388
    body = get_body(request, loop)
389
    if body:
390
        content_type_json_or_415(request)
391
    openapi_request = StarletteOpenAPIRequest(request, body)
392
    try:
393
        unmarshal_request(openapi_request, spec)
394
    except OpenAPIError as err:
395
        _request_validation_result_or_400(err)
396
    return body
397
398
399
async def avalidate_openapi_request(
400
        spec: Spec,
401
        request: Request,
402
) -> bytes:
403
    """Async validate_openapi_request.
404
405
    This function is for async routes. It also returns the request body bytes.
406
    It does not try to assume that it'll have a loadable json body to work
407
    seamlessly with as many type of endpoints with minimal friction.
408
    You can use await aget_json_or_400(request) to get the request body.
409
410
    Example:
411
412
    await avalidate_openapi_request(self.spec, request)
413
    body = await aget_json_or_400(request)
414
    """
415
    body = await request.body()
416
    if body:
417
        content_type_json_or_415(request)
418
    openapi_request = AStarletteOpenAPIRequest(request, body)
419
    try:
420
        unmarshal_request(openapi_request, spec)
421
    except OpenAPIError as err:
422
        _request_validation_result_or_400(err)
423
    return body
424
425
426
def validate_openapi(spec):
427
    """Decorator to validate a REST endpoint input.
428
429
    Uses the schema defined in the openapi.yml file
430
    to validate.
431
    """
432
    def validate_decorator(func):
433
        @functools.wraps(func)
434
        def wrapper_validate(*args, **kwargs):
435
            request: Request = None
436
            napp = None
437
            for arg in args:
438
                if isinstance(arg, Request):
439
                    request = arg
440
                if hasattr(arg, "controller"):
441
                    napp = arg
442
                if request and napp:
443
                    break
444
            if not request:
445
                err = f"{func.__name__} args doesn't have a Request argument"
446
                raise RuntimeError(err)
447
            if not napp:
448
                err = f"{func.__name__} should be a NApp method to get ev_loop"
449
                raise RuntimeError(err)
450
            validate_openapi_request(spec, request, napp.controller.loop)
451
            return func(*args, **kwargs)
452
        return wrapper_validate
453
    return validate_decorator
454