Passed
Pull Request — master (#213)
by Vinicius
10:19 queued 06:10
created

kytos.core.helpers   A

Complexity

Total Complexity 24

Size/Duplication

Total Lines 266
Duplicated Lines 0 %

Test Coverage

Coverage 50%

Importance

Changes 0
Metric Value
eloc 108
dl 0
loc 266
ccs 50
cts 100
cp 0.5
rs 10
c 0
b 0
f 0
wmc 24

6 Functions

Rating   Name   Duplication   Size   Complexity  
A get_apm_name() 0 3 1
A get_thread_pool_max_workers() 0 3 1
A get_time() 0 30 3
A run_on_thread() 0 21 1
F listen_to() 0 167 17
A now() 0 11 1
1
"""Utilities functions used in Kytos."""
2 1
import logging
3 1
from concurrent.futures import ThreadPoolExecutor
4 1
from datetime import datetime, timezone
5 1
from threading import Thread
6
7 1
from kytos.core.apm import ElasticAPM
8 1
from kytos.core.config import KytosConfig
9
10 1
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
11
12 1
LOG = logging.getLogger(__name__)
13
14
15 1
def get_thread_pool_max_workers():
16
    """Get the number of thread pool max workers."""
17 1
    return KytosConfig().options["daemon"].thread_pool_max_workers
18
19
20 1
def get_apm_name():
21
    """Get apm backend name."""
22 1
    return KytosConfig().options["daemon"].apm
23
24
25
# pylint: disable=invalid-name
26 1
executors = {name: ThreadPoolExecutor(max_workers=max_workers,
27
                                      thread_name_prefix=f"thread_pool_{name}")
28
             for name, max_workers in get_thread_pool_max_workers().items()}
29
30
31 1
def listen_to(event, *events, pool=None):
32
    """Decorate Event Listener methods.
33
34
    This decorator was built to be used on NAPPs methods to define which
35
    type of event the method will handle. With this, we will be able to
36
    'schedule' the app/method to receive an event when a new event is
37
    registered on the controller buffers.
38
    By using the run_on_thread decorator, we also guarantee that the method
39
    (handler) will be called from inside a new thread, avoiding this method to
40
    block its caller.
41
42
    The decorator will add an attribute to the method called 'events', that
43
    will be a list of the events that the method will handle.
44
45
    The event that will be listened to is always a string, but it can represent
46
    a regular expression to match against multiple Event Types. All listened
47
    events are documented in :doc:`/developer/listened_events` section.
48
49
    The pool option gives you control on which ThreadPoolExecutor that will
50
    execute the decorated handler. This knob is meant for prioritization,
51
    allowing executions on different pools depending on the handler primary
52
    responsibility and importance to avoid potential scheduling starvation.
53
54
    Example of usage:
55
56
    .. code-block:: python3
57
58
        class MyAppClass(KytosApp):
59
            @listen_to('kytos/of_core.messages.in')
60
            def my_handler_of_message_in(self, event):
61
                # Do stuff here...
62
63
            @listen_to('kytos/of_core.messages.out')
64
            def my_handler_of_message_out(self, event):
65
                # Do stuff here...
66
67
            @listen_to('kytos/of_core.messages.in.ofpt_hello',
68
                       'kytos/of_core.messages.out.ofpt_hello')
69
            def my_handler_of_hello_messages(self, event):
70
                # Do stuff here...
71
72
            @listen_to('kytos/of_core.message.*.hello')
73
            def my_other_handler_of_hello_messages(self, event):
74
                # Do stuff here...
75
76
            @listen_to('kytos/of_core.message.*.hello')
77
            def my_handler_of_hello_messages(self, event):
78
                # Do stuff here...
79
80
            @listen_to('kytos/of_core.message.*')
81
            def my_stats_handler_of_any_message(self, event):
82
                # Do stuff here...
83
84
            @listen_to("some_db_oriented_event", pool="db")
85
            def db_update(self, event):
86
                # Do stuff here...
87
    """
88 1
    def thread_decorator(handler):
89
        """Decorate the handler method.
90
91
        Returns:
92
            A method with an `events` attribute (list of events to be listened)
93
            and also decorated to run on a new thread.
94
95
        """
96
        @run_on_thread
97
        def threaded_handler(*args):
98
            """Decorate the handler to run from a new thread."""
99
            handler(*args)
100
101
        threaded_handler.events = [event]
102
        threaded_handler.events.extend(events)
103
        return threaded_handler
104
105
    # pylint: disable=broad-except
106 1
    def thread_pool_decorator(handler):
107
        """Decorate the handler method.
108
109
        Returns:
110
            A method with an `events` attribute (list of events to be listened)
111
            and also decorated to run on in the thread pool
112
113
        """
114 1
        def done_callback(future):
115
            """Done callback."""
116
            if not future.exception():
117
                _ = future.result()
118
                return
119
120
        # pylint: disable=unused-argument
121 1
        def handler_context(*args, **kwargs):
122
            """Handler's context for ThreadPool."""
123
            cls, kytos_event = args[0], args[1]
124
            try:
125
                result = handler(*args)
126
            except Exception as exc:
127
                result = None
128
                exc_str = f"{type(exc)}: {str(exc)}"
129
                LOG.error(f"listen_to handler: {handler}, "
130
                          f"args: {args}, exception: {exc_str}")
131
                if hasattr(cls, "controller"):
132
                    cls.controller.dead_letter.add_event(kytos_event)
133
            return result
134
135 1
        def handler_context_apm(*args, apm_client=None):
136
            """Handler's context for ThreadPool APM instrumentation."""
137
            cls, kytos_event = args[0], args[1]
138
            trace_parent = kytos_event.trace_parent
139
            tx_type = "kytos_event"
140
            tx = apm_client.begin_transaction(transaction_type=tx_type,
141
                                              trace_parent=trace_parent)
142
            kytos_event.trace_parent = tx.trace_parent
143
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
144
            try:
145
                result = handler(*args)
146
                tx.result = result
147
            except Exception as exc:
148
                result = None
149
                exc_str = f"{type(exc)}: {str(exc)}"
150
                LOG.error(f"listen_to handler: {handler}, "
151
                          f"args: {args}, exception: {exc_str}")
152
                if hasattr(cls, "controller"):
153
                    cls.controller.dead_letter.add_event(kytos_event)
154
                apm_client.capture_exception(
155
                    exc_info=(type(exc), exc, exc.__traceback__),
156
                    context={"args": args},
157
                    handled=False,
158
                )
159
            tx.end()
160
            apm_client.tracer.queue_func("transaction", tx.to_dict())
161
            return result
162
163 1
        handler_func, kwargs = handler_context, {}
164 1
        if get_apm_name() == "es":
165
            handler_func = handler_context_apm
166
            kwargs = dict(apm_client=ElasticAPM.get_client())
167
168 1
        def get_executor(pool, event, default_pool="app"):
169
            """Get executor."""
170 1
            if pool:
171
                return executors[pool]
172 1
            if not event:
173 1
                return executors[default_pool]
174
175
            if event.name.startswith("kytos/of_core") and "sb" in executors:
176
                return executors["sb"]
177
            core_of = "kytos/core.openflow"
178
            if event.name.startswith(core_of) and "sb" in executors:
179
                return executors["sb"]
180
            if event.name.startswith("kytos.storehouse") and "db" in executors:
181
                return executors["db"]
182
            return executors[default_pool]
183
184 1
        def inner(*args):
185
            """Decorate the handler to run in the thread pool."""
186 1
            event = args[1] if len(args) > 1 else None
187 1
            executor = get_executor(pool, event)
188 1
            future = executor.submit(handler_func, *args, **kwargs)
189 1
            future.add_done_callback(done_callback)
190
191 1
        inner.events = [event]
192 1
        inner.events.extend(events)
193 1
        return inner
194
195 1
    if executors:
196 1
        return thread_pool_decorator
197
    return thread_decorator
198
199
200 1
def now(tzone=timezone.utc):
201
    """Return the current datetime (default to UTC).
202
203
    Args:
204
        tzone (datetime.timezone): Specific time zone used in datetime.
205
206
    Returns:
207
        datetime.datetime.now: Date time with specific time zone.
208
209
    """
210 1
    return datetime.now(tzone)
211
212
213 1
def run_on_thread(method):
214
    """Decorate to run the decorated method inside a new thread.
215
216
    Args:
217
        method (function): function used to run as a new thread.
218
219
    Returns:
220
        Decorated method that will run inside a new thread.
221
        When the decorated method is called, it will not return the created
222
        thread to the caller.
223
224
    """
225 1
    def threaded_method(*args):
226
        """Ensure the handler method runs inside a new thread."""
227 1
        thread = Thread(target=method, args=args)
228
229
        # Set daemon mode so that we don't have to wait for these threads
230
        # to finish when exiting Kytos
231 1
        thread.daemon = True
232 1
        thread.start()
233 1
    return threaded_method
234
235
236 1
def get_time(data=None):
237
    """Receive a dictionary or a string and return a datatime instance.
238
239
    data = {"year": 2006,
240
            "month": 11,
241
            "day": 21,
242
            "hour": 16,
243
            "minute": 30 ,
244
            "second": 00}
245
246
    or
247
248
    data = "21/11/06 16:30:00"
249
250
    2018-04-17T17:13:50Z
251
252
    Args:
253
        data (str, dict): python dict or string to be converted to datetime
254
255
    Returns:
256
        datetime: datetime instance.
257
258
    """
259 1
    if isinstance(data, str):
260 1
        date = datetime.strptime(data, "%Y-%m-%dT%H:%M:%S")
261 1
    elif isinstance(data, dict):
262 1
        date = datetime(**data)
263
    else:
264 1
        return None
265
    return date.replace(tzinfo=timezone.utc)
266