Passed
Push — master ( ad4b11...eeb2a1 )
by Vinicius
04:03 queued 15s
created

kytos.core.helpers.run_on_thread()   A

Complexity

Conditions 1

Size

Total Lines 21
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 1
dl 0
loc 21
rs 10
c 0
b 0
f 0
ccs 6
cts 6
cp 1
crap 1
1
"""Utilities functions used in Kytos."""
2 1
import functools
3 1
import logging
4 1
import traceback
5 1
from concurrent.futures import ThreadPoolExecutor
6 1
from datetime import datetime, timezone
7 1
from pathlib import Path
8 1
from threading import Thread
9
10 1
from flask import request
11 1
from openapi_core.contrib.flask import FlaskOpenAPIRequest
12 1
from openapi_core.spec.shortcuts import create_spec
13 1
from openapi_core.validation.request import openapi_request_validator
14 1
from openapi_spec_validator import validate_spec
15 1
from openapi_spec_validator.readers import read_from_filename
16 1
from werkzeug.exceptions import BadRequest, UnsupportedMediaType
17
18 1
from kytos.core.apm import ElasticAPM
19 1
from kytos.core.config import KytosConfig
20
21 1
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
22
23 1
LOG = logging.getLogger(__name__)
24
25
26 1
def get_thread_pool_max_workers():
    """Get the number of thread pool max workers.

    Returns:
        The ``thread_pool_max_workers`` attribute of the ``daemon`` options
        of a freshly built ``KytosConfig``. Presumably a mapping of pool
        name to worker count, since this module calls ``.items()`` on the
        result -- confirm against the configuration defaults.

    """
    daemon_options = KytosConfig().options["daemon"]
    return daemon_options.thread_pool_max_workers
29
30
31 1
def get_apm_name():
    """Get apm backend name.

    Returns:
        The ``apm`` attribute of the ``daemon`` options of a freshly built
        ``KytosConfig``. The listener decorators in this module enable APM
        instrumentation when this value equals ``"es"``.

    """
    daemon_options = KytosConfig().options["daemon"]
    return daemon_options.apm
34
35
36
# pylint: disable=invalid-name
37 1
# One ThreadPoolExecutor per configured pool name, built at import time.
# An empty mapping makes ``listen_to`` fall back to one thread per call.
executors = {
    name: ThreadPoolExecutor(max_workers=max_workers,
                             thread_name_prefix=f"thread_pool_{name}")
    for name, max_workers in get_thread_pool_max_workers().items()
}
40
41
42 1
def listen_to(event, *events, pool=None):
    """Decorate Event Listener methods.

    This decorator was built to be used on NAPPs methods to define which
    type of event the method will handle. With this, we will be able to
    'schedule' the app/method to receive an event when a new event is
    registered on the controller buffers.

    The decorated handler never runs in the caller's thread: when the
    module-level ``executors`` mapping is not empty the call is submitted to
    one of those thread pools, otherwise it runs on a new thread through the
    ``run_on_thread`` decorator. In both cases the decorated method returns
    ``None`` immediately.

    The decorator will add an attribute to the method called 'events', that
    will be a list of the events that the method will handle.

    The event that will be listened to is always a string, but it can represent
    a regular expression to match against multiple Event Types. All listened
    events are documented in :doc:`/developer/listened_events` section.

    The pool option gives you control on which ThreadPoolExecutor that will
    execute the decorated handler. This knob is meant for prioritization,
    allowing executions on different pools depending on the handler primary
    responsibility and importance to avoid potential scheduling starvation.

    Example of usage:

    .. code-block:: python3

        class MyAppClass(KytosApp):
            @listen_to('kytos/of_core.messages.in')
            def my_handler_of_message_in(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.out')
            def my_handler_of_message_out(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.in.ofpt_hello',
                       'kytos/of_core.messages.out.ofpt_hello')
            def my_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*.hello')
            def my_other_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*')
            def my_stats_handler_of_any_message(self, event):
                # Do stuff here...

            @listen_to("some_db_oriented_event", pool="db")
            def db_update(self, event):
                # Do stuff here...

    Args:
        event (str): first event name (or pattern) to listen to.
        *events (str): additional event names (or patterns).
        pool (str): key of the ``executors`` mapping to run the handler on.
            When omitted the pool is picked from the event name at call
            time, falling back to ``"app"``. An unknown key raises
            ``KeyError`` when the decorated method is called.

    Returns:
        function: decorator to be applied to the handler method.

    """
    def thread_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run on a new thread.

        """
        @run_on_thread
        @functools.wraps(handler)
        def threaded_handler(*args):
            """Decorate the handler to run from a new thread."""
            handler(*args)

        threaded_handler.events = [event, *events]
        return threaded_handler

    # pylint: disable=broad-except
    def thread_pool_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run in the thread pool.

        """
        def done_callback(future):
            """Log whatever escaped the handler context.

            The contexts below already catch the handler's own exceptions,
            so anything found here was raised outside their ``try`` block
            and would otherwise be lost with the future.
            """
            if future.cancelled():
                return
            exc = future.exception()
            if exc is not None:
                LOG.error(f"listen_to handler: {handler}, "
                          f"unhandled exception: {exc!r}")

        def report_failure(cls, kytos_event, args):
            """Log the exception being handled and dead-letter the event."""
            traceback_str = traceback.format_exc().replace("\n", ", ")
            LOG.error(f"listen_to handler: {handler}, "
                      f"args: {args} traceback: {traceback_str}")
            if hasattr(cls, "controller"):
                cls.controller.dead_letter.add_event(kytos_event)

        # pylint: disable=unused-argument
        def handler_context(*args, **kwargs):
            """Handler's context for ThreadPool."""
            cls, kytos_event = args[0], args[1]
            try:
                return handler(*args)
            except Exception:
                report_failure(cls, kytos_event, args)
                return None

        def handler_context_apm(*args, apm_client=None):
            """Handler's context for ThreadPool APM instrumentation."""
            cls, kytos_event = args[0], args[1]
            trace_parent = kytos_event.trace_parent
            tx_type = "kytos_event"
            tx = apm_client.begin_transaction(transaction_type=tx_type,
                                              trace_parent=trace_parent)
            # Propagate this transaction as the parent of whatever the
            # handler does with the event afterwards.
            kytos_event.trace_parent = tx.trace_parent
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
            try:
                result = handler(*args)
                tx.result = result
            except Exception as exc:
                result = None
                report_failure(cls, kytos_event, args)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            tx.end()
            apm_client.tracer.queue_func("transaction", tx.to_dict())
            return result

        # The APM backend is chosen once, when the handler is decorated.
        handler_func, kwargs = handler_context, {}
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = {"apm_client": ElasticAPM.get_client()}

        def get_executor(pool, event, default_pool="app"):
            """Get executor."""
            if pool:
                return executors[pool]
            if not event:
                return executors[default_pool]

            southbound = ("kytos/of_core", "kytos/core.openflow")
            if event.name.startswith(southbound) and "sb" in executors:
                return executors["sb"]
            # NOTE(review): this prefix uses "." where the ones above use
            # "/" -- confirm it matches the storehouse event names.
            if event.name.startswith("kytos.storehouse") and "db" in executors:
                return executors["db"]
            return executors[default_pool]

        @functools.wraps(handler)
        def inner(*args):
            """Decorate the handler to run in the thread pool."""
            kytos_event = args[1] if len(args) > 1 else None
            executor = get_executor(pool, kytos_event)
            future = executor.submit(handler_func, *args, **kwargs)
            future.add_done_callback(done_callback)

        inner.events = [event, *events]
        return inner

    if executors:
        return thread_pool_decorator
    return thread_decorator
