Passed
Pull Request — master (#460)
by Aldo
05:49
created

kytos.core.helpers._read_from_filename()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
"""Utilities functions used in Kytos."""
2
import functools
3
import logging
4
import traceback
5
from asyncio import AbstractEventLoop
6
from collections import defaultdict
7
from concurrent.futures import ThreadPoolExecutor
8
from datetime import datetime, timezone
9
from pathlib import Path
10
from threading import Thread
11
12
from openapi_core import OpenAPI
13
from openapi_core.contrib.starlette import StarletteOpenAPIRequest
14
from openapi_core.unmarshalling.request.datatypes import RequestUnmarshalResult
15
16
from kytos.core.apm import ElasticAPM
17
from kytos.core.config import KytosConfig
18
from kytos.core.rest_api import (HTTPException, Request,
19
                                 content_type_json_or_415, get_body)
20
21
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
22
23
LOG = logging.getLogger(__name__)
24
25
26
def get_thread_pool_max_workers():
27
    """Get the number of thread pool max workers."""
28
    return KytosConfig().options["daemon"].thread_pool_max_workers
29
30
31
def get_apm_name():
32
    """Get apm backend name."""
33
    return KytosConfig().options["daemon"].apm
34
35
36
# pylint: disable=invalid-name
37
executors = {name: ThreadPoolExecutor(max_workers=max_workers,
38
                                      thread_name_prefix=f"thread_pool_{name}")
39
             for name, max_workers in get_thread_pool_max_workers().items()}
40
41
ds_executors = defaultdict(lambda: ThreadPoolExecutor(max_workers=1,
42
                           thread_name_prefix="dynamic_single"))
43
44
45
def listen_to(event, *events, pool=None):
46
    """Decorate Event Listener methods.
47
48
    This decorator was built to be used on NAPPs methods to define which
49
    type of event the method will handle. With this, we will be able to
50
    'schedule' the app/method to receive an event when a new event is
51
    registered on the controller buffers.
52
    By using the run_on_thread decorator, we also guarantee that the method
53
    (handler) will be called from inside a new thread, avoiding this method to
54
    block its caller.
55
56
    The decorator will add an attribute to the method called 'events', that
57
    will be a list of the events that the method will handle.
58
59
    The event that will be listened to is always a string, but it can represent
60
    a regular expression to match against multiple Event Types. All listened
61
    events are documented in :doc:`/developer/listened_events` section.
62
63
    The pool option gives you control on which ThreadPoolExecutor that will
64
    execute the decorated handler. This knob is meant for prioritization,
65
    allowing executions on different pools depending on the handler primary
66
    responsibility and importance to avoid potential scheduling starvation.
67
68
    Example of usage:
69
70
    .. code-block:: python3
71
72
        class MyAppClass(KytosApp):
73
            @listen_to('kytos/of_core.messages.in')
74
            def my_handler_of_message_in(self, event):
75
                # Do stuff here...
76
77
            @listen_to('kytos/of_core.messages.out')
78
            def my_handler_of_message_out(self, event):
79
                # Do stuff here...
80
81
            @listen_to('kytos/of_core.messages.in.ofpt_hello',
82
                       'kytos/of_core.messages.out.ofpt_hello')
83
            def my_handler_of_hello_messages(self, event):
84
                # Do stuff here...
85
86
            @listen_to('kytos/of_core.message.*.hello')
87
            def my_other_handler_of_hello_messages(self, event):
88
                # Do stuff here...
89
90
            @listen_to('kytos/of_core.message.*.hello')
91
            def my_handler_of_hello_messages(self, event):
92
                # Do stuff here...
93
94
            @listen_to('kytos/of_core.message.*')
95
            def my_stats_handler_of_any_message(self, event):
96
                # Do stuff here...
97
98
            @listen_to("some_db_oriented_event", pool="db")
99
            def db_update(self, event):
100
                # Do stuff here...
101
    """
102
    def thread_decorator(handler):
103
        """Decorate the handler method.
104
105
        Returns:
106
            A method with an `events` attribute (list of events to be listened)
107
            and also decorated to run on a new thread.
108
109
        """
110
        @run_on_thread
111
        def threaded_handler(*args):
112
            """Decorate the handler to run from a new thread."""
113
            handler(*args)
114
115
        threaded_handler.events = [event]
116
        threaded_handler.events.extend(events)
117
        return threaded_handler
118
119
    # pylint: disable=broad-except
120
    def thread_pool_decorator(handler):
121
        """Decorate the handler method.
122
123
        Returns:
124
            A method with an `events` attribute (list of events to be listened)
125
            and also decorated to run on in the thread pool
126
127
        """
128
        def done_callback(future):
129
            """Done callback."""
130
            if not future.exception():
131
                _ = future.result()
132
                return
133
134
        # pylint: disable=unused-argument
135 View Code Duplication
        def handler_context(*args, **kwargs):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
136
            """Handler's context for ThreadPool."""
137
            cls, kytos_event = args[0], args[1]
138
            try:
139
                result = handler(*args)
140
            except Exception:
141
                result = None
142
                traceback_str = traceback.format_exc().replace("\n", ", ")
143
                LOG.error(f"listen_to handler: {handler}, "
144
                          f"args: {args} traceback: {traceback_str}")
145
                if hasattr(cls, "controller"):
146
                    cls.controller.dead_letter.add_event(kytos_event)
147
            return result
148
149 View Code Duplication
        def handler_context_apm(*args, apm_client=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
150
            """Handler's context for ThreadPool APM instrumentation."""
151
            cls, kytos_event = args[0], args[1]
152
            trace_parent = kytos_event.trace_parent
153
            tx_type = "kytos_event"
154
            tx = apm_client.begin_transaction(transaction_type=tx_type,
155
                                              trace_parent=trace_parent)
156
            kytos_event.trace_parent = tx.trace_parent
157
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
158
            try:
159
                result = handler(*args)
160
                tx.result = result
161
            except Exception as exc:
162
                result = None
163
                traceback_str = traceback.format_exc().replace("\n", ", ")
164
                LOG.error(f"listen_to handler: {handler}, "
165
                          f"args: {args} traceback: {traceback_str}")
166
                if hasattr(cls, "controller"):
167
                    cls.controller.dead_letter.add_event(kytos_event)
168
                apm_client.capture_exception(
169
                    exc_info=(type(exc), exc, exc.__traceback__),
170
                    context={"args": args},
171
                    handled=False,
172
                )
173
            tx.end()
174
            apm_client.tracer.queue_func("transaction", tx.to_dict())
175
            return result
176
177
        handler_func, kwargs = handler_context, {}
178
        if get_apm_name() == "es":
179
            handler_func = handler_context_apm
180
            kwargs = {"apm_client": ElasticAPM.get_client()}
181
182
        def get_executor(pool, event, default_pool="app", handler=handler):
183
            """Get executor."""
184
            if pool == "dynamic_single":
185
                return ds_executors[handler]
186
            if pool and pool in executors:
187
                return executors[pool]
188
            if not event:
189
                return executors[default_pool]
190
191
            if event.name.startswith("kytos/of_core") and "sb" in executors:
192
                return executors["sb"]
193
            core_of = "kytos/core.openflow"
194
            if event.name.startswith(core_of) and "sb" in executors:
195
                return executors["sb"]
196
            return executors[default_pool]
197
198
        def inner(*args):
199
            """Decorate the handler to run in the thread pool."""
200
            event = args[1] if len(args) > 1 else None
201
            executor = get_executor(pool, event)
202
            future = executor.submit(handler_func, *args, **kwargs)
203
            future.add_done_callback(done_callback)
204
205
        inner.events = [event]
206
        inner.events.extend(events)
207
        return inner
208
209
    if executors:
210
        return thread_pool_decorator
211
    return thread_decorator
212
213
214
def alisten_to(event, *events):
215
    """Decorate async subscribing methods."""
216
217
    def decorator(handler):
218
        """Decorate the handler method.
219
220
        Returns:
221
            A method with an `events` attribute (list of events to be listened)
222
            and also decorated as an asyncio task.
223
        """
224
        # pylint: disable=unused-argument,broad-except
225 View Code Duplication
        async def handler_context(*args, **kwargs):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
226
            """Async handler's execution context."""
227
            cls, kytos_event = args[0], args[1]
228
            try:
229
                result = await handler(*args)
230
            except Exception:
231
                result = None
232
                traceback_str = traceback.format_exc().replace("\n", ", ")
233
                LOG.error(f"alisten_to handler: {handler}, "
234
                          f"args: {args} traceback: {traceback_str}")
235
                if hasattr(cls, "controller"):
236
                    cls.controller.dead_letter.add_event(kytos_event)
237
            return result
238
239 View Code Duplication
        async def handler_context_apm(*args, apm_client=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
240
            """Async handler's execution context with APM instrumentation."""
241
            cls, kytos_event = args[0], args[1]
242
            trace_parent = kytos_event.trace_parent
243
            tx_type = "kytos_event"
244
            tx = apm_client.begin_transaction(transaction_type=tx_type,
245
                                              trace_parent=trace_parent)
246
            kytos_event.trace_parent = tx.trace_parent
247
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
248
            try:
249
                result = await handler(*args)
250
                tx.result = result
251
            except Exception as exc:
252
                result = None
253
                traceback_str = traceback.format_exc().replace("\n", ", ")
254
                LOG.error(f"alisten_to handler: {handler}, "
255
                          f"args: {args} traceback: {traceback_str}")
256
                if hasattr(cls, "controller"):
257
                    cls.controller.dead_letter.add_event(kytos_event)
258
                apm_client.capture_exception(
259
                    exc_info=(type(exc), exc, exc.__traceback__),
260
                    context={"args": args},
261
                    handled=False,
262
                )
263
            tx.end()
264
            apm_client.tracer.queue_func("transaction", tx.to_dict())
265
            return result
266
267
        handler_func, kwargs = handler_context, {}
268
        if get_apm_name() == "es":
269
            handler_func = handler_context_apm
270
            kwargs = {"apm_client": ElasticAPM.get_client()}
271
272
        async def inner(*args):
273
            """Inner decorated with events attribute."""
274
            return await handler_func(*args, **kwargs)
275
        inner.events = [event]
276
        inner.events.extend(events)
277
        return inner
278
279
    return decorator
280
281
282
def now(tzone=timezone.utc):
283
    """Return the current datetime (default to UTC).
284
285
    Args:
286
        tzone (datetime.timezone): Specific time zone used in datetime.
287
288
    Returns:
289
        datetime.datetime.now: Date time with specific time zone.
290
291
    """
292
    return datetime.now(tzone)
293
294
295
def run_on_thread(method):
296
    """Decorate to run the decorated method inside a new thread.
297
298
    Args:
299
        method (function): function used to run as a new thread.
300
301
    Returns:
302
        Decorated method that will run inside a new thread.
303
        When the decorated method is called, it will not return the created
304
        thread to the caller.
305
306
    """
307
    def threaded_method(*args):
308
        """Ensure the handler method runs inside a new thread."""
309
        thread = Thread(target=method, args=args)
310
311
        # Set daemon mode so that we don't have to wait for these threads
312
        # to finish when exiting Kytos
313
        thread.daemon = True
314
        thread.start()
315
    return threaded_method
316
317
318
def get_time(data=None):
319
    """Receive a dictionary or a string and return a datatime instance.
320
321
    data = {"year": 2006,
322
            "month": 11,
323
            "day": 21,
324
            "hour": 16,
325
            "minute": 30 ,
326
            "second": 00}
327
328
    or
329
330
    data = "21/11/06 16:30:00"
331
332
    2018-04-17T17:13:50Z
333
334
    Args:
335
        data (str, dict): python dict or string to be converted to datetime
336
337
    Returns:
338
        datetime: datetime instance.
339
340
    """
341
    if isinstance(data, str):
342
        date = datetime.strptime(data, "%Y-%m-%dT%H:%M:%S")
343
    elif isinstance(data, dict):
344
        date = datetime(**data)
345
    else:
346
        return None
347
    return date.replace(tzinfo=timezone.utc)
348
349
350
def load_spec(yml_file_path: Path):
351
    """Load and validate spec object given a yml file path."""
352
    return OpenAPI.from_file_path(yml_file_path)
353
354
355
def _request_validation_result_or_400(result: RequestUnmarshalResult) -> None:
356
    """Request validation result or raise HTTP 400."""
357
    if not result.errors:
358
        return
359
    error_response = (
360
        "The request body contains invalid API data."
361
    )
362
    errors = result.errors[0]
363
    if not errors.__cause__:
364
        error_response = str(errors)
365
    elif hasattr(errors.__cause__, "schema_errors"):
366
        errors = errors.__cause__
367
        schema_errors = errors.schema_errors[0]
368
        error_log = {
369
            "error_message": schema_errors.message,
370
            "error_validator": schema_errors.validator,
371
            "error_validator_value": schema_errors.validator_value,
372
            "error_path": list(schema_errors.path),
373
            "error_schema": schema_errors.schema,
374
            "error_schema_path": list(schema_errors.schema_path),
375
        }
376
        LOG.debug(f"Invalid request (API schema): {error_log}")
377
        error_response += f" {schema_errors.message} for field"
378
        error_response += (
379
            f" {'/'.join(map(str,schema_errors.path))}."
380
        )
381
    else:
382
        error_response = str(errors.__cause__)
383
    raise HTTPException(400, detail=error_response)
384
385
386
def validate_openapi_request(
387
    spec: OpenAPI, request: Request, loop: AbstractEventLoop
388
) -> bytes:
389
    """Validate a Request given an OpenAPI spec.
390
391
    This function is meant to be called from a synchronous context
392
    since StarletteOpenAPIRequest internally uses `asgiref.sync.AsyncToSync`
393
    and its forcing not to use the current running event loop.
394
    """
395
    body = get_body(request, loop)
396
    if body:
397
        content_type_json_or_415(request)
398
    openapi_request = StarletteOpenAPIRequest(request, body)
399
    result = spec.unmarshal_request(openapi_request)
400
    _request_validation_result_or_400(result)
401
    return body
402
403
404
async def avalidate_openapi_request(
405
        spec: OpenAPI,
406
        request: Request,
407
) -> bytes:
408
    """Async validate_openapi_request.
409
410
    This function is for async routes. It also returns the request body bytes.
411
    It does not try to assume that it'll have a loadable json body to work
412
    seamlessly with as many type of endpoints with minimal friction.
413
    You can use await aget_json_or_400(request) to get the request body.
414
415
    Example:
416
417
    await avalidate_openapi_request(self.spec, request)
418
    body = await aget_json_or_400(request)
419
    """
420
    body = await request.body()
421
    if body:
422
        content_type_json_or_415(request)
423
    openapi_request = StarletteOpenAPIRequest(request, body)
424
    result = spec.unmarshal_request(openapi_request)
425
    _request_validation_result_or_400(result)
426
    return body
427
428
429
def validate_openapi(spec):
430
    """Decorator to validate a REST endpoint input.
431
432
    Uses the schema defined in the openapi.yml file
433
    to validate.
434
    """
435
    def validate_decorator(func):
436
        @functools.wraps(func)
437
        def wrapper_validate(*args, **kwargs):
438
            request: Request = None
439
            napp = None
440
            for arg in args:
441
                if isinstance(arg, Request):
442
                    request = arg
443
                if hasattr(arg, "controller"):
444
                    napp = arg
445
                if request and napp:
446
                    break
447
            if not request:
448
                err = f"{func.__name__} args doesn't have a Request argument"
449
                raise RuntimeError(err)
450
            if not napp:
451
                err = f"{func.__name__} should be a NApp method to get ev_loop"
452
                raise RuntimeError(err)
453
            validate_openapi_request(spec, request, napp.controller.loop)
454
            return func(*args, **kwargs)
455
        return wrapper_validate
456
    return validate_decorator
457