Passed
Pull Request — master (#196)
by Vinicius
11:54 queued 04:17
created

kytos.core.helpers   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 196
Duplicated Lines 0 %

Test Coverage

Coverage 78.85%

Importance

Changes 0
Metric Value
wmc 11
eloc 57
dl 0
loc 196
ccs 41
cts 52
cp 0.7885
rs 10
c 0
b 0
f 0

5 Functions

Rating   Name   Duplication   Size   Complexity  
A get_thread_pool_max_workers() 0 3 1
A get_time() 0 30 3
A run_on_thread() 0 21 1
B listen_to() 0 101 5
A now() 0 11 1
1
"""Utilities functions used in Kytos."""
2 2
import logging
3 2
from concurrent.futures import ThreadPoolExecutor
4 2
from datetime import datetime, timezone
5 2
from threading import Thread
6
7 2
from kytos.core.config import KytosConfig
8
9 2
# Names exported by a star-import of this module. Note that
# get_thread_pool_max_workers is defined below but is not listed here.
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
10
11 2
# Module-level logger, named after this module; used by listen_to to report
# exceptions raised by handlers running in the thread pool.
LOG = logging.getLogger(__name__)
12
13
# APP_MSG = "[App %s] %s | ID: %02d | R: %02d | P: %02d | F: %s"
14
15
16 2
def get_thread_pool_max_workers():
    """Return the configured thread pool size as an integer.

    The value is read from the ``thread_pool_max_workers`` attribute of the
    ``daemon`` entry of ``KytosConfig().options`` and converted with ``int``.
    """
    daemon_options = KytosConfig().options["daemon"]
    return int(daemon_options.thread_pool_max_workers)
19
20
21
# pylint: disable=invalid-name
# Shared thread pool used by listen_to; stays None when the configured
# number of max workers is zero.
executor = (
    ThreadPoolExecutor(max_workers=get_thread_pool_max_workers())
    if get_thread_pool_max_workers()
    else None
)
25
26
27 2
def listen_to(event, *events):
    """Decorate Event Listener methods.

    This decorator was built to be used on NAPPs methods to define which
    type of event the method will handle. With this, we will be able to
    'schedule' the app/method to receive an event when a new event is
    registered on the controller buffers.

    The decorated handler does not block its caller. When
    ``get_thread_pool_max_workers()`` is truthy at decoration time, each call
    is submitted to the module-level thread pool (``executor``) and any
    exception raised by the handler is logged. Otherwise the handler is
    wrapped with ``run_on_thread`` and every call runs in a new thread.
    In both cases the decorated method returns ``None``.

    The decorator will add an attribute to the method called 'events', that
    will be a list of the events that the method will handle.

    The event that will be listened to is always a string, but it can represent
    a regular expression to match against multiple Event Types. All listened
    events are documented in :doc:`/developer/listened_events` section.

    Example of usage:

    .. code-block:: python3

        class MyAppClass(KytosApp):
            @listen_to('kytos/of_core.messages.in')
            def my_handler_of_message_in(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.out')
            def my_handler_of_message_out(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.in.ofpt_hello',
                       'kytos/of_core.messages.out.ofpt_hello')
            def my_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*.hello')
            def my_other_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*')
            def my_stats_handler_of_any_message(self, event):
                # Do stuff here...

    Args:
        event (str): first event name (or pattern) to listen to.
        *events (str): additional event names (or patterns).

    Returns:
        A decorator that wraps the handler and sets its ``events`` attribute.

    """
    def thread_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run on a new thread.

        """
        @run_on_thread
        def threaded_handler(*args):
            """Decorate the handler to run from a new thread."""
            handler(*args)

        threaded_handler.events = [event, *events]
        return threaded_handler

    def thread_pool_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be listened)
            and also decorated to run in the thread pool.

        """
        def done_callback(future):
            """Log the exception raised by the handler, if there was one."""
            # Done callbacks are also invoked for cancelled futures, and
            # Future.exception() raises CancelledError for those, so bail
            # out before asking for the exception.
            if future.cancelled():
                LOG.debug("listen_to handler: %s was cancelled", handler)
                return

            exc = future.exception()
            if exc is None:
                return

            exc_str = f"{type(exc)}: {exc}"
            # "__inner_args" is attached by inner() right after submit().
            args = getattr(future, "__inner_args", tuple())
            LOG.error("listen_to handler: %s, args: %s, exception: %s",
                      handler, args, exc_str)

        def inner(*args):
            """Decorate the handler to run in the thread pool."""
            future = executor.submit(handler, *args)
            # Keep the call arguments for error reporting, dropping the first
            # positional one when there is more than one argument.
            inner_args = args[1:] if len(args) > 1 else args
            setattr(future, "__inner_args", inner_args)
            # Registered last so the arguments are already attached even if
            # the future has completed and the callback fires immediately.
            future.add_done_callback(done_callback)

        inner.events = [event, *events]
        return inner

    if get_thread_pool_max_workers():
        return thread_pool_decorator
    return thread_decorator
128
129
130 2
def now(tzone=timezone.utc):
    """Build a datetime for the current instant (UTC unless told otherwise).

    Args:
        tzone (datetime.timezone): Time zone passed to ``datetime.now``.

    Returns:
        datetime.datetime: Current date and time in the given time zone.

    """
    current = datetime.now(tz=tzone)
    return current
141
142
143 2
def run_on_thread(method):
    """Wrap a callable so that each call runs in its own new thread.

    Args:
        method (function): function to be executed in a new thread.

    Returns:
        A wrapper that starts a daemon thread running ``method`` with the
        given positional arguments. The wrapper returns ``None``; the
        created thread is not handed back to the caller.

    """
    def threaded_method(*args):
        """Start a daemon thread that runs the wrapped method."""
        # Daemon mode: exiting Kytos does not have to wait for these threads
        # to finish.
        Thread(target=method, args=args, daemon=True).start()

    return threaded_method
164
165
166 2
def get_time(data=None):
    """Receive a dictionary or a string and return a datetime instance.

    A dictionary is passed as keyword arguments to ``datetime``::

        data = {"year": 2006,
                "month": 11,
                "day": 21,
                "hour": 16,
                "minute": 30,
                "second": 0}

    A string must follow the ``%Y-%m-%dT%H:%M:%S`` format, optionally
    followed by a single ``Z`` (the UTC designator)::

        data = "2018-04-17T17:13:50Z"

    Args:
        data (str, dict): python dict or string to be converted to datetime

    Returns:
        datetime: datetime instance with ``tzinfo`` set to UTC (the value is
        tagged as UTC, not converted), or ``None`` when ``data`` is neither
        a string nor a dictionary.

    Raises:
        ValueError: if the string does not match the expected format.

    """
    if isinstance(data, str):
        # The result is always tagged as UTC below, so a trailing "Z" adds
        # no information; drop it because strptime would reject it.
        text = data[:-1] if data.endswith("Z") else data
        date = datetime.strptime(text, "%Y-%m-%dT%H:%M:%S")
    elif isinstance(data, dict):
        date = datetime(**data)
    else:
        return None
    return date.replace(tzinfo=timezone.utc)
196