1
|
|
|
"""Kytos Buffer Classes, based on Python Queue.""" |
2
|
1 |
|
import logging |
3
|
|
|
|
4
|
|
|
# from queue import Queue |
5
|
1 |
|
from janus import Queue |
6
|
|
|
|
7
|
1 |
|
from kytos.core.events import KytosEvent |
8
|
|
|
|
9
|
1 |
|
__all__ = ('KytosBuffers', ) |
10
|
|
|
|
11
|
1 |
|
LOG = logging.getLogger(__name__) |
12
|
|
|
|
13
|
|
|
|
14
|
1 |
|
class KytosEventBuffer:
    """Named queue of KytosEvents with a sync and an async interface.

    Events are stored in a single ``janus`` queue. The blocking methods
    (:meth:`put`, :meth:`get`, :meth:`task_done`, :meth:`join`,
    :meth:`qsize`, :meth:`empty`, :meth:`full`) go through the queue's
    ``sync_q`` side, while the coroutines (:meth:`aput`, :meth:`aget`) go
    through its ``async_q`` side.

    Once an event named ``kytos/core.shutdown`` has been handed to
    :meth:`put` or :meth:`aput`, the buffer stops accepting new events.
    """

    def __init__(self, name, event_base_class=None, loop=None):
        """Create an empty buffer.

        Args:
            name (string): name of KytosEventBuffer, used in log messages.
            event_base_class (class): Class of KytosEvent. It is only
                stored on the instance; this class does not check events
                against it.
            loop: event loop forwarded to the underlying janus queue.
        """
        self.name = name
        self._event_base_class = event_base_class
        self._loop = loop
        # NOTE(review): newer janus releases appear to have dropped the
        # ``loop`` argument - confirm against the pinned janus version.
        self._queue = Queue(loop=self._loop)
        self._reject_new_events = False

    def _enter_stop_mode_on_shutdown(self, event):
        """Start rejecting new events if ``event`` is the shutdown event."""
        if event.name != "kytos/core.shutdown":
            return
        LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
                 self.name)
        self._reject_new_events = True

    def put(self, event):
        """Insert an event in KytosEventBuffer if reject_new_events is False.

        Reject new events is True when a kytos/core.shutdown message was
        received. The shutdown event itself is still enqueued when the
        buffer was accepting events at the time it arrived; any event that
        arrives while the buffer is rejecting is dropped without an error.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                KytosEvent sent to queue.
        """
        if not self._reject_new_events:
            self._queue.sync_q.put(event)
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)

        self._enter_stop_mode_on_shutdown(event)

    async def aput(self, event):
        """Insert an event in KytosEventBuffer if reject_new_events is False.

        Coroutine counterpart of :meth:`put`; it awaits the ``async_q``
        side of the queue and applies the same shutdown handling.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                KytosEvent sent to queue.
        """
        if not self._reject_new_events:
            await self._queue.async_q.put(event)
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)

        self._enter_stop_mode_on_shutdown(event)

    def get(self):
        """Remove and return an event from the queue.

        Delegates to ``sync_q.get()`` of the underlying queue.

        Returns:
            :class:`~kytos.core.events.KytosEvent`:
                Event removed from the queue.

        """
        event = self._queue.sync_q.get()

        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)

        return event

    async def aget(self):
        """Remove and return an event from the queue.

        Coroutine counterpart of :meth:`get`; it awaits ``async_q.get()``
        of the underlying queue.

        Returns:
            :class:`~kytos.core.events.KytosEvent`:
                Event removed from the queue.

        """
        event = await self._queue.async_q.get()

        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)

        return event

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        If a :func:`~kytos.core.buffers.KytosEventBuffer.join` is currently
        blocking, it will resume if all items in KytosEventBuffer have been
        processed (meaning that a task_done() call was received for every
        item that had been put() into the KytosEventBuffer).
        """
        self._queue.sync_q.task_done()

    def join(self):
        """Block until all events are gotten and processed.

        An item is processed when the method task_done is called.
        """
        self._queue.sync_q.join()

    def qsize(self):
        """Return the size of KytosEventBuffer."""
        return self._queue.sync_q.qsize()

    def empty(self):
        """Return True if KytosEventBuffer is empty."""
        return self._queue.sync_q.empty()

    def full(self):
        """Return True if KytosEventBuffer is full of KytosEvent."""
        return self._queue.sync_q.full()
129
|
|
|
|
130
|
|
|
|
131
|
1 |
|
class KytosBuffers:
    """Set of KytosEventBuffer used in Kytos."""

    def __init__(self, loop=None):
        """Build four KytosEventBuffers.

        :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        received from network.

        :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
        events to be received.

        :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
        events to be sent.

        :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        sent to NApps.

        Args:
            loop: event loop handed unchanged to each of the four buffers.
        """
        self._loop = loop
        self.raw = KytosEventBuffer('raw_event', loop=self._loop)
        self.msg_in = KytosEventBuffer('msg_in_event', loop=self._loop)
        self.msg_out = KytosEventBuffer('msg_out_event', loop=self._loop)
        self.app = KytosEventBuffer('app_event', loop=self._loop)

    def send_stop_signal(self):
        """Send a ``kytos/core.shutdown`` event to each buffer.

        A single event instance is created and put, in order, into
        :attr:`raw`, :attr:`msg_in`, :attr:`msg_out` and :attr:`app`.
        """
        LOG.info('Stop signal received by Kytos buffers.')
        LOG.info('Sending KytosShutdownEvent to all apps.')
        event = KytosEvent(name='kytos/core.shutdown')
        for event_buffer in (self.raw, self.msg_in, self.msg_out, self.app):
            event_buffer.put(event)
164
|
|
|
|