209
210
211 1
def alisten_to(event, *events):
    """Decorate async subscribing methods.

    Args:
        event (str): first event name to subscribe to.
        *events (str): additional event names.

    Returns:
        function: decorator that turns a coroutine handler into a coroutine
        function carrying an `events` attribute. Awaiting it returns the
        handler's result, or ``None`` when the handler raised; in that case
        the failure is logged and, if the first positional argument has a
        ``controller`` attribute, the event is added to its dead letter.

    """

    def decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated as an asyncio task.
        """
        def report_failure(cls, kytos_event, args):
            """Log the exception being handled and dead-letter the event."""
            traceback_str = traceback.format_exc().replace("\n", ", ")
            LOG.error(f"alisten_to handler: {handler}, "
                      f"args: {args} traceback: {traceback_str}")
            if hasattr(cls, "controller"):
                cls.controller.dead_letter.add_event(kytos_event)

        # pylint: disable=unused-argument,broad-except
        async def handler_context(*args, **kwargs):
            """Async handler's execution context."""
            cls, kytos_event = args[0], args[1]
            try:
                return await handler(*args)
            except Exception:
                report_failure(cls, kytos_event, args)
                return None

        async def handler_context_apm(*args, apm_client=None):
            """Async handler's execution context with APM instrumentation."""
            cls, kytos_event = args[0], args[1]
            trace_parent = kytos_event.trace_parent
            tx_type = "kytos_event"
            tx = apm_client.begin_transaction(transaction_type=tx_type,
                                              trace_parent=trace_parent)
            # Propagate this transaction as the parent of whatever the
            # handler does with the event afterwards.
            kytos_event.trace_parent = tx.trace_parent
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
            try:
                result = await handler(*args)
                tx.result = result
            except Exception as exc:
                result = None
                report_failure(cls, kytos_event, args)
                apm_client.capture_exception(
                    exc_info=(type(exc), exc, exc.__traceback__),
                    context={"args": args},
                    handled=False,
                )
            tx.end()
            apm_client.tracer.queue_func("transaction", tx.to_dict())
            return result

        # The APM backend is chosen once, when the handler is decorated.
        handler_func, kwargs = handler_context, {}
        if get_apm_name() == "es":
            handler_func = handler_context_apm
            kwargs = {"apm_client": ElasticAPM.get_client()}

        @functools.wraps(handler)
        async def inner(*args):
            """Inner decorated with events attribute."""
            return await handler_func(*args, **kwargs)

        inner.events = [event, *events]
        return inner

    return decorator
