Passed
Pull Request — master (#375)
by Vinicius
08:19
created

kytos.core.helpers._read_from_filename()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
"""Utilities functions used in Kytos."""
2
import functools
3
import logging
4
import traceback
5
from asyncio import AbstractEventLoop
6
from concurrent.futures import ThreadPoolExecutor
7
from datetime import datetime, timezone
8
from pathlib import Path
9
from threading import Thread
10
11
from openapi_core.spec import Spec
12
from openapi_core.spec.shortcuts import create_spec
13
from openapi_core.validation.request import openapi_request_validator
14
from openapi_core.validation.request.datatypes import RequestValidationResult
15
from openapi_spec_validator import validate_spec
16
from openapi_spec_validator.readers import read_from_filename
17
18
from kytos.core.apm import ElasticAPM
19
from kytos.core.config import KytosConfig
20
from kytos.core.rest_api import (AStarletteOpenAPIRequest, HTTPException,
21
                                 Request, StarletteOpenAPIRequest,
22
                                 content_type_json_or_415, get_body)
23
24
# Names exported by ``from kytos.core.helpers import *``.
__all__ = [
    "listen_to",
    "now",
    "run_on_thread",
    "get_time",
]

# Module-level logger shared by every helper in this file.
LOG = logging.getLogger(__name__)
27
28
29
def get_thread_pool_max_workers():
    """Return the ``thread_pool_max_workers`` value of the daemon options.

    A ``KytosConfig`` is instantiated on every call and the value is read
    from its ``options["daemon"]`` entry. This module iterates the result
    with ``.items()`` as ``(pool name, max workers)`` pairs.
    """
    daemon_options = KytosConfig().options["daemon"]
    return daemon_options.thread_pool_max_workers
32
33
34
def get_apm_name():
    """Return the ``apm`` value of the daemon options.

    A ``KytosConfig`` is instantiated on every call and the value is read
    from its ``options["daemon"]`` entry. The decorators in this module
    compare the result against ``"es"`` to enable APM instrumentation.
    """
    daemon_options = KytosConfig().options["daemon"]
    return daemon_options.apm
37
38
39
# pylint: disable=invalid-name
# One ThreadPoolExecutor per configured pool, keyed by pool name. Worker
# threads are named "thread_pool_<pool name>_<n>" via thread_name_prefix.
executors = {
    pool_name: ThreadPoolExecutor(
        max_workers=pool_size,
        thread_name_prefix=f"thread_pool_{pool_name}",
    )
    for pool_name, pool_size in get_thread_pool_max_workers().items()
}
43
44
45
def listen_to(event, *events, pool=None):
    """Decorate Event Listener methods.

    This decorator was built to be used on NAPPs methods to define which
    type of event the method will handle. With this, we will be able to
    'schedule' the app/method to receive an event when a new event is
    registered on the controller buffers.

    The decorated handler never blocks its caller: when thread pools are
    configured (the module-level ``executors`` mapping is not empty) the
    handler is submitted to one of them, otherwise it is started on a new
    daemon thread through ``run_on_thread``. In both cases calling the
    decorated method returns ``None``.

    The decorator will add an attribute to the method called 'events', that
    will be a list of the events that the method will handle.

    The event that will be listened to is always a string, but it can represent
    a regular expression to match against multiple Event Types. All listened
    events are documented in :doc:`/developer/listened_events` section.

    The pool option gives you control on which ThreadPoolExecutor that will
    execute the decorated handler. This knob is meant for prioritization,
    allowing executions on different pools depending on the handler primary
    responsibility and importance to avoid potential scheduling starvation.

    When running on a thread pool, an exception raised by the handler is
    logged and, if the first positional argument has a ``controller``
    attribute, the event is added to ``controller.dead_letter``.

    Example of usage:

    .. code-block:: python3

        class MyAppClass(KytosApp):
            @listen_to('kytos/of_core.messages.in')
            def my_handler_of_message_in(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.out')
            def my_handler_of_message_out(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.in.ofpt_hello',
                       'kytos/of_core.messages.out.ofpt_hello')
            def my_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*.hello')
            def my_other_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*')
            def my_stats_handler_of_any_message(self, event):
                # Do stuff here...

            @listen_to("some_db_oriented_event", pool="db")
            def db_update(self, event):
                # Do stuff here...

    Args:
        event (str): First event name (or pattern) to listen to.
        *events (str): Additional event names (or patterns).
        pool (str): Optional key of ``executors`` selecting the thread pool.

    Returns:
        A decorator producing a callable with an ``events`` attribute.
    """
    def thread_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run on a new thread.

        """
        @run_on_thread
        def threaded_handler(*args):
            """Decorate the handler to run from a new thread."""
            handler(*args)

        threaded_handler.events = [event, *events]
        return threaded_handler

    # pylint: disable=broad-except
    def thread_pool_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run on in the thread pool

        """
        def done_callback(future):
            """Report failures that escaped the handler's execution context."""
            # Future.exception() raises CancelledError on a cancelled future,
            # so cancelled work has to be filtered out before asking for it.
            if future.cancelled():
                return
            exc = future.exception()
            if exc is None:
                return
            # Exceptions raised by the handler itself are already caught and
            # logged by the execution context; anything reaching this point
            # was raised outside of that try block and would otherwise be
            # dropped silently together with the future.
            LOG.error(f"listen_to handler: {handler}, "
                      f"unhandled exception: {exc!r}")

        # pylint: disable=unused-argument
        def handler_context(*args, **kwargs):
            """Handler's context for ThreadPool."""
            cls, kytos_event = args[0], args[1]
            try:
                result = handler(*args)
            except Exception:
                result = None
                traceback_str = traceback.format_exc().replace("\n", ", ")
                LOG.error(f"listen_to handler: {handler}, "
                          f"args: {args} traceback: {traceback_str}")
                if hasattr(cls, "controller"):
                    cls.controller.dead_letter.add_event(kytos_event)
            return result

        def handler_context_apm(*args, apm_client=None):
            """Handler's context for ThreadPool APM instrumentation."""
            cls, kytos_event = args[0], args[1]
            trace_parent = kytos_event.trace_parent
            tx_type = "kytos_event"
            tx = apm_client.begin_transaction(transaction_type=tx_type,
                                              trace_parent=trace_parent)
            # Store the transaction's trace parent back on the event.
            kytos_event.trace_parent = tx.trace_parent
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
            try:
                result = handler(*args)
                tx.result = result
            except Exception as exc:
                result = None
                traceback_str = traceback.format_exc().replace("\n", ", ")
                LOG.error(f"listen_to handler: {handler}, "
                          f"args: {args} traceback: {traceback_str}")
                if hasattr(cls, "controller"):
                    cls.controller.dead_letter.add_event(kytos_event)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            tx.end()
            apm_client.tracer.queue_func("transaction", tx.to_dict())
            return result

        # The APM backend is resolved once, at decoration time.
        handler_func, kwargs = handler_context, {}
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = {"apm_client": ElasticAPM.get_client()}

        def get_executor(pool, event, default_pool="app"):
            """Get executor.

            An explicit pool always wins; otherwise the pool is derived from
            the event name prefix, falling back to the default pool.
            """
            if pool:
                return executors[pool]
            if not event:
                return executors[default_pool]

            southbound_prefixes = ("kytos/of_core", "kytos/core.openflow")
            if event.name.startswith(southbound_prefixes) \
                    and "sb" in executors:
                return executors["sb"]
            if event.name.startswith("kytos.storehouse") and "db" in executors:
                return executors["db"]
            return executors[default_pool]

        def inner(*args):
            """Decorate the handler to run in the thread pool."""
            kytos_event = args[1] if len(args) > 1 else None
            executor = get_executor(pool, kytos_event)
            future = executor.submit(handler_func, *args, **kwargs)
            future.add_done_callback(done_callback)

        inner.events = [event, *events]
        return inner

    if executors:
        return thread_pool_decorator
    return thread_decorator
212
213
214
def alisten_to(event, *events):
    """Decorate async subscribing methods.

    The decorated coroutine function gets an ``events`` attribute holding
    ``event`` followed by every name in ``events``. An exception raised by
    the handler is logged, the event is added to ``controller.dead_letter``
    when the first positional argument has a ``controller`` attribute, and
    ``None`` is returned in place of the handler's result.
    """

    def decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated as an asyncio task.
        """
        # pylint: disable=unused-argument,broad-except
        async def handler_context(*args, **kwargs):
            """Await the handler, logging and dead-lettering on failure."""
            napp, kytos_event = args[0], args[1]
            try:
                result = await handler(*args)
            except Exception:
                result = None
                flat_traceback = traceback.format_exc().replace("\n", ", ")
                LOG.error(f"alisten_to handler: {handler}, "
                          f"args: {args} traceback: {flat_traceback}")
                if hasattr(napp, "controller"):
                    napp.controller.dead_letter.add_event(kytos_event)
            return result

        async def handler_context_apm(*args, apm_client=None):
            """Await the handler inside an APM transaction."""
            napp, kytos_event = args[0], args[1]
            transaction = apm_client.begin_transaction(
                transaction_type="kytos_event",
                trace_parent=kytos_event.trace_parent,
            )
            # Store the transaction's trace parent back on the event.
            kytos_event.trace_parent = transaction.trace_parent
            transaction.name = f"{kytos_event.name}@{napp.napp_id}"
            try:
                result = await handler(*args)
                transaction.result = result
            except Exception as exc:
                result = None
                flat_traceback = traceback.format_exc().replace("\n", ", ")
                LOG.error(f"alisten_to handler: {handler}, "
                          f"args: {args} traceback: {flat_traceback}")
                if hasattr(napp, "controller"):
                    napp.controller.dead_letter.add_event(kytos_event)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            transaction.end()
            apm_client.tracer.queue_func("transaction", transaction.to_dict())
            return result

        # The execution context is selected once, at decoration time.
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = {"apm_client": ElasticAPM.get_client()}
        else:
            handler_func = handler_context
            kwargs = {}

        async def inner(*args):
            """Inner decorated with events attribute."""
            return await handler_func(*args, **kwargs)

        inner.events = [event, *events]
        return inner

    return decorator
280
281
282
def now(tzone=timezone.utc):
    """Return the current date and time in the given time zone.

    Args:
        tzone (datetime.timezone): Time zone of the returned value,
            UTC when omitted.

    Returns:
        datetime.datetime: Current date and time expressed in ``tzone``.

    """
    current = datetime.now(tz=tzone)
    return current
293
294
295
def run_on_thread(method):
    """Decorate a callable so every call runs on its own new thread.

    Args:
        method (function): callable to be executed on a new thread.

    Returns:
        A wrapper accepting positional arguments only. Calling it starts
        the thread and returns ``None``; the created thread is not handed
        back to the caller.

    """
    def threaded_method(*args):
        """Start ``method(*args)`` on a new daemon thread."""
        # Daemon threads do not have to be waited for when exiting Kytos.
        Thread(target=method, args=args, daemon=True).start()
    return threaded_method
316
317
318
def get_time(data=None):
    """Receive a dictionary or a string and return a datetime instance.

    Accepted inputs:

    - a string formatted as ``%Y-%m-%dT%H:%M:%S``, optionally followed by
      a ``Z`` suffix::

          data = "2018-04-17T17:13:50"
          data = "2018-04-17T17:13:50Z"

    - a dictionary with ``datetime`` keyword arguments::

          data = {"year": 2006,
                  "month": 11,
                  "day": 21,
                  "hour": 16,
                  "minute": 30,
                  "second": 0}

    Args:
        data (str, dict): python dict or string to be converted to datetime

    Returns:
        datetime: datetime instance with its time zone set to UTC, or
        ``None`` when ``data`` is neither a string nor a dictionary.

    Raises:
        ValueError: if the string does not match the expected format or
            the dictionary values are out of range.
        TypeError: if the dictionary has keys that ``datetime`` rejects.

    """
    if isinstance(data, str):
        # The result is always tagged as UTC, so an explicit "Z" designator
        # carries no extra information and is dropped before parsing.
        timestamp = data[:-1] if data.endswith("Z") else data
        date = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")
    elif isinstance(data, dict):
        date = datetime(**data)
    else:
        return None
    return date.replace(tzinfo=timezone.utc)
348
349
350
def _read_from_filename(yml_file_path: Path) -> dict:
    """Return the first item of the pair given by ``read_from_filename``.

    The second item of the pair is discarded.
    """
    contents, _unused = read_from_filename(yml_file_path)
    return contents
354
355
356
def load_spec(yml_file_path: Path):
    """Read a yml file, validate its content and build a spec object.

    The content is passed to ``validate_spec`` before ``create_spec`` is
    called, so the spec object is only built if validation did not raise.
    """
    raw_spec = _read_from_filename(yml_file_path)
    validate_spec(raw_spec)
    spec = create_spec(raw_spec)
    return spec
361
362
363
def _request_validation_result_or_400(result: RequestValidationResult) -> None:
    """Raise HTTP 400 when the validation result carries errors.

    Only the first entry of ``result.errors`` is reported. If that entry
    has a ``schema_errors`` attribute, its first schema error is logged at
    debug level and summarized in the response detail; otherwise the detail
    is the string form of the entry itself. Nothing happens when
    ``result.errors`` is empty.
    """
    if not result.errors:
        return

    first_error = result.errors[0]
    if not hasattr(first_error, "schema_errors"):
        raise HTTPException(400, detail=str(first_error))

    schema_error = first_error.schema_errors[0]
    error_log = {
        "error_message": schema_error.message,
        "error_validator": schema_error.validator,
        "error_validator_value": schema_error.validator_value,
        "error_path": list(schema_error.path),
        "error_schema": schema_error.schema,
        "error_schema_path": list(schema_error.schema_path),
    }
    LOG.debug(f"Invalid request (API schema): {error_log}")

    # Path items are joined with "/" to name the offending field.
    field_path = "/".join(str(item) for item in schema_error.path)
    detail = (
        "The request body contains invalid API data."
        f" {schema_error.message} for field"
        f" {field_path}."
    )
    raise HTTPException(400, detail=detail)
389
390
391
def validate_openapi_request(
    spec: Spec, request: Request, loop: AbstractEventLoop
) -> bytes:
    """Validate a Request against an OpenAPI spec and return its body.

    This function is meant to be called from a synchronous context
    since StarletteOpenAPIRequest internally uses `asgiref.sync.AsyncToSync`
    and its forcing not to use the current running event loop.

    The content type check (``content_type_json_or_415``) only runs when
    the body is not empty. Validation errors are turned into an HTTP 400 by
    ``_request_validation_result_or_400``.
    """
    payload = get_body(request, loop)
    if payload:
        content_type_json_or_415(request)
    validation = openapi_request_validator.validate(
        spec, StarletteOpenAPIRequest(request, payload)
    )
    _request_validation_result_or_400(validation)
    return payload
407
408
409
async def avalidate_openapi_request(
        spec: Spec,
        request: Request,
) -> bytes:
    """Async validate_openapi_request.

    This function is for async routes. It also returns the request body bytes.
    It does not try to assume that it'll have a loadable json body to work
    seamlessly with as many type of endpoints with minimal friction.
    You can use await aget_json_or_400(request) to get the request body.

    The content type check (``content_type_json_or_415``) only runs when
    the body is not empty.

    Example:

    await avalidate_openapi_request(self.spec, request)
    body = await aget_json_or_400(request)
    """
    payload = await request.body()
    if payload:
        content_type_json_or_415(request)
    validation = openapi_request_validator.validate(
        spec, AStarletteOpenAPIRequest(request, payload)
    )
    _request_validation_result_or_400(validation)
    return payload
432
433
434
def validate_openapi(spec):
    """Decorator to validate a REST endpoint input.

    Uses the schema defined in the openapi.yml file
    to validate.

    The wrapped callable's positional arguments are scanned for a
    ``Request`` instance and for an object with a ``controller`` attribute,
    whose ``controller.loop`` is passed on to the validation. A
    ``RuntimeError`` is raised when either of them is missing.
    """
    def validate_decorator(func):
        @functools.wraps(func)
        def wrapper_validate(*args, **kwargs):
            request: Request = None
            napp = None
            for candidate in args:
                if isinstance(candidate, Request):
                    request = candidate
                if hasattr(candidate, "controller"):
                    napp = candidate
                # Stop scanning as soon as both objects were located.
                if request and napp:
                    break

            if not request:
                raise RuntimeError(
                    f"{func.__name__} args doesn't have a Request argument"
                )
            if not napp:
                raise RuntimeError(
                    f"{func.__name__} should be a NApp method to get ev_loop"
                )

            validate_openapi_request(spec, request, napp.controller.loop)
            return func(*args, **kwargs)
        return wrapper_validate
    return validate_decorator
462