Passed
Push — master ( 63a652...50cc5d )
by Vinicius
07:40 queued 05:22
created

kytos.core.helpers.run_on_thread()   A

Complexity

Conditions 1

Size

Total Lines 21
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 1
dl 0
loc 21
rs 10
c 0
b 0
f 0
ccs 6
cts 6
cp 1
crap 1
1
"""Utilities functions used in Kytos."""
2 1
import logging
3 1
import traceback
4 1
from concurrent.futures import ThreadPoolExecutor
5 1
from datetime import datetime, timezone
6 1
from threading import Thread
7
8 1
from kytos.core.apm import ElasticAPM
9 1
from kytos.core.config import KytosConfig
10
11 1
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
12
13 1
LOG = logging.getLogger(__name__)
14
15
16 1
def get_thread_pool_max_workers():
17
    """Get the number of thread pool max workers."""
18 1
    return KytosConfig().options["daemon"].thread_pool_max_workers
19
20
21 1
def get_apm_name():
22
    """Get apm backend name."""
23 1
    return KytosConfig().options["daemon"].apm
24
25
26
# pylint: disable=invalid-name
27 1
executors = {name: ThreadPoolExecutor(max_workers=max_workers,
28
                                      thread_name_prefix=f"thread_pool_{name}")
29
             for name, max_workers in get_thread_pool_max_workers().items()}
30
31
32 1
def listen_to(event, *events, pool=None):
33
    """Decorate Event Listener methods.
34
35
    This decorator was built to be used on NAPPs methods to define which
36
    type of event the method will handle. With this, we will be able to
37
    'schedule' the app/method to receive an event when a new event is
38
    registered on the controller buffers.
39
    By using the run_on_thread decorator, we also guarantee that the method
40
    (handler) will be called from inside a new thread, avoiding this method to
41
    block its caller.
42
43
    The decorator will add an attribute to the method called 'events', that
44
    will be a list of the events that the method will handle.
45
46
    The event that will be listened to is always a string, but it can represent
47
    a regular expression to match against multiple Event Types. All listened
48
    events are documented in :doc:`/developer/listened_events` section.
49
50
    The pool option gives you control on which ThreadPoolExecutor that will
51
    execute the decorated handler. This knob is meant for prioritization,
52
    allowing executions on different pools depending on the handler primary
53
    responsibility and importance to avoid potential scheduling starvation.
54
55
    Example of usage:
56
57
    .. code-block:: python3
58
59
        class MyAppClass(KytosApp):
60
            @listen_to('kytos/of_core.messages.in')
61
            def my_handler_of_message_in(self, event):
62
                # Do stuff here...
63
64
            @listen_to('kytos/of_core.messages.out')
65
            def my_handler_of_message_out(self, event):
66
                # Do stuff here...
67
68
            @listen_to('kytos/of_core.messages.in.ofpt_hello',
69
                       'kytos/of_core.messages.out.ofpt_hello')
70
            def my_handler_of_hello_messages(self, event):
71
                # Do stuff here...
72
73
            @listen_to('kytos/of_core.message.*.hello')
74
            def my_other_handler_of_hello_messages(self, event):
75
                # Do stuff here...
76
77
            @listen_to('kytos/of_core.message.*.hello')
78
            def my_handler_of_hello_messages(self, event):
79
                # Do stuff here...
80
81
            @listen_to('kytos/of_core.message.*')
82
            def my_stats_handler_of_any_message(self, event):
83
                # Do stuff here...
84
85
            @listen_to("some_db_oriented_event", pool="db")
86
            def db_update(self, event):
87
                # Do stuff here...
88
    """
89 1
    def thread_decorator(handler):
90
        """Decorate the handler method.
91
92
        Returns:
93
            A method with an `events` attribute (list of events to be listened)
94
            and also decorated to run on a new thread.
95
96
        """
97
        @run_on_thread
98
        def threaded_handler(*args):
99
            """Decorate the handler to run from a new thread."""
100
            handler(*args)
101
102
        threaded_handler.events = [event]
103
        threaded_handler.events.extend(events)
104
        return threaded_handler
105
106
    # pylint: disable=broad-except
107 1
    def thread_pool_decorator(handler):
108
        """Decorate the handler method.
109
110
        Returns:
111
            A method with an `events` attribute (list of events to be listened)
112
            and also decorated to run on in the thread pool
113
114
        """
115 1
        def done_callback(future):
116
            """Done callback."""
117
            if not future.exception():
118
                _ = future.result()
119
                return
120
121
        # pylint: disable=unused-argument
122 1
        def handler_context(*args, **kwargs):
123
            """Handler's context for ThreadPool."""
124
            cls, kytos_event = args[0], args[1]
125
            try:
126
                result = handler(*args)
127
            except Exception:
128
                result = None
