Passed
Pull Request — master (#114)
by Aldo
04:12
created

TestTraceManagerTheadTest.setup_method()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 1
dl 0
loc 7
ccs 5
cts 5
cp 1
crap 1
rs 10
c 0
b 0
f 0
"""
    Test tracing.trace_manager
"""
4
5 1
import asyncio
6 1
import threading
7 1
from unittest.mock import AsyncMock, MagicMock, patch
8
9 1
import pytest
10 1
from janus import Queue
11 1
from kytos.lib.helpers import (
12
    get_controller_mock,
13
    get_interface_mock,
14
    get_link_mock,
15
    get_switch_mock,
16
)
17 1
from pyof.foundation.network_types import Ethernet
18 1
from napps.amlight.sdntrace import settings
19 1
from napps.amlight.sdntrace.shared.switches import Switches
20 1
from napps.amlight.sdntrace.tracing.trace_entries import TraceEntries
21 1
from napps.amlight.sdntrace.tracing.trace_manager import TraceManager
22
23
24
# pylint: disable=protected-access
25 1
class TestTraceManager:
    """Unit tests for tracing.trace_manager.TraceManager"""

    @staticmethod
    def _trace_request(switch=None, **trace_options):
        """Build the request body handed to ``TraceManager.is_entry_valid``.

        Args:
            switch: dict used as the ``switch`` section of the request.
                Defaults to dpid ``00:00:00:00:00:00:00:01`` / ``in_port`` 1.
            **trace_options: extra keys placed next to ``switch`` and ``eth``
                inside the ``trace`` section (e.g. ``timeout=0.1``).

        Returns:
            A new ``{"trace": {...}}`` dict; a fresh object on every call so
            tests may mutate it freely.
        """
        if switch is None:
            switch = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        trace = {"switch": switch, "eth": {"dl_vlan": 100}}
        trace.update(trace_options)
        return {"trace": trace}

    @staticmethod
    def _switch_color():
        """Return a fresh color dict used as the mocked switch color."""
        return {
            "color_field": "dl_src",
            "color_value": "ee:ee:ee:ee:ee:01",
        }

    @pytest.fixture(autouse=True)
    def commomn_patches(self, request):
        """Start the patches shared by every test and stop them afterwards.

        ``kytos.core.helpers.run_on_thread`` is replaced by an identity
        function for the duration of each test. The patch is stopped through
        a finalizer, so individual tests can still stack their own ``patch``
        decorators on top of it.

        Returns:
            The object installed by the patch (the identity function).
        """
        patcher = patch("kytos.core.helpers.run_on_thread", lambda x: x)
        mock_patch = patcher.start()
        request.addfinalizer(patcher.stop)
        return mock_patch

    def setup_method(self):
        """Set up before each test method"""
        self.create_basic_switches(get_controller_mock())
        # run_traces is replaced on the class before the manager is built.
        TraceManager.run_traces = MagicMock()
        self.trace_manager = TraceManager(controller=get_controller_mock())
        self.trace_manager._request_queue = AsyncMock()

    # NOTE(review): static analysis reported this helper as duplicated
    # elsewhere in the project; consider sharing a single implementation.
    @classmethod
    def create_basic_switches(cls, controller):
        """Create basic mock switches for Kytos controller.

        Two mocked switches (dpids ending in ``:01`` and ``:02``), each with
        one interface, are linked together, stored in
        ``controller.switches`` and also assigned to the ``_switches``
        attribute of ``Switches``.
        """
        dpid_a = "00:00:00:00:00:00:00:01"
        dpid_b = "00:00:00:00:00:00:00:02"

        mock_switch_a = get_switch_mock(dpid_a, 0x04)
        mock_switch_b = get_switch_mock(dpid_b, 0x04)
        mock_interface_a = get_interface_mock("s1-eth1", 1, mock_switch_a)
        mock_interface_b = get_interface_mock("s2-eth1", 1, mock_switch_b)

        mock_link = get_link_mock(mock_interface_a, mock_interface_b)
        mock_link.id = "cf0f4071be4"
        mock_switch_a.id = dpid_a
        mock_switch_a.as_dict.return_value = {"metadata": {}}
        mock_switch_b.id = dpid_b
        mock_switch_b.as_dict.return_value = {"metadata": {}}

        controller.switches = {dpid_a: mock_switch_a, dpid_b: mock_switch_b}

        Switches(MagicMock())._switches = controller.switches

    async def test_is_entry_invalid(self):
        """Test if the entry request does not have a valid switch."""
        entries = self._trace_request({"dpid": "a", "in_port": 1})
        entry = await self.trace_manager.is_entry_valid(entries)

        assert entry == "Unknown Switch"

    async def test_is_entry_empty_dpid(self):
        """Test that an entry request with an empty dpid is rejected."""
        entries = self._trace_request({"dpid": "", "in_port": 1})
        entry = await self.trace_manager.is_entry_valid(entries)

        assert entry == "Error: dpid allows [a-f], int, and :. Lengths: 1-16 and 23"

    async def test_is_entry_missing_dpid(self):
        """Test that an entry request without a dpid is rejected."""
        entries = self._trace_request({})
        entry = await self.trace_manager.is_entry_valid(entries)

        assert entry == "Error: dpid not provided"

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_is_entry_invalid_not_colored(self, mock_acolors):
        """Test if the entry request does not have a valid color."""
        mock_acolors.return_value = {}
        entries = self._trace_request()
        entry = await self.trace_manager.is_entry_valid(entries)
        assert entry == "Switch not Colored"

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_is_entry_valid(self, mock_acolors):
        """Test if the entry request is valid."""
        mock_acolors.return_value = self._switch_color()

        entries = self._trace_request()
        entry = await self.trace_manager.is_entry_valid(entries)
        assert entry.dpid == "00:00:00:00:00:00:00:01"

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_new_trace(self, mock_acolors):
        """Test trace manager new trace creation."""
        mock_acolors.return_value = self._switch_color()

        entries = self._trace_request()

        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30001

        # new_trace does not check duplicated request.
        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30002

    def test_get_id(self):
        """Test trace manager ID control."""
        # Ids are handed out sequentially, starting at 30001.
        for expected_id in (30001, 30002, 30003):
            assert self.trace_manager.get_id() == expected_id

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.send_trace_probe")
    async def test_trace_pending(self, mock_send_probe, mock_acolors):
        """Test that a queued trace request is reported as pending."""
        mock_acolors.return_value = self._switch_color()
        mock_send_probe.return_value = {
            "dpid": "00:00:00:00:00:00:00:01",
            "port": 1,
        }, ""

        entries = self._trace_request()

        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30001

        pending = self.trace_manager.number_pending_requests()
        assert pending == 1

        result = self.trace_manager.get_result(trace_id)
        assert result == {"msg": "trace pending"}

    def test_request_invalid_trace_id(self):
        """Test the result returned for an unknown trace id."""
        result = self.trace_manager.get_result("1234")
        assert result == {"msg": "unknown trace id"}

    def test_trace_in_process(self):
        """Test trace manager in process."""
        self.trace_manager._spawn_trace = MagicMock()
        trace_id = 30001
        self.trace_manager._running_traces[trace_id] = MagicMock()
        result = self.trace_manager.get_result(trace_id)
        assert result == {"msg": "trace in process"}

    # pylint: disable=unused-argument, too-many-locals
    @patch(
        "napps.amlight.sdntrace.tracing.trace_manager.TraceManager.is_tracing_running"
    )
    @patch("napps.amlight.sdntrace.shared.colors.Colors.get_switch_color")
    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.send_trace_probe")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.tracepath_loop")
    async def test_get_result(
        self,
        mock_trace_loop,
        mock_send_probe,
        mock_colors,
        mock_acolors,
        mock_is_running,
    ):
        """Test tracemanager tracing request and results.

        A request is queued, ``_run_traces`` is executed in a worker thread,
        and the test polls (up to 31 times, 0.1s apart, per phase) until the
        request leaves the pending state and then until it is no longer
        reported as in process.
        """
        colors = self._switch_color()
        mock_colors.return_value = colors
        mock_acolors.return_value = colors
        mock_send_probe.return_value = {
            "dpid": "00:00:00:00:00:00:00:01",
            "port": 1,
        }, ""
        mock_trace_loop.return_value = True

        entries = self._trace_request(timeout=0.1)

        self.trace_manager._request_queue = Queue()
        self.trace_manager._is_tracing_running = True
        # The mocked running flag lets the worker loop iterate exactly once.
        mock_is_running.side_effect = [True, False]
        trace_entries = await self.trace_manager.is_entry_valid(entries)
        trace_id = await self.trace_manager.new_trace(trace_entries)
        pending = self.trace_manager.number_pending_requests()

        trace_thread = threading.Thread(target=self.trace_manager._run_traces)
        trace_thread.start()
        try:
            # Waiting thread to start processing the request
            count = 0
            while pending == 1:
                result = self.trace_manager.get_result(trace_id)
                pending = self.trace_manager.number_pending_requests()
                await asyncio.sleep(0.1)
                count += 1
                if count > 30:
                    # pytest.fail raises, so nothing after it would run.
                    pytest.fail("Timeout waiting to start processing trace")

            count = 0
            result = self.trace_manager.get_result(trace_id)
            # Waiting thread to process the request
            while result.get("msg") == "trace in process":
                result = self.trace_manager.get_result(trace_id)
                await asyncio.sleep(0.1)
                count += 1
                if count > 30:
                    pytest.fail("Timeout waiting to process trace")
        finally:
            # Always stop and reap the worker, even when a timeout failure
            # is raised above; otherwise the thread outlives the test.
            self.trace_manager.stop_traces()
            trace_thread.join()

        assert result["request_id"] == 30001
        assert result["result"][0]["type"] == "starting"
        assert result["result"][0]["dpid"] == "00:00:00:00:00:00:00:01"
        assert result["result"][0]["port"] == 1
        assert result["result"][0]["time"] is not None
        assert result["start_time"] is not None
        assert result["total_time"] is not None
        assert result["request"]["trace"]["switch"]["dpid"] == "00:00:00:00:00:00:00:01"
        assert result["request"]["trace"]["switch"]["in_port"] == 1

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_duplicated_request(self, mock_colors):
        """Test that a request equal to a queued one is seen as duplicated."""
        mock_colors.return_value = self._switch_color()

        entries = self._trace_request()

        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30001

        duplicated = self.trace_manager.avoid_duplicated_request(trace_entries)
        assert duplicated is True

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_avoid_duplicated_request(self, mock_colors):
        """Test that a request for another dpid is not seen as duplicated."""
        mock_colors.return_value = self._switch_color()

        entries = self._trace_request()

        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30001

        # Same request, but now targeting the second switch.
        entries["trace"]["switch"]["dpid"] = "00:00:00:00:00:00:00:02"
        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        duplicated = self.trace_manager.avoid_duplicated_request(trace_entries)
        assert duplicated is False

    def test_limit_traces_reached(self):
        """Test trace manager limit for thread processing."""
        # filling the running traces array
        mocked_obj = MagicMock()
        for i in range(settings.PARALLEL_TRACES - 1):
            self.trace_manager._running_traces[i] = mocked_obj
            is_limit = self.trace_manager.limit_traces_reached()
            assert not is_limit

        # One more entry brings the count up to settings.PARALLEL_TRACES.
        self.trace_manager._running_traces[settings.PARALLEL_TRACES] = mocked_obj
        is_limit = self.trace_manager.limit_traces_reached()
        assert is_limit

    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.tracepath")
    def test_spawn_trace(self, mock_tracepath):
        """Test spawn trace."""
        trace_id = 0
        trace_entries = MagicMock()

        self.trace_manager._running_traces[0] = 9999

        self.trace_manager._spawn_trace(trace_id, trace_entries)
        self.trace_manager.add_result(trace_id, {"result": "ok"})
        mock_tracepath.assert_called_once()
        assert len(self.trace_manager._running_traces) == 0
