Passed
Pull Request — master (#429)
by Vinicius
04:55
created

KytosEventBuffer.qsize()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
"""Kytos Buffer Classes, based on Python Queue."""
2
import logging
3
from typing import Optional
4
5
from janus import Queue
6
7
LOG = logging.getLogger(__name__)
8
9
10
class KytosEventBuffer:
11
    """KytosEventBuffer represents a queue to store a set of KytosEvents."""
12
13
    def __init__(self, name, queue: Queue = None):
14
        """Contructor of KytosEventBuffer receive the parameters below.
15
16
        Args:
17
            name (string): name of KytosEventBuffer.
18
            event_base_class (class): Class of KytosEvent.
19
            maxsize (int): maxsize _queue producer buffer
20
            queue_cls (class): queue class from janus
21
        """
22
        self.name = name
23
        self._queue = queue if queue is not None else Queue()
24
        self._reject_new_events = False
25
26
    def put(self, event, timeout: Optional[float] = None):
27
        """Insert an event in KytosEventBuffer if reject_new_events is False.
28
29
        Reject new events is True when a kytos/core.shutdown message was
30
        received.
31
32
        Args:
33
            event (:class:`~kytos.core.events.KytosEvent`):
34
                KytosEvent sent to queue.
35
            timeout: Block if necessary until a free slot is available.
36
            If 'timeout' is a non-negative number, it blocks at most 'timeout'
37
            seconds and raises an Full exception if no free slot was available.
38
        """
39
        if not self._reject_new_events:
40
            self._queue.sync_q.put(event, timeout=timeout)
41
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)
42
43
        if event.name == "kytos/core.shutdown":
44
            LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
45
                     self.name)
46
            self._reject_new_events = True
47
48
    async def aput(self, event):
49
        """Insert a event in KytosEventBuffer if reject new events is False.
50
51
        Reject new events is True when a kytos/core.shutdown message was
52
        received.
53
54
        Args:
55
            event (:class:`~kytos.core.events.KytosEvent`):
56
                KytosEvent sent to queue.
57
        """
58
        if not self._reject_new_events:
59
            await self._queue.async_q.put(event)
60
            LOG.debug('[buffer: %s] Added: %s', self.name, event.name)
61
62
        if event.name == "kytos/core.shutdown":
63
            LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.',
64
                     self.name)
65
            self._reject_new_events = True
66
67
    def get(self):
68
        """Remove and return a event from top of queue.
69
70
        Returns:
71
            :class:`~kytos.core.events.KytosEvent`:
72
                Event removed from top of queue.
73
74
        """
75
        event = self._queue.sync_q.get()
76
        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)
77
78
        return event
79
80
    async def aget(self):
81
        """Remove and return a event from top of queue.
82
83
        Returns:
84
            :class:`~kytos.core.events.KytosEvent`:
85
                Event removed from top of queue.
86
87
        """
88
        event = await self._queue.async_q.get()
89
        LOG.debug('[buffer: %s] Removed: %s', self.name, event.name)
90
91
        return event
92
93
    def task_done(self):
94
        """Indicate that a formerly enqueued task is complete.
95
96
        If a :func:`~kytos.core.buffers.KytosEventBuffer.join` is currently
97
        blocking, it will resume if all itens in KytosEventBuffer have been
98
        processed (meaning that a task_done() call was received for every item
99
        that had been put() into the KytosEventBuffer).
100
        """
101
        self._queue.sync_q.task_done()
102
103
    def join(self):
104
        """Block until all events are gotten and processed.
105
106
        A item is processed when the method task_done is called.
107
        """
108
        self._queue.sync_q.join()
109
110
    def qsize(self):
111
        """Return the size of KytosEventBuffer."""
112
        return self._queue.sync_q.qsize()
113
114
    def empty(self):
115
        """Return True if KytosEventBuffer is empty."""
116
        return self._queue.sync_q.empty()
117
118
    def full(self):
119
        """Return True if KytosEventBuffer is full of KytosEvent."""
120
        return self._queue.sync_q.full()
121