Test Failed
Pull Request — master (#196)
by Vinicius
09:32
created

kytos.core.helpers   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 197
Duplicated Lines 0 %

Test Coverage

Coverage 78.85%

Importance

Changes 0
Metric Value
eloc 57
dl 0
loc 197
ccs 41
cts 52
cp 0.7885
rs 10
c 0
b 0
f 0
wmc 11

5 Functions

Rating   Name   Duplication   Size   Complexity  
A get_thread_pool_max_workers() 0 3 1
A get_time() 0 30 3
A run_on_thread() 0 21 1
B listen_to() 0 101 5
A now() 0 11 1
1
"""Utilities functions used in Kytos."""
2 2
import logging
3
4 2
from concurrent.futures import ThreadPoolExecutor
5 2
from datetime import datetime, timezone
6 2
from threading import Thread
7
8 2
from kytos.core.config import KytosConfig
9
10 2
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
11
12 2
LOG = logging.getLogger(__name__)
13
14
# APP_MSG = "[App %s] %s | ID: %02d | R: %02d | P: %02d | F: %s"
15
16
17 2
def get_thread_pool_max_workers():
18
    """Get the number of thread pool max workers."""
19 2
    return int(KytosConfig().options["daemon"].thread_pool_max_workers)
20
21
22
# pylint: disable=invalid-name
23 2
executor = None
24 2
if get_thread_pool_max_workers():
25 2
    executor = ThreadPoolExecutor(max_workers=get_thread_pool_max_workers())
26
27
28 2
def listen_to(event, *events):
29
    """Decorate Event Listener methods.
30
31
    This decorator was built to be used on NAPPs methods to define which
32
    type of event the method will handle. With this, we will be able to
33
    'schedule' the app/method to receive an event when a new event is
34
    registered on the controller buffers.
35
    By using the run_on_thread decorator, we also guarantee that the method
36
    (handler) will be called from inside a new thread, avoiding this method to
37
    block its caller.
38
39
    The decorator will add an attribute to the method called 'events', that
40
    will be a list of the events that the method will handle.
41
42
    The event that will be listened to is always a string, but it can represent
43
    a regular expression to match against multiple Event Types. All listened
44
    events are documented in :doc:`/developer/listened_events` section.
45
46
    Example of usage:
47
48
    .. code-block:: python3
49
50
        class MyAppClass(KytosApp):
51
            @listen_to('kytos/of_core.messages.in')
52
            def my_handler_of_message_in(self, event):
53
                # Do stuff here...
54
55
            @listen_to('kytos/of_core.messages.out')
56
            def my_handler_of_message_out(self, event):
57
                # Do stuff here...
58
59
            @listen_to('kytos/of_core.messages.in.ofpt_hello',
60
                       'kytos/of_core.messages.out.ofpt_hello')
61
            def my_handler_of_hello_messages(self, event):
62
                # Do stuff here...
63
64
            @listen_to('kytos/of_core.message.*.hello')
65
            def my_other_handler_of_hello_messages(self, event):
66
                # Do stuff here...
67
68
            @listen_to('kytos/of_core.message.*.hello')
69
            def my_handler_of_hello_messages(self, event):
70
                # Do stuff here...
71
72
            @listen_to('kytos/of_core.message.*')
73
            def my_stats_handler_of_any_message(self, event):
74
                # Do stuff here...
75
    """
76 2
    def thread_decorator(handler):
77
        """Decorate the handler method.
78
79
        Returns:
80
            A method with an `events` attribute (list of events to be listened)
81
            and also decorated to run on a new thread.
82
83
        """
84
        @run_on_thread
85
        def threaded_handler(*args):
86
            """Decorate the handler to run from a new thread."""
87
            handler(*args)
88
89
        threaded_handler.events = [event]
90
        threaded_handler.events.extend(events)
91
        return threaded_handler
92
93 2
    def thread_pool_decorator(handler):
94
        """Decorate the handler method.
95
96
        Returns:
97
            A method with an `events` attribute (list of events to be listened)
98
            and also decorated to run on in the thread pool
99
100
        """
101 2
        def done_callback(future):
102
            """Done callback."""
103
            if not future.exception():
104
                _ = future.result()
105
            else:
106
                exc_str = f"{type(future.exception())}: {future.exception()}"
107
                args = (
108
                    getattr(future, "__inner_args")
109
                    if hasattr(future, "__inner_args")
110
                    else tuple()
111
                )
112
                LOG.error(f"listen_to handler: {handler}, "
113
                          f"args: {args}, exception: {exc_str}")
114
115 2
        def inner(*args):
116
            """Decorate the handler to run in the thread pool."""
117 2
            future = executor.submit(handler, *args)
118 2
            inner_args = args[1:] if len(args) > 1 else args
119 2
            setattr(future, "__inner_args", inner_args)
120 2
            future.add_done_callback(done_callback)
121
122 2
        inner.events = [event]
123 2
        inner.events.extend(events)
124 2
        return inner
125
126 2
    if get_thread_pool_max_workers():
127 2
        return thread_pool_decorator
128
    return thread_decorator
129
130
131 2
def now(tzone=timezone.utc):
132
    """Return the current datetime (default to UTC).
133
134
    Args:
135
        tzone (datetime.timezone): Specific time zone used in datetime.
136
137
    Returns:
138
        datetime.datetime.now: Date time with specific time zone.
139
140
    """
141 2
    return datetime.now(tzone)
142
143
144 2
def run_on_thread(method):
145
    """Decorate to run the decorated method inside a new thread.
146
147
    Args:
148
        method (function): function used to run as a new thread.
149
150
    Returns:
151
        Decorated method that will run inside a new thread.
152
        When the decorated method is called, it will not return the created
153
        thread to the caller.
154
155
    """
156 2
    def threaded_method(*args):
157
        """Ensure the handler method runs inside a new thread."""
158 2
        thread = Thread(target=method, args=args)
159
160
        # Set daemon mode so that we don't have to wait for these threads
161
        # to finish when exiting Kytos
162 2
        thread.daemon = True
163 2
        thread.start()
164 2
    return threaded_method
165
166
167 2
def get_time(data=None):
168
    """Receive a dictionary or a string and return a datatime instance.
169
170
    data = {"year": 2006,
171
            "month": 11,
172
            "day": 21,
173
            "hour": 16,
174
            "minute": 30 ,
175
            "second": 00}
176
177
    or
178
179
    data = "21/11/06 16:30:00"
180
181
    2018-04-17T17:13:50Z
182
183
    Args:
184
        data (str, dict): python dict or string to be converted to datetime
185
186
    Returns:
187
        datetime: datetime instance.
188
189
    """
190 2
    if isinstance(data, str):
191 2
        date = datetime.strptime(data, "%Y-%m-%dT%H:%M:%S")
192 2
    elif isinstance(data, dict):
193 2
        date = datetime(**data)
194
    else:
195 2
        return None
196
    return date.replace(tzinfo=timezone.utc)
197