|
1
|
|
|
"""Kytos Buffer Classes, based on Python Queue.""" |
|
2
|
|
|
import logging |
|
3
|
|
|
from typing import Optional |
|
4
|
|
|
|
|
5
|
|
|
from janus import Queue |
|
6
|
|
|
|
|
7
|
|
|
LOG = logging.getLogger(__name__) |
|
8
|
|
|
|
|
9
|
|
|
|
|
10
|
|
|
class KytosEventBuffer:
    """Named buffer of KytosEvents backed by a janus queue.

    Blocking callers go through the queue's ``sync_q`` side, coroutines go
    through its ``async_q`` side. After a ``kytos/core.shutdown`` event has
    been inserted, the buffer silently drops every later insertion.
    """

    def __init__(self, name, queue: Queue = None):
        """Create the buffer.

        Args:
            name (string): name of this KytosEventBuffer, used in log lines.
            queue (janus.Queue): queue that stores the events. When omitted
                (``None``), a new ``Queue()`` is created with its defaults.
        """
        self.name = name
        if queue is None:
            queue = Queue()
        self._queue = queue
        # Flipped to True once the shutdown event has been inserted.
        self._reject_new_events = False

    def _enter_stop_mode_on_shutdown(self, event):
        """Start rejecting new events if ``event`` is the shutdown event."""
        if event.name != "kytos/core.shutdown":
            return
        LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
                 self.name)
        self._reject_new_events = True

    def put(self, event, timeout: Optional[float] = None):
        """Insert an event through the blocking side of the queue.

        Nothing is inserted once the buffer is rejecting new events, which
        happens after a kytos/core.shutdown event has been inserted.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                KytosEvent sent to queue.
            timeout: Forwarded to the queue's ``put``. ``None`` blocks until
                a free slot is available; a non-negative number blocks at
                most that many seconds, after which the queue raises its
                Full exception if no slot became available.
        """
        if self._reject_new_events:
            return

        self._queue.sync_q.put(event, timeout=timeout)
        LOG.debug('[buffer: %s] Added: %s', self.name, event.name)
        self._enter_stop_mode_on_shutdown(event)

    async def aput(self, event):
        """Insert an event through the asyncio side of the queue.

        Nothing is inserted once the buffer is rejecting new events, which
        happens after a kytos/core.shutdown event has been inserted.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                KytosEvent sent to queue.
        """
        if self._reject_new_events:
            return

        await self._queue.async_q.put(event)
        LOG.debug('[buffer: %s] Added: %s', self.name, event.name)
        self._enter_stop_mode_on_shutdown(event)

    def get(self):
        """Remove and return the next event using the blocking interface.

        Returns:
            :class:`~kytos.core.events.KytosEvent`:
                Event removed from the queue.

        """
        dequeued = self._queue.sync_q.get()
        LOG.debug('[buffer: %s] Removed: %s', self.name, dequeued.name)
        return dequeued

    async def aget(self):
        """Remove and return the next event using the asyncio interface.

        Returns:
            :class:`~kytos.core.events.KytosEvent`:
                Event removed from the queue.

        """
        dequeued = await self._queue.async_q.get()
        LOG.debug('[buffer: %s] Removed: %s', self.name, dequeued.name)
        return dequeued

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        If a :func:`~kytos.core.buffers.KytosEventBuffer.join` is currently
        blocking, it will resume once all items in the KytosEventBuffer have
        been processed (meaning that a task_done() call was received for
        every item that had been put() into the KytosEventBuffer).
        """
        sync_side = self._queue.sync_q
        sync_side.task_done()

    def join(self):
        """Block until all events are gotten and processed.

        An item counts as processed when task_done() is called for it.
        """
        sync_side = self._queue.sync_q
        sync_side.join()

    def qsize(self):
        """Return the number of events reported by the underlying queue."""
        sync_side = self._queue.sync_q
        return sync_side.qsize()

    def empty(self):
        """Return True if the underlying queue reports itself empty."""
        sync_side = self._queue.sync_q
        return sync_side.empty()

    def full(self):
        """Return True if the underlying queue reports itself full."""
        sync_side = self._queue.sync_q
        return sync_side.full()
|
121
|
|
|
|