Passed
Pull Request — master (#429)
by Vinicius
04:55
created

kytos.core.buffers.manager.KytosBuffers.__init__()   A

Complexity

Conditions 2

Size

Total Lines 48
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 19
nop 1
dl 0
loc 48
rs 9.45
c 0
b 0
f 0
1
"""Kytos Buffer Classes, based on Python Queue."""
2
import logging
3
4
from janus import PriorityQueue, Queue
5
6
from kytos.core.config import KytosConfig
7
from kytos.core.events import KytosEvent
8
from kytos.core.helpers import get_thread_pool_max_workers
9
10
from .buffers import KytosEventBuffer
11
from .factory import buffer_from_config
12
13
LOG = logging.getLogger(__name__)
14
15
16
class KytosBuffers:
17
    """Set of KytosEventBuffer used in Kytos."""
18
19
    def __init__(self):
20
        """Build four KytosEventBuffers.
21
22
        :attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
23
        received from connection events.
24
25
        :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
26
        received from network.
27
28
        :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
29
        events to be received.
30
31
        :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
32
        events to be sent.
33
34
        :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
35
        sent to NApps.
36
37
        :attr:`meta`: :class:`~kytos.core.buffers.KytosEventBuffer` with
38
        core related events sent to NApps. This is meant for general core
39
        control events.
40
        """
41
42
        self._pool_max_workers = get_thread_pool_max_workers()
43
44
        self.conn = KytosEventBuffer("conn")
45
        self.raw = KytosEventBuffer(
46
            "raw",
47
            queue=Queue(maxsize=self._get_maxsize("sb"))
48
        )
49
        self.msg_in = KytosEventBuffer(
50
            "msg_in",
51
            queue=PriorityQueue(maxsize=self._get_maxsize("sb")),
52
        )
53
        self.msg_out = KytosEventBuffer(
54
            "msg_out",
55
            queue=PriorityQueue(maxsize=self._get_maxsize("sb")),
56
        )
57
        self.app = KytosEventBuffer(
58
            "app",
59
            queue=Queue(maxsize=self._get_maxsize("app")),
60
        )
61
        self.meta = KytosEventBuffer("meta")
62
63
        buffer_conf = KytosConfig().options['daemon'].event_buffer_conf
64
65
        for name, config in buffer_conf.items():
66
            setattr(self, name, buffer_from_config(name, config))
67
68
    def get_all_buffers(self):
69
        """Get all KytosEventBuffer instances."""
70
        return [
71
            event_buffer for event_buffer in self.__dict__.values()
72
            if isinstance(event_buffer, KytosEventBuffer)
73
        ]
74
75
    def _get_maxsize(self, queue_name):
76
        """Get queue maxsize if it's been set."""
77
        return self._pool_max_workers.get(queue_name, 0)
78
79
    def send_stop_signal(self):
80
        """Send a ``kytos/core.shutdown`` event to each buffer."""
81
        LOG.info('Stop signal received by Kytos buffers.')
82
        LOG.info('Sending KytosShutdownEvent to all apps.')
83
        event = KytosEvent(name='kytos/core.shutdown')
84
        for buffer in self.get_all_buffers():
85
            buffer.put(event)
86