Passed
Pull Request — master (#209)
by Vinicius
13:06 queued 09:04
created

kytos.core.helpers   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 241
Duplicated Lines 0 %

Test Coverage

Coverage 52.86%

Importance

Changes 0
Metric Value
eloc 92
dl 0
loc 241
ccs 46
cts 87
cp 0.5286
rs 10
c 0
b 0
f 0
wmc 15

6 Functions

Rating   Name   Duplication   Size   Complexity  
A get_thread_pool_max_workers() 0 3 1
A get_apm_name() 0 3 1
A get_time() 0 30 3
A run_on_thread() 0 21 1
A now() 0 11 1
C listen_to() 0 140 8
1
"""Utilities functions used in Kytos."""
2 1
import logging
3 1
from concurrent.futures import ThreadPoolExecutor
4 1
from datetime import datetime, timezone
5 1
from threading import Thread
6
7 1
from kytos.core.apm import ElasticAPM
8 1
from kytos.core.config import KytosConfig
9
10 1
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
11
12 1
LOG = logging.getLogger(__name__)
13
14
# APP_MSG = "[App %s] %s | ID: %02d | R: %02d | P: %02d | F: %s"
15
16
17 1
def get_thread_pool_max_workers():
18
    """Get the number of thread pool max workers."""
19 1
    return int(KytosConfig().options["daemon"].thread_pool_max_workers)
20
21
22 1
def get_apm_name():
23
    """Get apm backend name."""
24 1
    return KytosConfig().options["daemon"].apm
25
26
27
# pylint: disable=invalid-name
28 1
executor = None
29 1
if get_thread_pool_max_workers():
30 1
    executor = ThreadPoolExecutor(max_workers=get_thread_pool_max_workers())
31
32
33 1
def listen_to(event, *events):
34
    """Decorate Event Listener methods.
35
36
    This decorator was built to be used on NAPPs methods to define which
37
    type of event the method will handle. With this, we will be able to
38
    'schedule' the app/method to receive an event when a new event is
39
    registered on the controller buffers.
40
    By using the run_on_thread decorator, we also guarantee that the method
41
    (handler) will be called from inside a new thread, avoiding this method to
42
    block its caller.
43
44
    The decorator will add an attribute to the method called 'events', that
45
    will be a list of the events that the method will handle.
46
47
    The event that will be listened to is always a string, but it can represent
48
    a regular expression to match against multiple Event Types. All listened
49
    events are documented in :doc:`/developer/listened_events` section.
50
51
    Example of usage:
52
53
    .. code-block:: python3
54
55
        class MyAppClass(KytosApp):
56
            @listen_to('kytos/of_core.messages.in')
57
            def my_handler_of_message_in(self, event):
58
                # Do stuff here...
59
60
            @listen_to('kytos/of_core.messages.out')
61
            def my_handler_of_message_out(self, event):
62
                # Do stuff here...
63
64
            @listen_to('kytos/of_core.messages.in.ofpt_hello',
65
                       'kytos/of_core.messages.out.ofpt_hello')
66
            def my_handler_of_hello_messages(self, event):
67
                # Do stuff here...
68
69
            @listen_to('kytos/of_core.message.*.hello')
70
            def my_other_handler_of_hello_messages(self, event):
71
                # Do stuff here...
72
73
            @listen_to('kytos/of_core.message.*.hello')
74
            def my_handler_of_hello_messages(self, event):
75
                # Do stuff here...
76
77
            @listen_to('kytos/of_core.message.*')
78
            def my_stats_handler_of_any_message(self, event):
79
                # Do stuff here...
80
    """
81 1
    def thread_decorator(handler):
82
        """Decorate the handler method.
83
84
        Returns:
85
            A method with an `events` attribute (list of events to be listened)
86
            and also decorated to run on a new thread.
87
88
        """
89
        @run_on_thread
90
        def threaded_handler(*args):
91
            """Decorate the handler to run from a new thread."""
92
            handler(*args)
93
94
        threaded_handler.events = [event]
95
        threaded_handler.events.extend(events)
96
        return threaded_handler
97
98
    # pylint: disable=broad-except
99 1
    def thread_pool_decorator(handler):
100
        """Decorate the handler method.
101
102
        Returns:
103
            A method with an `events` attribute (list of events to be listened)
104
            and also decorated to run on in the thread pool
105
106
        """
107 1
        def done_callback(future):
108
            """Done callback."""
109
            if not future.exception():
110
                _ = future.result()
111
                return
112
113
        # pylint: disable=unused-argument
114 1
        def handler_context(*args, **kwargs):
115
            """Handler's context for ThreadPool."""
116
            cls, kytos_event = args[0], args[1]
117
            try:
118
                result = handler(*args)
119
            except Exception as exc:
120
                result = None
121
                exc_str = f"{type(exc)}: {str(exc)}"
122
                LOG.error(f"listen_to handler: {handler}, "
123
                          f"args: {args}, exception: {exc_str}")
124
                if hasattr(cls, "controller"):
125
                    cls.controller.dead_letter.add_event(kytos_event)
126
            return result
127
128 1
        def handler_context_apm(*args, apm_client=None):
129
            """Handler's context for ThreadPool APM instrumentation."""
130
            cls, kytos_event = args[0], args[1]
131
            trace_parent = kytos_event.trace_parent
132
            tx_type = "kytos_event"
133
            tx = apm_client.begin_transaction(transaction_type=tx_type,
134
                                              trace_parent=trace_parent)
135
            kytos_event.trace_parent = tx.trace_parent
136
            tx.name = f"{kytos_event.name}@{cls.napp_id}"
137
            try:
138
                result = handler(*args)
139
                tx.result = result
140
            except Exception as exc:
141
                result = None
142
                exc_str = f"{type(exc)}: {str(exc)}"
143
                LOG.error(f"listen_to handler: {handler}, "
144
                          f"args: {args}, exception: {exc_str}")
145
                if hasattr(cls, "controller"):
146
                    cls.controller.dead_letter.add_event(kytos_event)
147
                apm_client.capture_exception(
148
                    exc_info=(type(exc), exc, exc.__traceback__),
149
                    context={"args": args},
150
                    handled=False,
151
                )
152
            tx.end()
153
            apm_client.tracer.queue_func("transaction", tx.to_dict())
154
            return result
155
156 1
        handler_func, kwargs = handler_context, {}
157 1
        if get_apm_name() == "es":
158
            handler_func = handler_context_apm
159
            kwargs = dict(apm_client=ElasticAPM.get_client())
160
161 1
        def inner(*args):
162
            """Decorate the handler to run in the thread pool."""
163 1
            future = executor.submit(handler_func, *args, **kwargs)
164 1
            future.add_done_callback(done_callback)
165
166 1
        inner.events = [event]
167 1
        inner.events.extend(events)
168 1
        return inner
169
170 1
    if get_thread_pool_max_workers():
171 1
        return thread_pool_decorator
172
    return thread_decorator
173
174
175 1
def now(tzone=timezone.utc):
176
    """Return the current datetime (default to UTC).
177
178
    Args:
179
        tzone (datetime.timezone): Specific time zone used in datetime.
180
181
    Returns:
182
        datetime.datetime.now: Date time with specific time zone.
183
184
    """
185 1
    return datetime.now(tzone)
186
187
188 1
def run_on_thread(method):
189
    """Decorate to run the decorated method inside a new thread.
190
191
    Args:
192
        method (function): function used to run as a new thread.
193
194
    Returns:
195
        Decorated method that will run inside a new thread.
196
        When the decorated method is called, it will not return the created
197
        thread to the caller.
198
199
    """
200 1
    def threaded_method(*args):
201
        """Ensure the handler method runs inside a new thread."""
202 1
        thread = Thread(target=method, args=args)
203
204
        # Set daemon mode so that we don't have to wait for these threads
205
        # to finish when exiting Kytos
206 1
        thread.daemon = True
207 1
        thread.start()
208 1
    return threaded_method
209
210
211 1
def get_time(data=None):
212
    """Receive a dictionary or a string and return a datatime instance.
213
214
    data = {"year": 2006,
215
            "month": 11,
216
            "day": 21,
217
            "hour": 16,
218
            "minute": 30 ,
219
            "second": 00}
220
221
    or
222
223
    data = "21/11/06 16:30:00"
224
225
    2018-04-17T17:13:50Z
226
227
    Args:
228
        data (str, dict): python dict or string to be converted to datetime
229
230
    Returns:
231
        datetime: datetime instance.
232
233
    """
234 1
    if isinstance(data, str):
235 1
        date = datetime.strptime(data, "%Y-%m-%dT%H:%M:%S")
236 1
    elif isinstance(data, dict):
237 1
        date = datetime(**data)
238
    else:
239 1
        return None
240
    return date.replace(tzinfo=timezone.utc)
241