129
                traceback_str = traceback.format_exc().replace("\n", ", ")
130
                LOG.error(f"listen_to handler: {handler}, "
131
                          f"args: {args} traceback: {traceback_str}")
132
                if hasattr(cls, "controller"):
133
                    cls.controller.dead_letter.add_event(kytos_event)
134
            return result
135
136 1
        def handler_context_apm(*args, apm_client=None):
137
            """Handler's context for ThreadPool APM instrumentation."""
138
            cls, kytos_event = args[0], args[1]
139
            trace_parent = kytos_event.trace_parent
140
            tx_type = "kytos_event"
141
            tx = apm_client.begin_transaction(transaction_type=tx_type,
142
                                              trace_parent=trace_parent)
143
            kytos_event.trace_parent = tx.trace_parent
144
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
145
            try:
146
                result = handler(*args)
147
                tx.result = result
148
            except Exception as exc:
149
                result = None
150
                traceback_str = traceback.format_exc().replace("\n", ", ")
151
                LOG.error(f"listen_to handler: {handler}, "
152
                          f"args: {args} traceback: {traceback_str}")
153
                if hasattr(cls, "controller"):
154
                    cls.controller.dead_letter.add_event(kytos_event)
155
                apm_client.capture_exception(
156
                    exc_info=(type(exc), exc, exc.__traceback__),
157
                    context={"args": args},
158
                    handled=False,
159
                )
160
            tx.end()
161
            apm_client.tracer.queue_func("transaction", tx.to_dict())
162
            return result
163
164 1
        handler_func, kwargs = handler_context, {}
165 1
        if get_apm_name() == "es":
166
            handler_func = handler_context_apm
167
            kwargs = dict(apm_client=ElasticAPM.get_client())
168
169 1
        def get_executor(pool, event, default_pool="app"):
170
            """Get executor."""
171 1
            if pool:
172
                return executors[pool]
173 1
            if not event:
174 1
                return executors[default_pool]
175
176
            if event.name.startswith("kytos/of_core") and "sb" in executors:
177
                return executors["sb"]
178
            core_of = "kytos/core.openflow"
179
            if event.name.startswith(core_of) and "sb" in executors:
180
                return executors["sb"]
181
            if event.name.startswith("kytos.storehouse") and "db" in executors:
182
                return executors["db"]
183
            return executors[default_pool]
184
185 1
        def inner(*args):
186
            """Decorate the handler to run in the thread pool."""
187 1
            event = args[1] if len(args) > 1 else None
188 1
            executor = get_executor(pool, event)
189 1
            future = executor.submit(handler_func, *args, **kwargs)
190 1
            future.add_done_callback(done_callback)
191
192 1
        inner.events = [event]
193 1
        inner.events.extend(events)
194 1
        return inner
195
196 1
    if executors:
197 1
        return thread_pool_decorator
198
    return thread_decorator
199
200
201 1
def now(tzone=timezone.utc):
202
    """Return the current datetime (default to UTC).
203
204
    Args:
205
        tzone (datetime.timezone): Specific time zone used in datetime.
206
207
    Returns:
208
        datetime.datetime.now: Date time with specific time zone.
209
210
    """
211 1
    return datetime.now(tzone)
212
213
214 1
def run_on_thread(method):
215
    """Decorate to run the decorated method inside a new thread.
216
217
    Args:
218
        method (function): function used to run as a new thread.
219
220
    Returns:
221
        Decorated method that will run inside a new thread.
222
        When the decorated method is called, it will not return the created
223
        thread to the caller.
224
225
    """
226 1
    def threaded_method(*args):
227
        """Ensure the handler method runs inside a new thread."""
228 1
        thread = Thread(target=method, args=args)
229
230
        # Set daemon mode so that we don't have to wait for these threads
231
        # to finish when exiting Kytos
232 1
        thread.daemon = True
233 1
        thread.start()
234 1
    return threaded_method
235
236
237 1
def get_time(data=None):
238
    """Receive a dictionary or a string and return a datatime instance.
239
240
    data = {"year": 2006,
241
            "month": 11,
242
            "day": 21,
243
            "hour": 16,
244
            "minute": 30 ,
245
            "second": 00}
246
247
    or
248
249
    data = "21/11/06 16:30:00"
250
251
    2018-04-17T17:13:50Z
252
253
    Args:
254
        data (str, dict): python dict or string to be converted to datetime
255
256
    Returns:
257
        datetime: datetime instance.
258
259
    """
260 1
    if isinstance(data, str):
261 1
        date = datetime.strptime(data, "%Y-%m-%dT%H:%M:%S")
262 1
    elif isinstance(data, dict):
263 1
        date = datetime(**data)
264
    else:
265 1
        return None
266
    return date.replace(tzinfo=timezone.utc)
267