Passed
Pull Request — master (#114)
by Aldo
04:31
created

TestTraceManagerTheadTest.commomn_patches()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 2
dl 0
loc 13
ccs 9
cts 9
cp 1
crap 2
rs 9.95
c 0
b 0
f 0
1
"""
2
    Test tracing.trace_manager
3
"""
4
5 1
import asyncio
6 1
import threading
7 1
from unittest.mock import AsyncMock, MagicMock, patch
8
9 1
import pytest
10 1
from janus import Queue
11 1
from kytos.lib.helpers import (
12
    get_controller_mock,
13
    get_interface_mock,
14
    get_link_mock,
15
    get_switch_mock,
16
)
17 1
from napps.amlight.sdntrace import settings
18 1
from napps.amlight.sdntrace.shared.switches import Switches
19 1
from napps.amlight.sdntrace.tracing.trace_entries import TraceEntries
20 1
from napps.amlight.sdntrace.tracing.trace_manager import TraceManager
21
22
23
# pylint: disable=protected-access
24 1
class TestTraceManager:
    """Unit tests for tracing.trace_manager.TraceManager"""

    @pytest.fixture(autouse=True)
    def commomn_patches(self, request):
        """Start the patches shared by every test and undo them afterwards.

        ``kytos.core.helpers.run_on_thread`` is replaced with an identity
        function. The patch is stopped by a finalizer registered through
        ``request.addfinalizer``, so cleanup is performed after each test and
        individual tests can still stack their own ``patch`` decorators.

        Returns the object installed by the patch.
        """
        patcher = patch("kytos.core.helpers.run_on_thread", lambda x: x)
        mock_patch = patcher.start()
        request.addfinalizer(patcher.stop)
        return mock_patch

    def setup_method(self):
        """Set up before each test method"""
        self.create_basic_switches(get_controller_mock())
        # NOTE(review): run_traces is replaced on the class (not the instance)
        # before TraceManager is instantiated and is never restored; presumably
        # this keeps the constructor from starting the real loop - confirm.
        TraceManager.run_traces = MagicMock()
        self.trace_manager = TraceManager(controller=get_controller_mock())
        self.trace_manager._request_queue = AsyncMock()

    @classmethod
    def create_basic_switches(cls, controller):
        """Create basic mock switches for Kytos controller.

        Two mock switches, one interface each and a link between them are
        built; the switches are stored on ``controller.switches`` and on the
        ``Switches`` helper.
        """
        dpid_a = "00:00:00:00:00:00:00:01"
        dpid_b = "00:00:00:00:00:00:00:02"

        mock_switch_a = get_switch_mock(dpid_a, 0x04)
        mock_switch_b = get_switch_mock(dpid_b, 0x04)
        mock_interface_a = get_interface_mock("s1-eth1", 1, mock_switch_a)
        mock_interface_b = get_interface_mock("s2-eth1", 1, mock_switch_b)

        mock_link = get_link_mock(mock_interface_a, mock_interface_b)
        mock_link.id = "cf0f4071be4"
        mock_switch_a.id = dpid_a
        mock_switch_a.as_dict.return_value = {"metadata": {}}
        mock_switch_b.id = dpid_b
        mock_switch_b.as_dict.return_value = {"metadata": {}}

        controller.switches = {dpid_a: mock_switch_a, dpid_b: mock_switch_b}

        Switches(MagicMock())._switches = controller.switches

    async def test_is_entry_invalid(self):
        """Test if the entry request does not have a valid switch."""
        eth = {"dl_vlan": 100}
        dpid = {"dpid": "a", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}
        entry = await self.trace_manager.is_entry_valid(entries)

        assert entry == "Unknown Switch"

    async def test_is_entry_empty_dpid(self):
        """Test if an empty dpid is rejected with the dpid format error."""
        eth = {"dl_vlan": 100}
        dpid = {"dpid": "", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}
        entry = await self.trace_manager.is_entry_valid(entries)

        assert entry == "Error: dpid allows [a-f], int, and :. Lengths: 1-16 and 23"

    async def test_is_entry_missing_dpid(self):
        """Test the entry request with missing dpid."""
        eth = {"dl_vlan": 100}
        dpid = {}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}
        entry = await self.trace_manager.is_entry_valid(entries)

        assert entry == "Error: dpid not provided"

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_is_entry_invalid_not_colored(self, mock_acolors):
        """Test if the entry request does not have a valid color."""
        mock_acolors.return_value = {}
        eth = {"dl_vlan": 100}
        dpid = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}
        entry = await self.trace_manager.is_entry_valid(entries)
        assert entry == "Switch not Colored"

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_is_entry_valid(self, mock_acolors):
        """Test if the entry request is valid."""
        mock_acolors.return_value = {
            "color_field": "dl_src",
            "color_value": "ee:ee:ee:ee:ee:01",
        }

        eth = {"dl_vlan": 100}
        dpid = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}
        entry = await self.trace_manager.is_entry_valid(entries)
        assert entry.dpid == "00:00:00:00:00:00:00:01"

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_new_trace(self, mock_acolors):
        """Test trace manager new trace creation."""
        mock_acolors.return_value = {
            "color_field": "dl_src",
            "color_value": "ee:ee:ee:ee:ee:01",
        }

        eth = {"dl_vlan": 100}
        dpid = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}

        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30001

        # new_trace does not check duplicated request.
        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30002

    def test_get_id(self):
        """Test trace manager ID control."""
        trace_id = self.trace_manager.get_id()
        assert trace_id == 30001

        trace_id = self.trace_manager.get_id()
        assert trace_id == 30002

        trace_id = self.trace_manager.get_id()
        assert trace_id == 30003

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.send_trace_probe")
    async def test_trace_pending(self, mock_send_probe, mock_acolors):
        """Test that a queued trace request is reported as pending."""
        mock_acolors.return_value = {
            "color_field": "dl_src",
            "color_value": "ee:ee:ee:ee:ee:01",
        }
        mock_send_probe.return_value = {
            "dpid": "00:00:00:00:00:00:00:01",
            "port": 1,
        }, ""

        eth = {"dl_vlan": 100}
        dpid = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}

        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30001

        pending = self.trace_manager.number_pending_requests()
        assert pending == 1

        result = self.trace_manager.get_result(trace_id)
        assert result == {"msg": "trace pending"}

    def test_request_invalid_trace_id(self):
        """Test the result returned for an unknown trace id."""
        result = self.trace_manager.get_result("1234")
        assert result == {"msg": "unknown trace id"}

    def test_trace_in_process(self):
        """Test trace manager in process."""
        self.trace_manager._spawn_trace = MagicMock()
        trace_id = 30001
        self.trace_manager._running_traces[trace_id] = MagicMock()
        result = self.trace_manager.get_result(trace_id)
        assert result == {"msg": "trace in process"}

    # pylint: disable=unused-argument, too-many-locals
    @patch(
        "napps.amlight.sdntrace.tracing.trace_manager.TraceManager.is_tracing_running"
    )
    @patch("napps.amlight.sdntrace.shared.colors.Colors.get_switch_color")
    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.send_trace_probe")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.tracepath_loop")
    async def test_get_result(
        self,
        mock_trace_loop,
        mock_send_probe,
        mock_colors,
        mock_acolors,
        mock_is_running,
    ):
        """Test tracemanager tracing request and results.

        A request is queued, ``_run_traces`` is run on a real thread and the
        test polls until the request leaves the pending state and a final
        result is available.
        """

        colors = {
            "color_field": "dl_src",
            "color_value": "ee:ee:ee:ee:ee:01",
        }
        mock_colors.return_value = colors
        mock_acolors.return_value = colors
        mock_send_probe.return_value = {
            "dpid": "00:00:00:00:00:00:00:01",
            "port": 1,
        }, ""
        mock_trace_loop.return_value = True

        eth = {"dl_vlan": 100}
        dpid = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        switch = {"switch": dpid, "eth": eth, "timeout": 0.1}
        entries = {"trace": switch}

        # Swap the AsyncMock installed by setup_method for a real janus queue.
        self.trace_manager._request_queue = Queue()
        self.trace_manager._is_tracing_running = True
        mock_is_running.side_effect = [True, False]
        trace_entries = await self.trace_manager.is_entry_valid(entries)
        trace_id = await self.trace_manager.new_trace(trace_entries)
        pending = self.trace_manager.number_pending_requests()

        trace_thread = threading.Thread(target=self.trace_manager._run_traces)
        trace_thread.start()
        # The finally clause guarantees the worker thread is stopped and
        # joined even when one of the timeouts below fails the test.
        try:
            # Waiting thread to start processing the request
            count = 0
            while pending == 1:
                result = self.trace_manager.get_result(trace_id)
                pending = self.trace_manager.number_pending_requests()
                await asyncio.sleep(0.1)
                count += 1
                if count > 30:
                    # pytest.fail raises, so nothing after it would run.
                    pytest.fail("Timeout waiting to start processing trace")

            count = 0
            result = self.trace_manager.get_result(trace_id)
            # Waiting thread to process the request
            while "msg" in result and result["msg"] == "trace in process":
                result = self.trace_manager.get_result(trace_id)
                await asyncio.sleep(0.1)
                count += 1
                if count > 30:
                    pytest.fail("Timeout waiting to process trace")
        finally:
            self.trace_manager.stop_traces()
            trace_thread.join()

        assert result["request_id"] == 30001
        assert result["result"][0]["type"] == "starting"
        assert result["result"][0]["dpid"] == "00:00:00:00:00:00:00:01"
        assert result["result"][0]["port"] == 1
        assert result["result"][0]["time"] is not None
        assert result["start_time"] is not None
        assert result["total_time"] is not None
        assert result["request"]["trace"]["switch"]["dpid"] == "00:00:00:00:00:00:00:01"
        assert result["request"]["trace"]["switch"]["in_port"] == 1

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_duplicated_request(self, mock_colors):
        """Test that an already queued request is flagged as duplicated."""
        mock_colors.return_value = {
            "color_field": "dl_src",
            "color_value": "ee:ee:ee:ee:ee:01",
        }

        eth = {"dl_vlan": 100}
        dpid = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}

        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30001

        duplicated = self.trace_manager.avoid_duplicated_request(trace_entries)
        assert duplicated is True

    @patch("napps.amlight.sdntrace.shared.colors.Colors.aget_switch_color")
    async def test_avoid_duplicated_request(self, mock_colors):
        """Test that a request for another switch is not flagged as duplicated."""
        mock_colors.return_value = {
            "color_field": "dl_src",
            "color_value": "ee:ee:ee:ee:ee:01",
        }

        eth = {"dl_vlan": 100}
        dpid = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}

        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        trace_id = await self.trace_manager.new_trace(trace_entries)
        assert trace_id == 30001

        # Same request, but aimed at the second switch.
        entries["trace"]["switch"]["dpid"] = "00:00:00:00:00:00:00:02"
        trace_entries = await self.trace_manager.is_entry_valid(entries)
        assert isinstance(trace_entries, TraceEntries)

        duplicated = self.trace_manager.avoid_duplicated_request(trace_entries)
        assert duplicated is False

    def test_limit_traces_reached(self):
        """Test trace manager limit for thread processing."""
        # filling the running traces array: with up to PARALLEL_TRACES - 1
        # entries the limit must not be reported.
        mocked_obj = MagicMock()
        for i in range(settings.PARALLEL_TRACES - 1):
            self.trace_manager._running_traces[i] = mocked_obj
            is_limit = self.trace_manager.limit_traces_reached()
            assert not is_limit

        # One more entry brings the total to PARALLEL_TRACES.
        self.trace_manager._running_traces[settings.PARALLEL_TRACES] = mocked_obj
        is_limit = self.trace_manager.limit_traces_reached()
        assert is_limit

    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.tracepath")
    def test_spawn_trace(self, mock_tracepath):
        """Test spawn trace.

        After the trace is spawned and its result is added, the entry must be
        gone from the running traces.
        """
        trace_id = 0
        trace_entries = MagicMock()

        # Placeholder entry for the trace id used below.
        self.trace_manager._running_traces[0] = 9999

        self.trace_manager._spawn_trace(trace_id, trace_entries)
        self.trace_manager.add_result(trace_id, {"result": "ok"})
        mock_tracepath.assert_called_once()
        assert len(self.trace_manager._running_traces) == 0