361
362
363 1
class TestTraceManagerTheadTest:
    """Unit tests for TraceManager starting from a stopped manager.

    ``setup_method`` builds a TraceManager and immediately calls
    ``stop_traces()``; tests then drive ``_run_traces`` or
    ``queue_probe_packet`` explicitly.
    """

    def setup_method(self):
        """Set up before each test method"""
        # run_traces is replaced on the class before the manager is built.
        TraceManager.run_traces = MagicMock()
        self.trace_manager = TraceManager(controller=get_controller_mock())
        self.trace_manager.stop_traces()

        # Number of times run_trace_once has answered True.
        self.count_running_traces = 0

    def run_trace_once(self):
        """function to replace the <flag> in "while <flag>()" code
        to run the loop just once.

        Returns:
            True on the first call, False on every later call.
        """
        if self.count_running_traces == 0:
            self.count_running_traces += 1
            return True
        return False

    @patch("napps.amlight.sdntrace.shared.colors.Colors.get_switch_color")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.send_trace_probe")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.tracepath_loop")
    async def test_run_traces(self, mock_trace_loop, mock_send_probe, mock_colors):
        """Test tracemanager tracing request and results."""
        self.trace_manager._async_loop = asyncio.get_running_loop()
        mock_colors.return_value = {
            "color_field": "dl_src",
            "color_value": "ee:ee:ee:ee:ee:01",
        }
        mock_send_probe.return_value = {
            "dpid": "00:00:00:00:00:00:00:01",
            "port": 1,
        }, ""
        mock_trace_loop.return_value = True

        eth = {"dl_vlan": 100}
        dpid = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}
        # Keep the loaded entries: they are the request submitted below.
        trace_entries = TraceEntries()
        trace_entries.load_entries(entries)

        self.trace_manager._request_queue = Queue()
        _ = await self.trace_manager.new_trace(trace_entries)

        trace_thread = threading.Thread(target=self.trace_manager._run_traces)
        # The running flag is patched so the worker loop executes only once.
        with patch.object(
            TraceManager, "is_tracing_running", side_effect=self.run_trace_once
        ):
            trace_thread.start()
            trace_thread.join()

        assert self.trace_manager._request_queue.async_q.qsize() == 0
        assert self.trace_manager.number_pending_requests() == 0

    async def test_queue_probe_packet_error(self):
        """Test queue_probe_packet handle error.

        The probe is queued for request id 30001, yet ``put`` on that
        request's queue must not be called.
        """
        self.trace_manager._trace_pkt_in = {30001: MagicMock()}
        self.trace_manager._trace_pkt_in[30001].async_q.put = AsyncMock()
        # Raw Ethernet frame used as the probe packet.
        mock_msg = (
            b"\01\xca\xfe\xca\xfe\xca\xfe\xee\xee\xee\xee\xee\x01\x88\xa8\x00\x01"
            b"\x81\x00\x00\x01\x08\x00E\x00\x00{\x00\x00\x00\x00\xff\x00\xb7~\x01"
            b"\x01\x01\x01\x01\x01\x01\x02\x80\x04\x95\\\x00\x00\x00\x00\x00\x00"
            b"\x00\x8c(napps.amlight.sdntrace.tracing.trace_msg\x94\x8c\x08TraceMsg"
            b"\x94\x93\x94)\x81\x94}\x94(\x8c\x0b_request_id\x94M1u\x8c\x05_step"
            b"\x94K\x00ub."
        )
        eth = Ethernet()
        eth.unpack(mock_msg)
        await self.trace_manager.queue_probe_packet("event_mock", eth, 1, MagicMock())

        assert self.trace_manager._trace_pkt_in[30001].async_q.put.call_count == 0
434