Passed
Pull Request — master (#235)
by Vinicius
05:20
created

kytos.core.helpers.run_on_thread()   A

Complexity

Conditions 1

Size

Total Lines 21
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 1
dl 0
loc 21
rs 10
c 0
b 0
f 0
ccs 6
cts 6
cp 1
crap 1
1
"""Utilities functions used in Kytos."""
2 1
import logging
3 1
import traceback
4 1
from concurrent.futures import ThreadPoolExecutor
5 1
from datetime import datetime, timezone
6 1
from threading import Thread
7
8 1
from kytos.core.apm import ElasticAPM
9 1
from kytos.core.config import KytosConfig
10
11 1
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
12
13 1
LOG = logging.getLogger(__name__)
14
15
16 1
def get_thread_pool_max_workers():
17
    """Get the number of thread pool max workers."""
18 1
    return KytosConfig().options["daemon"].thread_pool_max_workers
19
20
21 1
def get_apm_name():
22
    """Get apm backend name."""
23 1
    return KytosConfig().options["daemon"].apm
24
25
26
# pylint: disable=invalid-name
27 1
executors = {name: ThreadPoolExecutor(max_workers=max_workers,
28
                                      thread_name_prefix=f"thread_pool_{name}")
29
             for name, max_workers in get_thread_pool_max_workers().items()}
30
31
32 1
def listen_to(event, *events, pool=None):
33
    """Decorate Event Listener methods.
34
35
    This decorator was built to be used on NAPPs methods to define which
36
    type of event the method will handle. With this, we will be able to
37
    'schedule' the app/method to receive an event when a new event is
38
    registered on the controller buffers.
39
    By using the run_on_thread decorator, we also guarantee that the method
40
    (handler) will be called from inside a new thread, avoiding this method to
41
    block its caller.
42
43
    The decorator will add an attribute to the method called 'events', that
44
    will be a list of the events that the method will handle.
45
46
    The event that will be listened to is always a string, but it can represent
47
    a regular expression to match against multiple Event Types. All listened
48
    events are documented in :doc:`/developer/listened_events` section.
49
50
    The pool option gives you control on which ThreadPoolExecutor that will
51
    execute the decorated handler. This knob is meant for prioritization,
52
    allowing executions on different pools depending on the handler primary
53
    responsibility and importance to avoid potential scheduling starvation.
54
55
    Example of usage:
56
57
    .. code-block:: python3
58
59
        class MyAppClass(KytosApp):
60
            @listen_to('kytos/of_core.messages.in')
61
            def my_handler_of_message_in(self, event):
62
                # Do stuff here...
63
64
            @listen_to('kytos/of_core.messages.out')
65
            def my_handler_of_message_out(self, event):
66
                # Do stuff here...
67
68
            @listen_to('kytos/of_core.messages.in.ofpt_hello',
69
                       'kytos/of_core.messages.out.ofpt_hello')
70
            def my_handler_of_hello_messages(self, event):
71
                # Do stuff here...
72
73
            @listen_to('kytos/of_core.message.*.hello')
74
            def my_other_handler_of_hello_messages(self, event):
75
                # Do stuff here...
76
77
            @listen_to('kytos/of_core.message.*.hello')
78
            def my_handler_of_hello_messages(self, event):
79
                # Do stuff here...
80
81
            @listen_to('kytos/of_core.message.*')
82
            def my_stats_handler_of_any_message(self, event):
83
                # Do stuff here...
84
85
            @listen_to("some_db_oriented_event", pool="db")
86
            def db_update(self, event):
87
                # Do stuff here...
88
    """
89 1
    def thread_decorator(handler):
90
        """Decorate the handler method.
91
92
        Returns:
93
            A method with an `events` attribute (list of events to be listened)
94
            and also decorated to run on a new thread.
95
96
        """
97
        @run_on_thread
98
        def threaded_handler(*args):
99
            """Decorate the handler to run from a new thread."""
100
            handler(*args)
101
102
        threaded_handler.events = [event]
103
        threaded_handler.events.extend(events)
104
        return threaded_handler
105
106
    # pylint: disable=broad-except
107 1
    def thread_pool_decorator(handler):
108
        """Decorate the handler method.
109
110
        Returns:
111
            A method with an `events` attribute (list of events to be listened)
112
            and also decorated to run on in the thread pool
113
114
        """
115 1
        def done_callback(future):
116
            """Done callback."""
117
            if not future.exception():
118
                _ = future.result()
119
                return
120
121
        # pylint: disable=unused-argument
122 1
        def handler_context(*args, **kwargs):
123
            """Handler's context for ThreadPool."""
124
            cls, kytos_event = args[0], args[1]
125
            try:
126
                result = handler(*args)
127
            except Exception:
128
                result = None
129
                traceback_str = traceback.format_exc().replace("\n", ", ")
130
                LOG.error(f"listen_to handler: {handler}, "
131
                          f"args: {args} traceback: {traceback_str}")
132
                if hasattr(cls, "controller"):
133
                    cls.controller.dead_letter.add_event(kytos_event)
134
            return result
135
136 1 View Code Duplication
        def handler_context_apm(*args, apm_client=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
137
            """Handler's context for ThreadPool APM instrumentation."""
138
            cls, kytos_event = args[0], args[1]
139
            trace_parent = kytos_event.trace_parent
140
            tx_type = "kytos_event"
141
            tx = apm_client.begin_transaction(transaction_type=tx_type,
142
                                              trace_parent=trace_parent)
143
            kytos_event.trace_parent = tx.trace_parent
144
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
145
            try:
146
                result = handler(*args)
147
                tx.result = result
148
            except Exception as exc:
149
                result = None
150
                traceback_str = traceback.format_exc().replace("\n", ", ")
151
                LOG.error(f"listen_to handler: {handler}, "
152
                          f"args: {args} traceback: {traceback_str}")
153
                if hasattr(cls, "controller"):
154
                    cls.controller.dead_letter.add_event(kytos_event)
155
                apm_client.capture_exception(
156
                    exc_info=(type(exc), exc, exc.__traceback__),
157
                    context={"args": args},
158
                    handled=False,
159
                )
160
            tx.end()
161
            apm_client.tracer.queue_func("transaction", tx.to_dict())
162
            return result
163
164 1
        handler_func, kwargs = handler_context, {}
165 1
        if get_apm_name() == "es":
166
            handler_func = handler_context_apm
167
            kwargs = dict(apm_client=ElasticAPM.get_client())
168
169 1
        def get_executor(pool, event, default_pool="app"):
170
            """Get executor."""
171 1
            if pool:
172
                return executors[pool]
173 1
            if not event:
174 1
                return executors[default_pool]
175
176
            if event.name.startswith("kytos/of_core") and "sb" in executors:
177
                return executors["sb"]
178
            core_of = "kytos/core.openflow"
179
            if event.name.startswith(core_of) and "sb" in executors:
180
                return executors["sb"]
181
            if event.name.startswith("kytos.storehouse") and "db" in executors:
182
                return executors["db"]
183
            return executors[default_pool]
184
185 1
        def inner(*args):
186
            """Decorate the handler to run in the thread pool."""
187 1
            event = args[1] if len(args) > 1 else None
188 1
            executor = get_executor(pool, event)
189 1
            future = executor.submit(handler_func, *args, **kwargs)
190 1
            future.add_done_callback(done_callback)
191
192 1
        inner.events = [event]
193 1
        inner.events.extend(events)
194 1
        return inner
195
196 1
    if executors:
197 1
        return thread_pool_decorator
198
    return thread_decorator
199
200
201 1
def alisten_to(event, *events):
202
    """Decorate async subscribing methods."""
203
204 1
    def decorator(handler):
205
        """Decorate the handler method.
206
207
        Returns:
208
            A method with an `events` attribute (list of events to be listened)
209
            and also decorated as an asyncio task.
210
        """
211
        # pylint: disable=unused-argument,broad-except
212 1
        async def handler_context(*args, **kwargs):
213
            """Async handler's execution context."""
214 1
            cls, kytos_event = args[0], args[1]
215 1
            try:
216 1
                result = await handler(*args)
217
            except Exception as exc:
218
                result = None
219
                exc_str = f"{type(exc)}: {str(exc)}"
220
                LOG.error(f"alisten_to handler: {handler}, "
221
                          f"args: {args}, exception: {exc_str}")
222
                if hasattr(cls, "controller"):
223
                    cls.controller.dead_letter.add_event(kytos_event)
224 1
            return result
225
226 1 View Code Duplication
        async def handler_context_apm(*args, apm_client=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
227
            """Async handler's execution context with APM instrumentation."""
228
            cls, kytos_event = args[0], args[1]
229
            trace_parent = kytos_event.trace_parent
230
            tx_type = "kytos_event"
231
            tx = apm_client.begin_transaction(transaction_type=tx_type,
232
                                              trace_parent=trace_parent)
233
            kytos_event.trace_parent = tx.trace_parent
234
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
235
            try:
236
                result = await handler(*args)
237
                tx.result = result
238
            except Exception as exc:
239
                result = None
240
                exc_str = f"{type(exc)}: {str(exc)}"
241
                LOG.error(f"alisten_to handler: {handler}, "
242
                          f"args: {args}, exception: {exc_str}")
243
                if hasattr(cls, "controller"):
244
                    cls.controller.dead_letter.add_event(kytos_event)
245
                apm_client.capture_exception(
246
                    exc_info=(type(exc), exc, exc.__traceback__),
247
                    context={"args": args},
248
                    handled=False,
249
                )
250
            tx.end()
251
            apm_client.tracer.queue_func("transaction", tx.to_dict())
252
            return result
253
254 1
        handler_func, kwargs = handler_context, {}
255 1
        if get_apm_name() == "es":
256
            handler_func = handler_context_apm
257
            kwargs = dict(apm_client=ElasticAPM.get_client())
258
259 1
        async def inner(*args):
260
            """Inner decorated with events attribute."""
261 1
            return await handler_func(*args, **kwargs)
262 1
        inner.events = [event]
263 1
        inner.events.extend(events)
264 1
        return inner
265
266 1
    return decorator
267
268
269 1
def now(tzone=timezone.utc):
270
    """Return the current datetime (default to UTC).
271
272
    Args:
273
        tzone (datetime.timezone): Specific time zone used in datetime.
274
275
    Returns:
276
        datetime.datetime.now: Date time with specific time zone.
277
278
    """
279 1
    return datetime.now(tzone)
280
281
282 1
def run_on_thread(method):
283
    """Decorate to run the decorated method inside a new thread.
284
285
    Args:
286
        method (function): function used to run as a new thread.
287
288
    Returns:
289
        Decorated method that will run inside a new thread.
290
        When the decorated method is called, it will not return the created
291
        thread to the caller.
292
293
    """
294 1
    def threaded_method(*args):
295
        """Ensure the handler method runs inside a new thread."""
296 1
        thread = Thread(target=method, args=args)
297
298
        # Set daemon mode so that we don't have to wait for these threads
299
        # to finish when exiting Kytos
300 1
        thread.daemon = True
301 1
        thread.start()
302 1
    return threaded_method
303
304
305 1
def get_time(data=None):
306
    """Receive a dictionary or a string and return a datatime instance.
307
308
    data = {"year": 2006,
309
            "month": 11,
310
            "day": 21,
311
            "hour": 16,
312
            "minute": 30 ,
313
            "second": 00}
314
315
    or
316
317
    data = "21/11/06 16:30:00"
318
319
    2018-04-17T17:13:50Z
320
321
    Args:
322
        data (str, dict): python dict or string to be converted to datetime
323
324
    Returns:
325
        datetime: datetime instance.
326
327
    """
328 1
    if isinstance(data, str):
329 1
        date = datetime.strptime(data, "%Y-%m-%dT%H:%M:%S")
330 1
    elif isinstance(data, dict):
331 1
        date = datetime(**data)
332
    else:
333 1
        return None
334
    return date.replace(tzinfo=timezone.utc)
335