Passed
Pull Request — master (#258)
by Vinicius
04:43
created

kytos.core.buffers.KytosEventBuffer.task_done()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 9
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
crap 1
1
"""Kytos Buffer Classes, based on Python Queue."""
2 1
import logging
3
4 1
from janus import PriorityQueue, Queue
5
6 1
from kytos.core.events import KytosEvent
7 1
from kytos.core.helpers import get_thread_pool_max_workers
8
9 1
__all__ = ('KytosBuffers', )
10
11 1
LOG = logging.getLogger(__name__)
12
13
14 1
class KytosEventBuffer:
15
    """KytosEventBuffer represents a queue to store a set of KytosEvents."""
16
17 1
    def __init__(self, name, event_base_class=None, loop=None, maxsize=0,
18
                 queue_cls=Queue):
19
        """Contructor of KytosEventBuffer receive the parameters below.
20
21
        Args:
22
            name (string): name of KytosEventBuffer.
23
            event_base_class (class): Class of KytosEvent.
24
            loop (class): asyncio event loop
25
            maxsize (int): maxsize _queue producer buffer
26
            queue_cls (class): queue class from janus
27
        """
28 1
        self.name = name
29 1
        self._event_base_class = event_base_class
30 1
        self._loop = loop
31 1
        self._queue = queue_cls(maxsize=maxsize, loop=self._loop)
32 1
        self._reject_new_events = False
33
34 1
    def put(self, event):
35
        """Insert an event in KytosEventBuffer if reject_new_events is False.
36
37
        Reject new events is True when a kytos/core.shutdown message was
38
        received.
39
40
        Args:
41
            event (:class:`~kytos.core.events.KytosEvent`):
42
                KytosEvent sent to queue.
43
        """
44 1
        if not self._reject_new_events:
45 1
            self._queue.sync_q.put(event)
46 1
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)
47
48 1
        if event.name == "kytos/core.shutdown":
49 1
            LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
50
                     self.name)
51 1
            self._reject_new_events = True
52
53 1
    async def aput(self, event):
54
        """Insert a event in KytosEventBuffer if reject new events is False.
55
56
        Reject new events is True when a kytos/core.shutdown message was
57
        received.
58
59
        Args:
60
            event (:class:`~kytos.core.events.KytosEvent`):
61
                KytosEvent sent to queue.
62
        """
63 1
        if not self._reject_new_events:
64 1
            await self._queue.async_q.put(event)
65 1
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)
66
67 1
        if event.name == "kytos/core.shutdown":
68 1
            LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
69
                     self.name)
70 1
            self._reject_new_events = True
71
72 1
    def get(self):
73
        """Remove and return a event from top of queue.
74
75
        Returns:
76
            :class:`~kytos.core.events.KytosEvent`:
77
                Event removed from top of queue.
78
79
        """
80 1
        event = self._queue.sync_q.get()
81 1
        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)
82
83 1
        return event
84
85 1
    async def aget(self):
86
        """Remove and return a event from top of queue.
87
88
        Returns:
89
            :class:`~kytos.core.events.KytosEvent`:
90
                Event removed from top of queue.
91
92
        """
93 1
        event = await self._queue.async_q.get()
94 1
        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)
95
96 1
        return event
97
98 1
    def task_done(self):
99
        """Indicate that a formerly enqueued task is complete.
100
101
        If a :func:`~kytos.core.buffers.KytosEventBuffer.join` is currently
102
        blocking, it will resume if all itens in KytosEventBuffer have been
103
        processed (meaning that a task_done() call was received for every item
104
        that had been put() into the KytosEventBuffer).
105
        """
106 1
        self._queue.sync_q.task_done()
107
108 1
    def join(self):
109
        """Block until all events are gotten and processed.
110
111
        A item is processed when the method task_done is called.
112
        """
113 1
        self._queue.sync_q.join()
114
115 1
    def qsize(self):
116
        """Return the size of KytosEventBuffer."""
117 1
        return self._queue.sync_q.qsize()
118
119 1
    def empty(self):
120
        """Return True if KytosEventBuffer is empty."""
121 1
        return self._queue.sync_q.empty()
122
123 1
    def full(self):
124
        """Return True if KytosEventBuffer is full of KytosEvent."""
125 1
        return self._queue.sync_q.full()
126
127
128 1
class KytosBuffers:
129
    """Set of KytosEventBuffer used in Kytos."""
130
131 1
    def __init__(self, loop=None):
132
        """Build four KytosEventBuffers.
133
134
        :attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
135
        received from connection events.
136
137
        :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
138
        received from network.
139
140
        :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
141
        events to be received.
142
143
        :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
144
        events to be sent.
145
146
        :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
147
        sent to NApps.
148
        """
149 1
        self._pool_max_workers = get_thread_pool_max_workers()
150 1
        self._loop = loop
151 1
        self.conn = KytosEventBuffer("conn", loop=self._loop)
152 1
        self.raw = KytosEventBuffer("raw", loop=self._loop,
153
                                    maxsize=self._get_maxsize("sb"))
154 1
        self.msg_in = KytosEventBuffer("msg_in", loop=self._loop,
155
                                       maxsize=self._get_maxsize("sb"),
156
                                       queue_cls=PriorityQueue)
157 1
        self.msg_out = KytosEventBuffer("msg_out", loop=self._loop,
158
                                        maxsize=self._get_maxsize("sb"),
159
                                        queue_cls=PriorityQueue)
160 1
        self.app = KytosEventBuffer("app", loop=self._loop,
161
                                    maxsize=self._get_maxsize("app"))
162
163 1
    def get_all_buffers(self):
164
        """Get all KytosEventBuffer instances."""
165 1
        return [
166
            event_buffer for event_buffer in self.__dict__.values()
167
            if isinstance(event_buffer, KytosEventBuffer)
168
        ]
169
170 1
    def _get_maxsize(self, queue_name):
171
        """Get queue maxsize if it's been set."""
172 1
        return self._pool_max_workers.get(queue_name, 0)
173
174 1
    def send_stop_signal(self):
175
        """Send a ``kytos/core.shutdown`` event to each buffer."""
176 1
        LOG.info('Stop signal received by Kytos buffers.')
177 1
        LOG.info('Sending KytosShutdownEvent to all apps.')
178 1
        event = KytosEvent(name='kytos/core.shutdown')
179 1
        for buffer in self.get_all_buffers():
180
            buffer.put(event)
181