360
361
362 1
class TestTraceManagerTheadTest:
    """Test TraceManager._run_traces running on a real thread."""

    def setup_method(self):
        """Set up before each test method"""
        # NOTE(review): run_traces is replaced on the class before the
        # instance is created and is never restored - confirm this is intended.
        TraceManager.run_traces = MagicMock()
        self.trace_manager = TraceManager(controller=get_controller_mock())
        self.trace_manager.stop_traces()

        # Number of times run_trace_once has answered True.
        self.count_running_traces = 0

    def run_trace_once(self):
        """function to replace the <flag> in "while <flag>()" code
        to run the loop just once.

        Returns True on the first call and False on every later call.
        """
        if self.count_running_traces == 0:
            self.count_running_traces += 1
            return True
        return False

    @patch("napps.amlight.sdntrace.shared.colors.Colors.get_switch_color")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.send_trace_probe")
    @patch("napps.amlight.sdntrace.tracing.tracer.TracePath.tracepath_loop")
    async def test_run_traces(self, mock_trace_loop, mock_send_probe, mock_colors):
        """Test tracemanager tracing request and results.

        One request is queued and ``_run_traces`` is run on a thread for a
        single loop iteration; afterwards the request queue must be empty.
        """
        self.trace_manager._async_loop = asyncio.get_running_loop()
        mock_colors.return_value = {
            "color_field": "dl_src",
            "color_value": "ee:ee:ee:ee:ee:01",
        }
        mock_send_probe.return_value = {
            "dpid": "00:00:00:00:00:00:00:01",
            "port": 1,
        }, ""
        mock_trace_loop.return_value = True

        eth = {"dl_vlan": 100}
        dpid = {"dpid": "00:00:00:00:00:00:00:01", "in_port": 1}
        switch = {"switch": dpid, "eth": eth}
        entries = {"trace": switch}

        # Keep the loaded entries: they are the request being queued. The
        # queue is a separate object and must not be passed as the request.
        trace_entries = TraceEntries()
        trace_entries.load_entries(entries)

        self.trace_manager._request_queue = Queue()
        await self.trace_manager.new_trace(trace_entries)

        trace_thread = threading.Thread(target=self.trace_manager._run_traces)
        # is_tracing_running answers True once, so the loop body runs one time.
        with patch.object(
            TraceManager, "is_tracing_running", side_effect=self.run_trace_once
        ):
            trace_thread.start()
            trace_thread.join()

        assert self.trace_manager._request_queue.async_q.qsize() == 0
        assert self.trace_manager.number_pending_requests() == 0
415