Passed
Pull Request — master (#235)
by Vinicius
05:20
created

kytos.core.buffers.KytosEventBuffer.get()   A

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 1
dl 0
loc 14
rs 10
c 0
b 0
f 0
ccs 4
cts 5
cp 0.8
crap 2.032
1
"""Kytos Buffer Classes, based on Python Queue."""
2 1
import logging
3
4 1
from janus import PriorityQueue, Queue
5
6 1
from kytos.core.events import KytosEvent
7 1
from kytos.core.helpers import get_thread_pool_max_workers
8
9 1
__all__ = ('KytosBuffers', )
10
11 1
LOG = logging.getLogger(__name__)
12
13
14 1
class KytosEventBuffer:
15
    """KytosEventBuffer represents a queue to store a set of KytosEvents."""
16
17 1
    def __init__(self, name, event_base_class=None, loop=None, maxsize=0,
18
                 queue_cls=Queue):
19
        """Contructor of KytosEventBuffer receive the parameters below.
20
21
        Args:
22
            name (string): name of KytosEventBuffer.
23
            event_base_class (class): Class of KytosEvent.
24
            loop (class): asyncio event loop
25
            maxsize (int): maxsize _queue producer buffer
26
            queue_cls (class): queue class from janus
27
        """
28 1
        self.name = name
29 1
        self._event_base_class = event_base_class
30 1
        self._loop = loop
31 1
        self._queue = queue_cls(maxsize=maxsize, loop=self._loop)
32 1
        self._reject_new_events = False
33
34 1
    def put(self, event):
35
        """Insert an event in KytosEventBuffer if reject_new_events is False.
36
37
        Reject new events is True when a kytos/core.shutdown message was
38
        received.
39
40
        Args:
41
            event (:class:`~kytos.core.events.KytosEvent`):
42
                KytosEvent sent to queue.
43
        """
44 1
        if not self._reject_new_events:
45 1
            self._queue.sync_q.put(event)
46 1
            if logging.DEBUG >= LOG.getEffectiveLevel():
47
                LOG.debug('[buffer: %s] Added: %s', self.name, event.name)
48
49 1
        if event.name == "kytos/core.shutdown":
50 1
            LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
51
                     self.name)
52 1
            self._reject_new_events = True
53
54 1
    async def aput(self, event):
55
        """Insert a event in KytosEventBuffer if reject new events is False.
56
57
        Reject new events is True when a kytos/core.shutdown message was
58
        received.
59
60
        Args:
61
            event (:class:`~kytos.core.events.KytosEvent`):
62
                KytosEvent sent to queue.
63
        """
64 1
        if not self._reject_new_events:
65 1
            await self._queue.async_q.put(event)
66 1
            if logging.DEBUG >= LOG.getEffectiveLevel():
67
                LOG.debug('[buffer: %s] Added: %s', self.name, event.name)
68
69 1
        if event.name == "kytos/core.shutdown":
70 1
            LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
71
                     self.name)
72 1
            self._reject_new_events = True
73
74 1
    def get(self):
75
        """Remove and return a event from top of queue.
76
77
        Returns:
78
            :class:`~kytos.core.events.KytosEvent`:
79
                Event removed from top of queue.
80
81
        """
82 1
        event = self._queue.sync_q.get()
83
84 1
        if logging.DEBUG >= LOG.getEffectiveLevel():
85
            LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)
86
87 1
        return event
88
89 1
    async def aget(self):
90
        """Remove and return a event from top of queue.
91
92
        Returns:
93
            :class:`~kytos.core.events.KytosEvent`:
94
                Event removed from top of queue.
95
96
        """
97 1
        event = await self._queue.async_q.get()
98
99 1
        if logging.DEBUG >= LOG.getEffectiveLevel():
100
            LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)
101
102 1
        return event
103
104 1
    def task_done(self):
105
        """Indicate that a formerly enqueued task is complete.
106
107
        If a :func:`~kytos.core.buffers.KytosEventBuffer.join` is currently
108
        blocking, it will resume if all itens in KytosEventBuffer have been
109
        processed (meaning that a task_done() call was received for every item
110
        that had been put() into the KytosEventBuffer).
111
        """
112 1
        self._queue.sync_q.task_done()
113
114 1
    def join(self):
115
        """Block until all events are gotten and processed.
116
117
        A item is processed when the method task_done is called.
118
        """
119 1
        self._queue.sync_q.join()
120
121 1
    def qsize(self):
122
        """Return the size of KytosEventBuffer."""
123 1
        return self._queue.sync_q.qsize()
124
125 1
    def empty(self):
126
        """Return True if KytosEventBuffer is empty."""
127 1
        return self._queue.sync_q.empty()
128
129 1
    def full(self):
130
        """Return True if KytosEventBuffer is full of KytosEvent."""
131 1
        return self._queue.sync_q.full()
132
133
134 1
class KytosBuffers:
135
    """Set of KytosEventBuffer used in Kytos."""
136
137 1
    def __init__(self, loop=None):
138
        """Build four KytosEventBuffers.
139
140
        :attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
141
        received from connection events.
142
143
        :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
144
        received from network.
145
146
        :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
147
        events to be received.
148
149
        :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
150
        events to be sent.
151
152
        :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
153
        sent to NApps.
154
        """
155 1
        self._pool_max_workers = get_thread_pool_max_workers()
156 1
        self._loop = loop
157 1
        self.conn = KytosEventBuffer("conn", loop=self._loop)
158 1
        self.raw = KytosEventBuffer("raw", loop=self._loop,
159
                                    maxsize=self._get_maxsize("sb"))
160 1
        self.msg_in = KytosEventBuffer("msg_in", loop=self._loop,
161
                                       maxsize=self._get_maxsize("sb"),
162
                                       queue_cls=PriorityQueue)
163 1
        self.msg_out = KytosEventBuffer("msg_out", loop=self._loop,
164
                                        maxsize=self._get_maxsize("sb"),
165
                                        queue_cls=PriorityQueue)
166 1
        self.app = KytosEventBuffer("app", loop=self._loop,
167
                                    maxsize=self._get_maxsize("app"))
168
169 1
    def get_all_buffers(self):
170
        """Get all KytosEventBuffer instances."""
171 1
        return [
172
            event_buffer for event_buffer in self.__dict__.values()
173
            if isinstance(event_buffer, KytosEventBuffer)
174
        ]
175
176 1
    def _get_maxsize(self, queue_name):
177
        """Get queue maxsize if it's been set."""
178 1
        return self._pool_max_workers.get(queue_name, 0)
179
180 1
    def send_stop_signal(self):
181
        """Send a ``kytos/core.shutdown`` event to each buffer."""
182 1
        LOG.info('Stop signal received by Kytos buffers.')
183 1
        LOG.info('Sending KytosShutdownEvent to all apps.')
184 1
        event = KytosEvent(name='kytos/core.shutdown')
185 1
        for buffer in self.get_all_buffers():
186
            buffer.put(event)
187