277
278
279 1
def now(tzone=timezone.utc):
    """Return the current datetime (default to UTC).

    Args:
        tzone (datetime.timezone): Specific time zone used in datetime.

    Returns:
        datetime.datetime: Current date and time in the given time zone.

    """
    return datetime.now(tzone)
290
291
292 1
def run_on_thread(method):
    """Decorate to run the decorated method inside a new thread.

    Args:
        method (function): function used to run as a new thread.

    Returns:
        Decorated method that will run inside a new thread.
        When the decorated method is called, it will not return the created
        thread to the caller; it returns ``None`` and the method's own
        return value is discarded.

    """
    @functools.wraps(method)
    def threaded_method(*args, **kwargs):
        """Ensure the handler method runs inside a new thread."""
        thread = Thread(target=method, args=args, kwargs=kwargs)

        # Set daemon mode so that we don't have to wait for these threads
        # to finish when exiting Kytos
        thread.daemon = True
        thread.start()
    return threaded_method
313
314
315 1
def get_time(data=None):
    """Receive a dictionary or a string and return a datetime instance.

    A dictionary is passed as keyword arguments to ``datetime``::

        data = {"year": 2006,
                "month": 11,
                "day": 21,
                "hour": 16,
                "minute": 30,
                "second": 0}

    A string must follow ``%Y-%m-%dT%H:%M:%S``, optionally ending in ``Z``::

        data = "2018-04-17T17:13:50Z"

    The result is always stamped as UTC; no time zone conversion is done.

    Args:
        data (str, dict): python dict or string to be converted to datetime

    Returns:
        datetime: datetime instance, or None when ``data`` is neither a
        string nor a dictionary.

    Raises:
        ValueError: if the string does not match the expected format.

    """
    if isinstance(data, str):
        # A trailing "Z" only states UTC, which is applied below anyway.
        text = data[:-1] if data.endswith("Z") else data
        date = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S")
    elif isinstance(data, dict):
        date = datetime(**data)
    else:
        return None
    return date.replace(tzinfo=timezone.utc)
345
346
347 1
def _read_from_filename(yml_file_path: Path) -> dict:
    """Read from yml filename.

    Args:
        yml_file_path (Path): path handed to
            ``openapi_spec_validator.readers.read_from_filename``.

    Returns:
        dict: first element of the pair returned by the reader; the second
        element is discarded.

    """
    spec_dict, _ = read_from_filename(yml_file_path)
    return spec_dict
351
352
353 1
def load_spec(yml_file_path: Path):
    """Load and validate spec object given a yml file path.

    Args:
        yml_file_path (Path): path of the yml file to read.

    Returns:
        The object built by ``create_spec`` from the file's content, after
        the content has been passed through ``validate_spec``.

    """
    spec_dict = _read_from_filename(yml_file_path)
    # Validate before building so a broken file fails here, not at request time.
    validate_spec(spec_dict)
    return create_spec(spec_dict)
358
359
360 1
def validate_openapi(spec):
    """Decorator to validate a REST endpoint input.

    Validates the current Flask request against ``spec`` and, when it is
    valid, calls the endpoint with the parsed JSON body as the ``data``
    keyword argument.

    Args:
        spec: spec object handed to ``openapi_request_validator.validate``
            (presumably the one returned by ``load_spec`` -- confirm).

    Returns:
        function: decorator to be applied to the endpoint.

    Raises:
        BadRequest: when the body is not well-formed JSON or the validator
            reports errors.
        UnsupportedMediaType: when ``request.get_json()`` returns ``None``.

    """

    def validate_decorator(func):
        @functools.wraps(func)
        def wrapper_validate(*args, **kwargs):
            try:
                data = request.get_json()
            except BadRequest as exc:
                result = "The request body is not a well-formed JSON."
                # Chain from the caught instance so the parser error is kept.
                raise BadRequest(result) from exc
            if data is None:
                result = "The request body mimetype is not application/json."
                raise UnsupportedMediaType(result)

            openapi_request = FlaskOpenAPIRequest(request)
            result = openapi_request_validator.validate(spec, openapi_request)
            if result.errors:
                error_response = (
                    "The request body contains invalid API data."
                )
                # Only the first error is reported back to the client.
                errors = result.errors[0]
                schema_error_list = getattr(errors, "schema_errors", None)
                if schema_error_list:
                    schema_errors = schema_error_list[0]
                    error_log = {
                        "error_message": schema_errors.message,
                        "error_validator": schema_errors.validator,
                        "error_validator_value": schema_errors.validator_value,
                        "error_path": list(schema_errors.path),
                        "error_schema": schema_errors.schema,
                        "error_schema_path": list(schema_errors.schema_path),
                    }
                    LOG.debug(f"Invalid request (API schema): {error_log}")
                    error_response += f" {schema_errors.message} for field"
                    error_response += (
                        f" {'/'.join(map(str,schema_errors.path))}."
                    )
                raise BadRequest(error_response)
            return func(*args, data=data, **kwargs)

        return wrapper_validate

    return validate_decorator
407