kytos.core.buffers   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 164
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 62
dl 0
loc 164
rs 10
c 0
b 0
f 0
ccs 59
cts 59
cp 1
wmc 16

12 Methods

Rating   Name   Duplication   Size   Complexity  
A KytosBuffers.__init__() 0 20 1
A KytosEventBuffer.__init__() 0 12 1
A KytosEventBuffer.task_done() 0 9 1
A KytosBuffers.send_stop_signal() 0 9 1
A KytosEventBuffer.put() 0 18 3
A KytosEventBuffer.qsize() 0 3 1
A KytosEventBuffer.full() 0 3 1
A KytosEventBuffer.get() 0 13 1
A KytosEventBuffer.join() 0 6 1
A KytosEventBuffer.empty() 0 3 1
A KytosEventBuffer.aget() 0 13 1
A KytosEventBuffer.aput() 0 23 3
1
"""Kytos Buffer Classes, based on Python Queue."""
2 1
import logging
3
4
# from queue import Queue
5 1
from janus import Queue
6
7 1
from kytos.core.events import KytosEvent
8
9 1
__all__ = ('KytosBuffers', )
10
11 1
LOG = logging.getLogger(__name__)
12
13
14 1
class KytosEventBuffer:
    """Queue-backed buffer that stores KytosEvents.

    The buffer wraps a ``janus`` queue and exposes both a blocking interface
    (:meth:`put` / :meth:`get`) and a coroutine interface
    (:meth:`aput` / :meth:`aget`) over the same underlying queue.
    """

    def __init__(self, name, event_base_class=None, loop=None):
        """Create an empty KytosEventBuffer.

        Args:
            name (string): name of KytosEventBuffer, used in log messages.
            event_base_class (class): Class of KytosEvent. It is only stored
                on the instance; this class does not read it back.
            loop: Event loop handed to the underlying ``janus`` queue. When
                ``None``, the queue is created without an explicit loop.
        """
        self.name = name
        self._event_base_class = event_base_class
        self._loop = loop
        # Only forward ``loop`` when the caller supplied one, so the default
        # path does not depend on the queue accepting a ``loop`` keyword.
        # NOTE(review): assumes janus treats ``loop=None`` the same as
        # omitting the argument -- confirm against the pinned janus version.
        self._queue = Queue() if loop is None else Queue(loop=loop)
        self._reject_new_events = False

    def _enable_stop_mode_on_shutdown(self, event):
        """Start rejecting new events if ``event`` is the shutdown event."""
        if event.name == "kytos/core.shutdown":
            LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
                     self.name)
            self._reject_new_events = True

    def put(self, event):
        """Insert an event in KytosEventBuffer if reject_new_events is False.

        Reject new events is True when a kytos/core.shutdown message was
        received. The shutdown event itself is still enqueued if it arrives
        while the buffer is accepting events; every event after it is
        dropped.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                KytosEvent sent to queue.
        """
        if not self._reject_new_events:
            self._queue.sync_q.put(event)
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)

        self._enable_stop_mode_on_shutdown(event)

    async def aput(self, event):
        """Insert an event in KytosEventBuffer if reject_new_events is False.

        Coroutine counterpart of :meth:`put`, using the asynchronous side of
        the queue. Reject new events is True when a kytos/core.shutdown
        message was received.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                KytosEvent sent to queue.
        """
        if not self._reject_new_events:
            await self._queue.async_q.put(event)
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)

        self._enable_stop_mode_on_shutdown(event)

    def get(self):
        """Remove and return an event from the top of the queue.

        Returns:
            :class:`~kytos.core.events.KytosEvent`:
                Event removed from top of queue.

        """
        event = self._queue.sync_q.get()

        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)

        return event

    async def aget(self):
        """Remove and return an event from the top of the queue.

        Coroutine counterpart of :meth:`get`, using the asynchronous side of
        the queue.

        Returns:
            :class:`~kytos.core.events.KytosEvent`:
                Event removed from top of queue.

        """
        event = await self._queue.async_q.get()

        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)

        return event

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        If a :func:`~kytos.core.buffers.KytosEventBuffer.join` is currently
        blocking, it will resume if all items in KytosEventBuffer have been
        processed (meaning that a task_done() call was received for every item
        that had been put() into the KytosEventBuffer).
        """
        self._queue.sync_q.task_done()

    def join(self):
        """Block until all events are gotten and processed.

        An item is processed when the method task_done is called.
        """
        self._queue.sync_q.join()

    def qsize(self):
        """Return the size of KytosEventBuffer."""
        return self._queue.sync_q.qsize()

    def empty(self):
        """Return True if KytosEventBuffer is empty."""
        return self._queue.sync_q.empty()

    def full(self):
        """Return True if KytosEventBuffer is full of KytosEvent."""
        return self._queue.sync_q.full()
129
130
131 1
class KytosBuffers:
    """Container for the KytosEventBuffers used in Kytos."""

    def __init__(self, loop=None):
        """Create the four event buffers, all sharing the same ``loop``.

        :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        received from network.

        :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
        events to be received.

        :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
        events to be sent.

        :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
        sent to NApps.

        Args:
            loop: Value forwarded as ``loop`` to every KytosEventBuffer.
        """
        self._loop = loop
        self.raw = KytosEventBuffer('raw_event', loop=loop)
        self.msg_in = KytosEventBuffer('msg_in_event', loop=loop)
        self.msg_out = KytosEventBuffer('msg_out_event', loop=loop)
        self.app = KytosEventBuffer('app_event', loop=loop)

    def send_stop_signal(self):
        """Send a ``kytos/core.shutdown`` event to each buffer.

        The same event instance is put, in order, into :attr:`raw`,
        :attr:`msg_in`, :attr:`msg_out` and :attr:`app`.
        """
        LOG.info('Stop signal received by Kytos buffers.')
        LOG.info('Sending KytosShutdownEvent to all apps.')
        shutdown_event = KytosEvent(name='kytos/core.shutdown')
        for event_buffer in (self.raw, self.msg_in, self.msg_out, self.app):
            event_buffer.put(shutdown_event)
164