Test Failed
Pull Request — master (#375)
by Vinicius
05:01
created

kytos.core.helpers.validate_openapi()   A

Complexity

Conditions 4

Size

Total Lines 21
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 15
nop 1
dl 0
loc 21
rs 9.65
c 0
b 0
f 0
1
"""Utility functions used in Kytos."""
2
import functools
3
import logging
4
import traceback
5
from concurrent.futures import ThreadPoolExecutor
6
from datetime import datetime, timezone
7
from pathlib import Path
8
from threading import Thread
9
10
from openapi_core.spec import Spec
11
from openapi_core.spec.shortcuts import create_spec
12
from openapi_core.validation.request import openapi_request_validator
13
from openapi_core.validation.request.datatypes import RequestValidationResult
14
from openapi_spec_validator import validate_spec
15
from openapi_spec_validator.readers import read_from_filename
16
17
from kytos.core.apm import ElasticAPM
18
from kytos.core.config import KytosConfig
19
from kytos.core.rest_api import (AStarletteOpenAPIRequest, HTTPException,
20
                                 Request, StarletteOpenAPIRequest,
21
                                 content_type_json_or_415, get_body)
22
23
# Names exported by a star-import of this module.
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
24
25
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
26
27
28
def get_thread_pool_max_workers():
    """Return the ``thread_pool_max_workers`` option of the daemon config."""
    daemon_options = KytosConfig().options["daemon"]
    return daemon_options.thread_pool_max_workers
31
32
33
def get_apm_name():
    """Return the ``apm`` option of the daemon config (APM backend name)."""
    daemon_options = KytosConfig().options["daemon"]
    return daemon_options.apm
36
37
38
# One ThreadPoolExecutor per configured pool name, created at import time.
# Worker threads are named after their pool ("thread_pool_<name>").
# pylint: disable=invalid-name
executors = {
    pool_name: ThreadPoolExecutor(
        max_workers=worker_count,
        thread_name_prefix=f"thread_pool_{pool_name}",
    )
    for pool_name, worker_count in get_thread_pool_max_workers().items()
}
42
43
44
def listen_to(event, *events, pool=None):
    """Decorate Event Listener methods.

    This decorator was built to be used on NAPPs methods to define which
    type of event the method will handle. With this, we will be able to
    'schedule' the app/method to receive an event when a new event is
    registered on the controller buffers.

    The decorated method never runs in its caller's thread. When the
    module-level ``executors`` mapping is not empty, each call is submitted
    to one of its ThreadPoolExecutors; otherwise each call starts a new
    thread through the ``run_on_thread`` decorator. In both cases the
    decorated method returns ``None`` without waiting for the handler.

    The decorator will add an attribute to the method called 'events', that
    will be a list of the events that the method will handle.

    The event that will be listened to is always a string, but it can represent
    a regular expression to match against multiple Event Types. All listened
    events are documented in :doc:`/developer/listened_events` section.

    The pool option gives you control on which ThreadPoolExecutor that will
    execute the decorated handler. This knob is meant for prioritization,
    allowing executions on different pools depending on the handler primary
    responsibility and importance to avoid potential scheduling starvation.
    When no pool is given, the executor is picked from the event name (see
    ``get_executor`` below), falling back to the "app" pool.

    Example of usage:

    .. code-block:: python3

        class MyAppClass(KytosApp):
            @listen_to('kytos/of_core.messages.in')
            def my_handler_of_message_in(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.out')
            def my_handler_of_message_out(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.in.ofpt_hello',
                       'kytos/of_core.messages.out.ofpt_hello')
            def my_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*.hello')
            def my_other_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*')
            def my_stats_handler_of_any_message(self, event):
                # Do stuff here...

            @listen_to("some_db_oriented_event", pool="db")
            def db_update(self, event):
                # Do stuff here...

    Args:
        event (str): First event name (or pattern) to listen to.
        *events (str): Additional event names (or patterns).
        pool (str): Key of ``executors`` that must run the handler. Only
            used when ``executors`` is not empty.

    Returns:
        function: The decorator to apply to the handler method.

    """
    def thread_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run on a new thread.

        """
        @run_on_thread
        def threaded_handler(*args):
            """Decorate the handler to run from a new thread."""
            handler(*args)

        # Keep the handler's name/docstring on the wrapper. updated=() skips
        # merging handler.__dict__, so no extra attributes are propagated.
        functools.update_wrapper(threaded_handler, handler, updated=())
        threaded_handler.events = [event, *events]
        return threaded_handler

    # pylint: disable=broad-except
    def thread_pool_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run on in the thread pool

        """
        def done_callback(future):
            """Log any exception that escaped the handler's context.

            Handler errors are already handled inside the context functions;
            what reaches this point was raised outside their try block (for
            instance while unpacking ``args``) and would otherwise be lost
            with the future.
            """
            if future.cancelled():
                # future.exception() raises on a cancelled future.
                return
            exc = future.exception()
            if exc is not None:
                LOG.error(f"listen_to handler: {handler}, "
                          f"unhandled exception: {exc!r}")

        def report_failure(cls, kytos_event, args):
            """Log the exception being handled and dead-letter the event.

            Must be called from inside an ``except`` block because it reads
            the traceback of the exception currently being handled.
            """
            traceback_str = traceback.format_exc().replace("\n", ", ")
            LOG.error(f"listen_to handler: {handler}, "
                      f"args: {args} traceback: {traceback_str}")
            if hasattr(cls, "controller"):
                cls.controller.dead_letter.add_event(kytos_event)

        # pylint: disable=unused-argument
        def handler_context(*args, **kwargs):
            """Handler's context for ThreadPool."""
            cls, kytos_event = args[0], args[1]
            try:
                result = handler(*args)
            except Exception:
                result = None
                report_failure(cls, kytos_event, args)
            return result

        def handler_context_apm(*args, apm_client=None):
            """Handler's context for ThreadPool APM instrumentation."""
            cls, kytos_event = args[0], args[1]
            trace_parent = kytos_event.trace_parent
            tx_type = "kytos_event"
            tx = apm_client.begin_transaction(transaction_type=tx_type,
                                              trace_parent=trace_parent)
            # Events published by the handler see this transaction as parent.
            kytos_event.trace_parent = tx.trace_parent
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
            try:
                result = handler(*args)
                tx.result = result
            except Exception as exc:
                result = None
                report_failure(cls, kytos_event, args)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            tx.end()
            apm_client.tracer.queue_func("transaction", tx.to_dict())
            return result

        # The APM backend is read once, when the handler is decorated.
        handler_func, kwargs = handler_context, {}
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = {"apm_client": ElasticAPM.get_client()}

        def get_executor(pool, kytos_event, default_pool="app"):
            """Get executor.

            An explicit pool always wins. Otherwise the event name decides:
            "kytos/of_core" and "kytos/core.openflow" events go to the "sb"
            pool and "kytos.storehouse" events go to the "db" pool, when
            those pools exist. Everything else uses the default pool.
            """
            if pool:
                return executors[pool]
            if not kytos_event:
                return executors[default_pool]

            name = kytos_event.name
            southbound = ("kytos/of_core", "kytos/core.openflow")
            if name.startswith(southbound) and "sb" in executors:
                return executors["sb"]
            if name.startswith("kytos.storehouse") and "db" in executors:
                return executors["db"]
            return executors[default_pool]

        # Keep the handler's name/docstring on the wrapper. updated=() skips
        # merging handler.__dict__, so no extra attributes are propagated.
        @functools.wraps(handler, updated=())
        def inner(*args):
            """Decorate the handler to run in the thread pool."""
            kytos_event = args[1] if len(args) > 1 else None
            executor = get_executor(pool, kytos_event)
            future = executor.submit(handler_func, *args, **kwargs)
            future.add_done_callback(done_callback)

        inner.events = [event, *events]
        return inner

    if executors:
        return thread_pool_decorator
    return thread_decorator
211
212
213
def alisten_to(event, *events):
    """Decorate async subscribing methods.

    The decorated coroutine function gets an `events` attribute with the
    list of events it listens to. Exceptions raised by the handler are
    logged, the event is added to the controller's dead letter (when the
    handler's owner has a ``controller`` attribute) and ``None`` is returned
    instead of propagating the exception.

    Args:
        event (str): First event name (or pattern) to listen to.
        *events (str): Additional event names (or patterns).

    Returns:
        function: The decorator to apply to the async handler method.
    """

    def decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated as an asyncio task.
        """
        def report_failure(cls, kytos_event, args):
            """Log the exception being handled and dead-letter the event.

            Must be called from inside an ``except`` block because it reads
            the traceback of the exception currently being handled.
            """
            traceback_str = traceback.format_exc().replace("\n", ", ")
            LOG.error(f"alisten_to handler: {handler}, "
                      f"args: {args} traceback: {traceback_str}")
            if hasattr(cls, "controller"):
                cls.controller.dead_letter.add_event(kytos_event)

        # pylint: disable=unused-argument,broad-except
        async def handler_context(*args, **kwargs):
            """Async handler's execution context."""
            cls, kytos_event = args[0], args[1]
            try:
                result = await handler(*args)
            except Exception:
                result = None
                report_failure(cls, kytos_event, args)
            return result

        async def handler_context_apm(*args, apm_client=None):
            """Async handler's execution context with APM instrumentation."""
            cls, kytos_event = args[0], args[1]
            trace_parent = kytos_event.trace_parent
            tx_type = "kytos_event"
            tx = apm_client.begin_transaction(transaction_type=tx_type,
                                              trace_parent=trace_parent)
            # Events published by the handler see this transaction as parent.
            kytos_event.trace_parent = tx.trace_parent
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
            try:
                result = await handler(*args)
                tx.result = result
            except Exception as exc:
                result = None
                report_failure(cls, kytos_event, args)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            tx.end()
            apm_client.tracer.queue_func("transaction", tx.to_dict())
            return result

        # The APM backend is read once, when the handler is decorated.
        handler_func, kwargs = handler_context, {}
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = {"apm_client": ElasticAPM.get_client()}

        # Keep the handler's name/docstring on the wrapper. updated=() skips
        # merging handler.__dict__, so no extra attributes are propagated.
        @functools.wraps(handler, updated=())
        async def inner(*args):
            """Inner decorated with events attribute."""
            return await handler_func(*args, **kwargs)

        inner.events = [event, *events]
        return inner

    return decorator
279
280
281
def now(tzone=timezone.utc):
    """Return the current date and time in the given time zone.

    Args:
        tzone (datetime.timezone): Time zone of the result (UTC by default).

    Returns:
        datetime.datetime: Current date and time expressed in ``tzone``.

    """
    current = datetime.now(tz=tzone)
    return current
292
293
294
def run_on_thread(method):
    """Decorate to run the decorated method inside a new thread.

    Args:
        method (function): function used to run as a new thread.

    Returns:
        Decorated method that will run inside a new thread.
        When the decorated method is called, it starts the thread and
        returns ``None``; neither the created thread nor the return value of
        ``method`` is handed back to the caller. Positional and keyword
        arguments are forwarded to ``method``.

    """
    # Keep the wrapped function's name/docstring on the wrapper. updated=()
    # skips merging method.__dict__, so no extra attributes are propagated.
    @functools.wraps(method, updated=())
    def threaded_method(*args, **kwargs):
        """Ensure the handler method runs inside a new thread."""
        # Set daemon mode so that we don't have to wait for these threads
        # to finish when exiting Kytos
        thread = Thread(target=method, args=args, kwargs=kwargs, daemon=True)
        thread.start()
    return threaded_method
315
316
317
def get_time(data=None):
    """Receive a dictionary or a string and return a datetime instance.

    A dictionary is passed as keyword arguments to ``datetime``::

        data = {"year": 2006,
                "month": 11,
                "day": 21,
                "hour": 16,
                "minute": 30,
                "second": 0}

    A string must follow the ``%Y-%m-%dT%H:%M:%S`` format, optionally
    followed by a ``Z``::

        data = "2018-04-17T17:13:50Z"

    Args:
        data (str, dict): python dict or string to be converted to datetime

    Returns:
        datetime: datetime instance with its tzinfo set to UTC, or None when
        data is neither a string nor a dictionary.

    Raises:
        ValueError: if the string does not match the expected format or the
            dictionary holds out-of-range values.
        TypeError: if the dictionary holds keys that ``datetime`` rejects.

    """
    if isinstance(data, str):
        # The documented example ends with "Z", which the format below would
        # reject; the result is tagged as UTC either way.
        text = data[:-1] if data.endswith("Z") else data
        date = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S")
    elif isinstance(data, dict):
        date = datetime(**data)
    else:
        return None
    return date.replace(tzinfo=timezone.utc)
347
348
349
def _read_from_filename(yml_file_path: Path) -> dict:
    """Return the spec dict read from the given yml file path."""
    # read_from_filename returns a pair; only its first item is used here.
    contents, _unused = read_from_filename(yml_file_path)
    return contents
353
354
355
def load_spec(yml_file_path: Path):
    """Read a yml file, validate its content and build the spec object."""
    raw_spec = _read_from_filename(yml_file_path)
    # Validation runs before the spec object is created.
    validate_spec(raw_spec)
    spec = create_spec(raw_spec)
    return spec
360
361
362
def _request_validation_result_or_400(result: RequestValidationResult) -> None:
    """Raise HTTP 400 when the request validation result has errors.

    Only the first error of ``result.errors`` is reported. When it carries
    schema errors, the first one is logged at debug level and its message
    and field path are appended to the response detail. Otherwise the
    detail is the string form of the error.

    Args:
        result: Outcome of the OpenAPI request validation.

    Raises:
        HTTPException: with status 400 if ``result.errors`` is not empty.
    """
    if not result.errors:
        return
    first_error = result.errors[0]
    # A missing, None or empty schema_errors falls back to the generic
    # message instead of failing on the [0] lookup below.
    schema_errors = getattr(first_error, "schema_errors", None)
    if not schema_errors:
        raise HTTPException(400, detail=str(first_error))

    schema_error = schema_errors[0]
    error_log = {
        "error_message": schema_error.message,
        "error_validator": schema_error.validator,
        "error_validator_value": schema_error.validator_value,
        "error_path": list(schema_error.path),
        "error_schema": schema_error.schema,
        "error_schema_path": list(schema_error.schema_path),
    }
    LOG.debug(f"Invalid request (API schema): {error_log}")
    field_path = "/".join(map(str, schema_error.path))
    error_response = (
        "The request body contains invalid API data."
        f" {schema_error.message} for field"
        f" {field_path}."
    )
    raise HTTPException(400, detail=error_response)
388
389
390
def validate_openapi_request(spec: Spec, request: Request) -> bytes:
    """Validate a Request against an OpenAPI spec and return its body.

    Call this from a synchronous context only: StarletteOpenAPIRequest
    internally uses `asgiref.sync.AsyncToSync`, which is forced not to use
    the current running event loop.

    A non-empty body must have a JSON content type (HTTP 415 otherwise) and
    validation errors are turned into HTTP 400.
    """
    payload = get_body(request)
    if payload:
        content_type_json_or_415(request)
    validation = openapi_request_validator.validate(
        spec,
        StarletteOpenAPIRequest(request, payload),
    )
    _request_validation_result_or_400(validation)
    return payload
404
405
406
async def avalidate_openapi_request(
        spec: Spec,
        request: Request,
) -> bytes:
    """Async counterpart of validate_openapi_request.

    Meant for async routes. It returns the request body bytes and does not
    assume that the body is loadable json, so that it works with as many
    types of endpoints as possible with minimal friction.
    Use await aget_json_or_400(request) to get the parsed request body.

    A non-empty body must have a JSON content type (HTTP 415 otherwise) and
    validation errors are turned into HTTP 400.

    Example:

    await avalidate_openapi_request(self.spec, request)
    body = await aget_json_or_400(request)
    """
    payload = await request.body()
    if payload:
        content_type_json_or_415(request)
    validation = openapi_request_validator.validate(
        spec,
        AStarletteOpenAPIRequest(request, payload),
    )
    _request_validation_result_or_400(validation)
    return payload
429
430
431
def validate_openapi(spec):
    """Decorator to validate a REST endpoint input.

    Uses the schema defined in the openapi.yml file
    to validate.

    The first ``Request`` found among the endpoint's positional arguments,
    and then among its keyword arguments, is validated with
    ``validate_openapi_request`` before the endpoint is called.

    Args:
        spec: OpenAPI spec object used for the validation.

    Returns:
        function: The decorator to apply to the endpoint.

    Raises:
        HTTPException: with status 500, when the decorated endpoint is called
            without any Request argument. Validation failures raise whatever
            ``validate_openapi_request`` raises.
    """
    def validate_decorator(func):
        @functools.wraps(func)
        def wrapper_validate(*args, **kwargs):
            # Positional arguments are searched first, then keyword values,
            # so a Request passed by keyword is found as well.
            candidates = (*args, *kwargs.values())
            request = next(
                (arg for arg in candidates if isinstance(arg, Request)),
                None,
            )
            if request is None:
                err = f"{func.__name__} args doesn't have a Request argument"
                raise HTTPException(500, detail=err)
            validate_openapi_request(spec, request)
            return func(*args, **kwargs)
        return wrapper_validate
    return validate_decorator
452