1
|
|
|
"""Test kytos.core.buffers module.""" |
2
|
|
|
import asyncio |
3
|
|
|
from unittest import TestCase |
4
|
|
|
from unittest.mock import MagicMock, patch |
5
|
|
|
|
6
|
|
|
from kytos.core.buffers import KytosBuffers, KytosEventBuffer |
7
|
|
|
|
8
|
|
|
|
9
|
|
|
# pylint: disable=protected-access |
10
|
|
|
class TestKytosEventBuffer(TestCase):
    """Unit tests for KytosEventBuffer."""

    def setUp(self):
        """Create a private event loop and a KytosEventBuffer bound to it."""
        self.loop = asyncio.new_event_loop()
        # Close the loop after every test (even a failing one); otherwise
        # each test leaks a loop and Python reports it as an
        # "unclosed event loop" ResourceWarning.
        self.addCleanup(self.loop.close)
        asyncio.set_event_loop(self.loop)

        self.kytos_event_buffer = KytosEventBuffer('name', loop=self.loop)

    @staticmethod
    def create_event_mock(name='any'):
        """Return a MagicMock standing in for an event called ``name``."""
        event = MagicMock()
        event.name = name
        return event

    def test_put_get(self):
        """Test that get returns the event previously stored with put."""
        event = self.create_event_mock()

        self.kytos_event_buffer.put(event)
        queue_event = self.kytos_event_buffer.get()

        self.assertEqual(queue_event, event)

    def test_put__shutdown(self):
        """Test that putting a shutdown event flags the buffer to reject."""
        event = self.create_event_mock('kytos/core.shutdown')
        self.kytos_event_buffer.put(event)

        self.assertTrue(self.kytos_event_buffer._reject_new_events)

    def test_aput(self):
        """Test that aput of a shutdown event flags the buffer to reject."""
        # Same helper as the other tests, instead of a hand-built mock.
        event = self.create_event_mock('kytos/core.shutdown')

        self.loop.run_until_complete(self.kytos_event_buffer.aput(event))

        self.assertTrue(self.kytos_event_buffer._reject_new_events)

    def test_aget(self):
        """Test that aget returns an event placed on the underlying queue."""
        event = self.create_event_mock()
        # Bypass put() and feed the queue's sync side directly, so only
        # aget is exercised.
        self.kytos_event_buffer._queue.sync_q.put(event)

        result = self.loop.run_until_complete(self.kytos_event_buffer.aget())

        self.assertEqual(result, event)

    @patch('janus._SyncQueueProxy.task_done')
    def test_task_done(self, mock_task_done):
        """Test that task_done reaches the sync queue's task_done."""
        self.kytos_event_buffer.task_done()

        mock_task_done.assert_called()

    @patch('janus._SyncQueueProxy.join')
    def test_join(self, mock_join):
        """Test that join reaches the sync queue's join."""
        self.kytos_event_buffer.join()

        mock_join.assert_called()

    def test_qsize(self):
        """Test qsize with an empty queue and with one event in the queue."""
        qsize_1 = self.kytos_event_buffer.qsize()

        event = self.create_event_mock()
        self.kytos_event_buffer._queue.sync_q.put(event)

        qsize_2 = self.kytos_event_buffer.qsize()

        self.assertEqual(qsize_1, 0)
        self.assertEqual(qsize_2, 1)

    def test_empty(self):
        """Test empty with an empty queue and with one event in the queue."""
        empty_1 = self.kytos_event_buffer.empty()

        event = self.create_event_mock()
        self.kytos_event_buffer._queue.sync_q.put(event)

        empty_2 = self.kytos_event_buffer.empty()

        self.assertTrue(empty_1)
        self.assertFalse(empty_2)

    @patch('janus._SyncQueueProxy.full')
    def test_full(self, mock_full):
        """Test full when the queue reports not full and then full."""
        # The patched sync-queue method answers False first, then True.
        mock_full.side_effect = [False, True]

        full_1 = self.kytos_event_buffer.full()
        full_2 = self.kytos_event_buffer.full()

        self.assertFalse(full_1)
        self.assertTrue(full_2)
109
|
|
|
|
110
|
|
|
|
111
|
|
|
class TestKytosBuffers(TestCase):
    """Unit tests for KytosBuffers."""

    def setUp(self):
        """Create a private event loop and a KytosBuffers bound to it."""
        self.loop = asyncio.new_event_loop()
        # Close the loop after every test (even a failing one); otherwise
        # each test leaks a loop and Python reports it as an
        # "unclosed event loop" ResourceWarning.
        self.addCleanup(self.loop.close)
        # NOTE(review): the thread's current loop is cleared here, presumably
        # so KytosBuffers has to rely on the loop passed explicitly - confirm.
        asyncio.set_event_loop(None)

        self.kytos_buffers = KytosBuffers(loop=self.loop)

    def test_send_stop_signal(self):
        """Test that send_stop_signal flags the buffers to reject events.

        The raw, msg_in, msg_out and app buffers are each checked.
        """
        self.kytos_buffers.send_stop_signal()

        # One sub-test per buffer, so a failure names the offending buffer
        # and the remaining buffers are still checked.
        for buffer_name in ('raw', 'msg_in', 'msg_out', 'app'):
            with self.subTest(buffer=buffer_name):
                buffer = getattr(self.kytos_buffers, buffer_name)
                self.assertTrue(buffer._reject_new_events)
129
|
|
|
|