"""Kytos Buffer Classes, based on Python Queue."""
import logging

from janus import PriorityQueue, Queue

from kytos.core.events import KytosEvent
from kytos.core.helpers import get_thread_pool_max_workers

# Only the buffer container is part of the public API of this module.
__all__ = ('KytosBuffers', )

# Module-level logger shared by every buffer defined below.
LOG = logging.getLogger(__name__)


class KytosEventBuffer:
    """KytosEventBuffer represents a queue to store a set of KytosEvents.

    The buffer wraps a single queue object and exposes both a blocking
    interface (``put``/``get``, backed by the queue's ``sync_q``) and a
    coroutine interface (``aput``/``aget``, backed by its ``async_q``).
    """

    def __init__(self, name, event_base_class=None, loop=None, maxsize=0,
                 queue_cls=Queue):
        """Create the buffer and its underlying queue.

        Args:
            name (string): name of KytosEventBuffer, used in log messages.
            event_base_class (class): Class of KytosEvent. It is only stored
                on the instance here.
            loop (class): asyncio event loop, forwarded to ``queue_cls``.
            maxsize (int): maxsize forwarded to ``queue_cls`` (default 0).
            queue_cls (class): queue class from janus used to build the
                underlying queue.
        """
        self.name = name
        self._event_base_class = event_base_class
        self._loop = loop
        self._queue = queue_cls(maxsize=maxsize, loop=self._loop)
        # Becomes True once a "kytos/core.shutdown" event has been put.
        self._reject_new_events = False

    def _enter_stop_mode_on_shutdown(self, event):
        """Start rejecting new events if ``event`` is the shutdown event.

        Shared by :meth:`put` and :meth:`aput` so that both entry points
        apply exactly the same shutdown rule.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                KytosEvent that was just offered to the buffer.
        """
        if event.name == "kytos/core.shutdown":
            LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
                     self.name)
            self._reject_new_events = True

    def put(self, event):
        """Insert an event in KytosEventBuffer if reject_new_events is False.

        Reject new events is True when a kytos/core.shutdown message was
        received. The shutdown event itself is still enqueued, because the
        flag is only raised after the insertion attempt.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                KytosEvent sent to queue.
        """
        if not self._reject_new_events:
            self._queue.sync_q.put(event)
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)

        self._enter_stop_mode_on_shutdown(event)

    async def aput(self, event):
        """Insert an event in KytosEventBuffer if reject new events is False.

        Coroutine counterpart of :meth:`put`, using the queue's ``async_q``.

        Reject new events is True when a kytos/core.shutdown message was
        received. The shutdown event itself is still enqueued, because the
        flag is only raised after the insertion attempt.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                KytosEvent sent to queue.
        """
        if not self._reject_new_events:
            await self._queue.async_q.put(event)
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)

        self._enter_stop_mode_on_shutdown(event)

    def get(self):
        """Remove and return an event from top of queue.

        Returns:
            :class:`~kytos.core.events.KytosEvent`:
                Event removed from top of queue.

        """
        event = self._queue.sync_q.get()
        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)

        return event

    async def aget(self):
        """Remove and return an event from top of queue.

        Coroutine counterpart of :meth:`get`, using the queue's ``async_q``.

        Returns:
            :class:`~kytos.core.events.KytosEvent`:
                Event removed from top of queue.

        """
        event = await self._queue.async_q.get()
        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)

        return event

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        If a :func:`~kytos.core.buffers.KytosEventBuffer.join` is currently
        blocking, it will resume if all items in KytosEventBuffer have been
        processed (meaning that a task_done() call was received for every item
        that had been put() into the KytosEventBuffer).
        """
        self._queue.sync_q.task_done()

    def join(self):
        """Block until all events are gotten and processed.

        An item is processed when the method task_done is called.
        """
        self._queue.sync_q.join()

    def qsize(self):
        """Return the size of KytosEventBuffer."""
        return self._queue.sync_q.qsize()

    def empty(self):
        """Return True if KytosEventBuffer is empty."""
        return self._queue.sync_q.empty()

    def full(self):
        """Return True if KytosEventBuffer is full of KytosEvent."""
        return self._queue.sync_q.full()


class KytosBuffers:
    """Set of KytosEventBuffer used in Kytos."""

    def __init__(self, loop=None):
        """Build five KytosEventBuffers.

        :attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        received from connection events.

        :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        received from network.

        :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
        events to be received.

        :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
        events to be sent.

        :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        sent to NApps.

        ``conn`` keeps the default maxsize; ``raw``, ``msg_in`` and
        ``msg_out`` take the ``"sb"`` maxsize and ``app`` takes the ``"app"``
        maxsize (see :meth:`_get_maxsize`). ``msg_in`` and ``msg_out`` are
        built on ``PriorityQueue``; the others use the default queue class.

        Args:
            loop: asyncio event loop forwarded to every buffer.
        """
        # Mapping-like object queried by _get_maxsize() with a queue name.
        self._pool_max_workers = get_thread_pool_max_workers()
        self._loop = loop
        self.conn = KytosEventBuffer("conn", loop=self._loop)
        self.raw = KytosEventBuffer("raw", loop=self._loop,
                                    maxsize=self._get_maxsize("sb"))
        self.msg_in = KytosEventBuffer("msg_in", loop=self._loop,
                                       maxsize=self._get_maxsize("sb"),
                                       queue_cls=PriorityQueue)
        self.msg_out = KytosEventBuffer("msg_out", loop=self._loop,
                                        maxsize=self._get_maxsize("sb"),
                                        queue_cls=PriorityQueue)
        self.app = KytosEventBuffer("app", loop=self._loop,
                                    maxsize=self._get_maxsize("app"))

    def get_all_buffers(self):
        """Get all KytosEventBuffer instances.

        Returns:
            list: every instance attribute that is a KytosEventBuffer, in the
            order the attributes were assigned.
        """
        return [
            event_buffer for event_buffer in self.__dict__.values()
            if isinstance(event_buffer, KytosEventBuffer)
        ]

    def _get_maxsize(self, queue_name):
        """Get queue maxsize if it's been set.

        Args:
            queue_name (string): key looked up in the pool max workers
                mapping (``"sb"`` and ``"app"`` are the keys used here).

        Returns:
            The value stored for ``queue_name``, or 0 when the key is absent.
        """
        return self._pool_max_workers.get(queue_name, 0)

    def send_stop_signal(self):
        """Send a ``kytos/core.shutdown`` event to each buffer.

        The same event instance is put into every buffer returned by
        :meth:`get_all_buffers`.
        """
        LOG.info('Stop signal received by Kytos buffers.')
        LOG.info('Sending KytosShutdownEvent to all apps.')
        event = KytosEvent(name='kytos/core.shutdown')
        for event_buffer in self.get_all_buffers():
            event_buffer.put(event)