Passed
Push — master ( 074d5c...e55b84 )
by Vinicius
07:05 queued 38s
created

kytos.core.helpers   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 195
Duplicated Lines 0 %

Test Coverage

Coverage 78.43%

Importance

Changes 0
Metric Value
eloc 56
dl 0
loc 195
ccs 40
cts 51
cp 0.7843
rs 10
c 0
b 0
f 0
wmc 10

5 Functions

Rating   Name   Duplication   Size   Complexity  
A get_thread_pool_max_workers() 0 3 1
A get_time() 0 30 3
A run_on_thread() 0 21 1
A listen_to() 0 100 4
A now() 0 11 1
1
"""Utility functions used in Kytos."""
2 2
import logging
3 2
from concurrent.futures import ThreadPoolExecutor
4 2
from datetime import datetime, timezone
5 2
from threading import Thread
6
7 2
from kytos.core.config import KytosConfig
8
9 2
__all__ = ['listen_to', 'now', 'run_on_thread', 'get_time']
10
11 2
LOG = logging.getLogger(__name__)
12
13
# APP_MSG = "[App %s] %s | ID: %02d | R: %02d | P: %02d | F: %s"
14
15
16 2
def get_thread_pool_max_workers():
    """Return the configured maximum number of thread pool workers.

    The value is read from the ``thread_pool_max_workers`` attribute of the
    ``daemon`` options exposed by ``KytosConfig`` and converted to ``int``.
    """
    daemon_options = KytosConfig().options["daemon"]
    return int(daemon_options.thread_pool_max_workers)
19
20
21
# pylint: disable=invalid-name
# Pool used by the ``listen_to`` handlers; it stays ``None`` when the
# configured number of workers is zero.
executor = (
    ThreadPoolExecutor(max_workers=get_thread_pool_max_workers())
    if get_thread_pool_max_workers()
    else None
)
25
26
27 2
def listen_to(event, *events):
    """Decorate Event Listener methods.

    This decorator was built to be used on NAPPs methods to define which
    type of event the method will handle. With this, we will be able to
    'schedule' the app/method to receive an event when a new event is
    registered on the controller buffers.

    The decorated method (handler) never blocks its caller: when
    ``get_thread_pool_max_workers()`` is non-zero the handler is submitted
    to the module-level ``executor`` thread pool, otherwise it is started
    on a new thread through the ``run_on_thread`` decorator.

    The decorator will add an attribute to the method called 'events', that
    will be a list of the events that the method will handle.

    The event that will be listened to is always a string, but it can
    represent a regular expression to match against multiple Event Types.
    All listened events are documented in
    :doc:`/developer/listened_events` section.

    Example of usage:

    .. code-block:: python3

        class MyAppClass(KytosApp):
            @listen_to('kytos/of_core.messages.in')
            def my_handler_of_message_in(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.out')
            def my_handler_of_message_out(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.messages.in.ofpt_hello',
                       'kytos/of_core.messages.out.ofpt_hello')
            def my_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*.hello')
            def my_other_handler_of_hello_messages(self, event):
                # Do stuff here...

            @listen_to('kytos/of_core.message.*')
            def my_stats_handler_of_any_message(self, event):
                # Do stuff here...

    Args:
        event (str): First event (or pattern) the handler listens to.
        *events (str): Any additional events (or patterns).

    Returns:
        function: A decorator that wraps the handler and sets its
        ``events`` attribute.

    """
    def thread_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be
            listened) and also decorated to run on a new thread.

        """
        @run_on_thread
        def threaded_handler(*args):
            """Decorate the handler to run from a new thread."""
            handler(*args)

        threaded_handler.events = [event, *events]
        return threaded_handler

    def thread_pool_decorator(handler):
        """Decorate the handler method.

        Returns:
            A method with an `events` attribute (list of events to be
            listened) and also decorated to run in the thread pool.

        """
        def done_callback(future):
            """Log the exception raised by the handler, if there was one."""
            # Fetch the exception once; None means the handler succeeded.
            exc = future.exception()
            if exc is None:
                return
            LOG.error("listen_to handler: %s, args: %s, exception: %s",
                      handler, getattr(future, "__args", tuple()),
                      f"{type(exc)}: {exc}")

        def inner(*args):
            """Decorate the handler to run in the thread pool."""
            future = executor.submit(handler, *args)
            # Keep the call arguments on the future so that done_callback
            # can report them if the handler fails.
            setattr(future, "__args", args)
            future.add_done_callback(done_callback)

        inner.events = [event, *events]
        return inner

    if get_thread_pool_max_workers():
        return thread_pool_decorator
    return thread_decorator
127
128
129 2
def now(tzone=timezone.utc):
    """Get the current date and time as a timezone-aware datetime.

    Args:
        tzone (datetime.timezone): Time zone attached to the returned value;
            UTC when omitted.

    Returns:
        datetime.datetime: Current date and time in ``tzone``.

    """
    current = datetime.now(tz=tzone)
    return current
140
141
142 2
def run_on_thread(method):
    """Decorate to run the decorated method inside a new thread.

    Args:
        method (function): function used to run as a new thread.

    Returns:
        Decorated method that will run inside a new thread.
        Positional and keyword arguments given to the decorated method are
        forwarded to ``method``. When the decorated method is called, it
        returns ``None``; the created thread is not returned to the caller.

    """
    def threaded_method(*args, **kwargs):
        """Ensure the handler method runs inside a new thread."""
        # Set daemon mode so that we don't have to wait for these threads
        # to finish when exiting Kytos
        thread = Thread(target=method, args=args, kwargs=kwargs, daemon=True)
        thread.start()
    return threaded_method
163
164
165 2
def get_time(data=None):
    """Receive a dictionary or a string and return a datetime instance.

    A dictionary is expanded as the keyword arguments of ``datetime``::

        data = {"year": 2006,
                "month": 11,
                "day": 21,
                "hour": 16,
                "minute": 30,
                "second": 0}

    A string must follow the ``%Y-%m-%dT%H:%M:%S`` format, optionally
    followed by a trailing ``Z``::

        data = "2018-04-17T17:13:50Z"

    Args:
        data (str, dict): python dict or string to be converted to datetime

    Returns:
        datetime: datetime instance with its ``tzinfo`` set to UTC, or
        ``None`` when ``data`` is neither a string nor a dictionary.

    Raises:
        ValueError: If the string does not match the expected format.

    """
    if isinstance(data, str):
        # The strptime format below has no directive for the trailing "Z",
        # so drop it before parsing; the result is tagged as UTC anyway.
        timestamp = data[:-1] if data.endswith("Z") else data
        date = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S")
    elif isinstance(data, dict):
        date = datetime(**data)
    else:
        return None
    return date.replace(tzinfo=timezone.utc)
195