Test Failed
Pull Request — master (#101)
by Vinicius
07:51 queued 03:18
created

TestLoopManager.test_process_if_looped()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 11
nop 1
dl 0
loc 12
ccs 8
cts 8
cp 1
crap 1
rs 9.85
c 0
b 0
f 0
1
"""Test LoopManager methods."""
2 1
from datetime import timedelta
3 1
from unittest.mock import AsyncMock, MagicMock, patch
4 1
5
import pytest
6 1
from httpx import Response
7
8 1
from kytos.lib.helpers import get_interface_mock, get_switch_mock
9 1
10
from kytos.core.helpers import now
11
from napps.kytos.of_lldp.managers.loop_manager import LoopManager
12 1
13
14 1
class TestLoopManager:
    """Tests for LoopManager.

    Every test runs against a fresh ``LoopManager`` built on a ``MagicMock``
    controller (see ``setup_method``); switches and interfaces are the mocks
    produced by ``get_switch_mock`` / ``get_interface_mock``.

    NOTE(review): the ``async def`` tests carry no explicit asyncio marker,
    so they presumably rely on the project's pytest async configuration --
    confirm.
    """

    def setup_method(self):
        """Execute steps before each test."""
        controller = MagicMock()
        self.loop_manager = LoopManager(controller)

    async def test_process_if_looped(self):
        """Test process_if_looped.

        With two ports of the same switch and no ignored loops, the call
        must return a truthy value and invoke both ``publish_loop_actions``
        and ``apublish_loop_state`` exactly once.
        """
        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        intf_a = get_interface_mock("s1-eth1", 1, switch)
        intf_b = get_interface_mock("s1-eth2", 2, switch)
        self.loop_manager.ignored_loops = {}
        # Replace the publishers so only the dispatching logic is exercised.
        self.loop_manager.publish_loop_actions = AsyncMock()
        self.loop_manager.apublish_loop_state = AsyncMock()
        assert await self.loop_manager.process_if_looped(intf_a, intf_b)
        assert self.loop_manager.publish_loop_actions.call_count == 1
        assert self.loop_manager.apublish_loop_state.call_count == 1

    async def test_publish_loop_state(self):
        """Test apublish_loop_state.

        Publishing a state must put exactly one item on the controller's
        app buffer.
        """
        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        intf_a = get_interface_mock("s1-eth1", 1, switch)
        intf_b = get_interface_mock("s1-eth2", 2, switch)
        state = "detected"
        self.loop_manager.controller.buffers.app.aput = AsyncMock()
        await self.loop_manager.apublish_loop_state(intf_a, intf_b, state)
        assert self.loop_manager.controller.buffers.app.aput.call_count == 1

    async def test_publish_loop_actions(self):
        """Test publish_loop_actions.

        The app buffer must receive one item per distinct entry in
        ``loop_manager.actions``.
        """
        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        intf_a = get_interface_mock("s1-eth1", 1, switch)
        intf_b = get_interface_mock("s1-eth2", 2, switch)
        self.loop_manager.controller.buffers.app.aput = AsyncMock()
        await self.loop_manager.publish_loop_actions(intf_a, intf_b)
        # set() because duplicated actions are expected to be published once.
        assert self.loop_manager.controller.buffers.app.aput.call_count == len(
            set(self.loop_manager.actions)
        )

    # The expectation table below is True only when both ends are on the
    # same switch and port_a <= port_b (a port paired with itself included).
    @pytest.mark.parametrize("dpid_a,port_a,dpid_b,port_b,expected", [
        ("00:00:00:00:00:00:00:01", 6, "00:00:00:00:00:00:00:01", 7, True),
        ("00:00:00:00:00:00:00:01", 1, "00:00:00:00:00:00:00:01", 2, True),
        ("00:00:00:00:00:00:00:01", 7, "00:00:00:00:00:00:00:01", 7, True),
        ("00:00:00:00:00:00:00:01", 8, "00:00:00:00:00:00:00:01", 1, False),
        ("00:00:00:00:00:00:00:01", 1, "00:00:00:00:00:00:00:02", 2, False),
        ("00:00:00:00:00:00:00:01", 2, "00:00:00:00:00:00:00:02", 1, False),
    ])
    def test_is_looped(self, dpid_a, port_a, dpid_b, port_b, expected):
        """Test is_looped cases."""
        assert self.loop_manager.is_looped(
            dpid_a, port_a, dpid_b, port_b
        ) == expected

    def test_is_loop_ignored(self):
        """Test is_loop_ignored.

        A configured pair must be ignored in either port order, while an
        unknown port or an unknown switch must not be.
        """

        dpid = "00:00:00:00:00:00:00:01"
        port_a = 1
        port_b = 2
        self.loop_manager.ignored_loops[dpid] = [[port_a, port_b]]

        assert self.loop_manager.is_loop_ignored(
            dpid, port_a=port_a, port_b=port_b
        )
        # Same pair, reversed order.
        assert self.loop_manager.is_loop_ignored(
            dpid, port_a=port_b, port_b=port_a
        )

        # A port that is not part of any ignored pair.
        assert not self.loop_manager.is_loop_ignored(
            dpid, port_a + 20, port_b
        )

        # A switch without any ignored loops configured.
        dpid = "00:00:00:00:00:00:00:02"
        assert not self.loop_manager.is_loop_ignored(dpid, port_a, port_b)

    @patch("napps.kytos.of_lldp.managers.loop_manager.log")
    async def test_handle_log_action(self, mock_log):
        """Test handle_log_action.

        The first call must log one warning and leave the pair's counter at
        0; the second call must bump the counter to 1 without logging again.
        """

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        intf_a = get_interface_mock("s1-eth1", 1, switch)
        intf_b = get_interface_mock("s1-eth2", 2, switch)

        await self.loop_manager.handle_log_action(intf_a, intf_b)
        # These used to be assignments (`call_count = 1`), which overwrote
        # the mock's counter and verified nothing; they are real checks now.
        assert mock_log.warning.call_count == 1
        assert self.loop_manager.loop_counter[dpid][(1, 2)] == 0
        await self.loop_manager.handle_log_action(intf_a, intf_b)
        # NOTE(review): assumes the manager logs less often than every
        # call, so the warning count stays at 1 here -- confirm.
        assert mock_log.warning.call_count == 1
        assert self.loop_manager.loop_counter[dpid][(1, 2)] == 1

    @patch("napps.kytos.of_lldp.managers.loop_manager.log")
    async def test_handle_disable_action(self, mock_log, monkeypatch):
        """Test handle_disable_action.

        With ``httpx.AsyncClient`` replaced by a mock answering 200, the
        action must issue exactly one POST and log one info message.
        """

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        intf_a = get_interface_mock("s1-eth1", 1, switch)
        intf_b = get_interface_mock("s1-eth2", 2, switch)

        # awith_mock stands in for the AsyncClient class; entering its
        # instance as an async context manager yields aclient_mock.
        aclient_mock, awith_mock = AsyncMock(), MagicMock()
        aclient_mock.post.return_value = Response(200, json={},
                                                  request=MagicMock())
        awith_mock.return_value.__aenter__.return_value = aclient_mock
        monkeypatch.setattr("httpx.AsyncClient", awith_mock)

        await self.loop_manager.handle_disable_action(intf_a, intf_b)
        assert aclient_mock.post.call_count == 1
        assert mock_log.info.call_count == 1

    @patch("napps.kytos.of_lldp.managers.loop_manager.log")
    async def test_handle_loop_stopped(self, mock_log, monkeypatch):
        """Test handle_loop_stopped.

        Starting from a "detected" loop with the "log" and "disable"
        actions configured, the handler must remove the "looped" metadata
        from the interface, keep the configured actions, log two info
        messages and move the loop state to "stopped".
        """

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        intf_a = get_interface_mock("s1-eth1", 1, switch)
        intf_b = get_interface_mock("s1-eth2", 2, switch)

        # Same httpx.AsyncClient stand-in used for the disable action test.
        aclient_mock, awith_mock = AsyncMock(), MagicMock()
        aclient_mock.post.return_value = Response(200, json={},
                                                  request=MagicMock())
        awith_mock.return_value.__aenter__.return_value = aclient_mock
        monkeypatch.setattr("httpx.AsyncClient", awith_mock)

        self.loop_manager.loop_state[dpid][(1, 2)] = {"state": "detected"}
        self.loop_manager.actions = ["log", "disable"]
        await self.loop_manager.handle_loop_stopped(intf_a, intf_b)
        assert intf_a.remove_metadata.call_count == 1
        intf_a.remove_metadata.assert_called_with("looped")
        assert "log" in self.loop_manager.actions
        assert "disable" in self.loop_manager.actions
        assert mock_log.info.call_count == 2
        assert self.loop_manager.loop_state[dpid][(1, 2)]["state"] == "stopped"

    async def test_set_loop_detected(self):
        """Test set_loop_detected.

        Covers the first detection, a repeated detection of the same loop
        and a detection after the loop had been marked as stopped.
        """

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        intf_a = get_interface_mock("s1-eth1", 1, switch)
        intf_b = get_interface_mock("s1-eth2", 2, switch)

        port_pair = [intf_a.port_number, intf_b.port_number]
        await self.loop_manager.set_loop_detected(intf_a, port_pair)
        assert intf_a.extend_metadata.call_count == 1

        tuple_pair = tuple(port_pair)
        loop_state = self.loop_manager.loop_state
        assert loop_state[dpid][tuple_pair]["state"] == "detected"
        detected_at = loop_state[dpid][tuple_pair]["detected_at"]
        assert detected_at
        updated_at = loop_state[dpid][tuple_pair]["updated_at"]
        assert updated_at

        # if it's called again updated_at should be updated, while the
        # metadata is not extended again and detected_at is preserved
        await self.loop_manager.set_loop_detected(intf_a, port_pair)
        assert intf_a.extend_metadata.call_count == 1
        assert loop_state[dpid][tuple_pair]["detected_at"] == detected_at
        assert loop_state[dpid][tuple_pair]["updated_at"] >= updated_at

        # force a different initial state to ensure it would overwrite
        self.loop_manager.loop_state[dpid][tuple_pair]["state"] = "stopped"
        await self.loop_manager.set_loop_detected(intf_a, port_pair)
        assert intf_a.extend_metadata.call_count == 2

    def test_get_stopped_loops(self):
        """Test get_stopped_loops.

        Every "detected" entry whose ``updated_at`` lies one minute in the
        past must be reported, grouped by dpid.
        """
        dpid = "00:00:00:00:00:00:00:01"
        port_pairs = [(1, 2), (3, 3)]

        # NOTE(review): one minute is presumably older than the manager's
        # "loop stopped" threshold -- confirm against LoopManager.
        delta = now() - timedelta(minutes=1)
        looped_entry = {
            "state": "detected",
            "updated_at": delta.strftime("%Y-%m-%dT%H:%M:%S"),
        }
        for port_pair in port_pairs:
            self.loop_manager.loop_state[dpid][port_pair] = looped_entry
        assert self.loop_manager.get_stopped_loops() == {dpid: port_pairs}

    async def test_handle_topology_loaded(self):
        """Test handle_topology loaded.

        The "ignored_loops" metadata of each switch in the topology must be
        copied into ``loop_manager.ignored_loops`` under the switch dpid.
        """
        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        mock_topo = MagicMock()
        switch.metadata = {"ignored_loops": [[1, 2]]}
        mock_topo.switches = {dpid: switch}

        self.loop_manager.ignored_loops = {}
        assert dpid not in self.loop_manager.ignored_loops
        await self.loop_manager.handle_topology_loaded(mock_topo)
        assert self.loop_manager.ignored_loops[dpid] == [[1, 2]]

    async def test_handle_switch_metadata_changed_added(self):
        """Test handle_switch_metadata_changed added.

        A switch gaining "ignored_loops" metadata must show up in
        ``loop_manager.ignored_loops``.
        """
        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.metadata = {"ignored_loops": [[1, 2]]}

        self.loop_manager.ignored_loops = {}
        assert dpid not in self.loop_manager.ignored_loops
        await self.loop_manager.handle_switch_metadata_changed(switch)
        assert self.loop_manager.ignored_loops[dpid] == [[1, 2]]

    async def test_handle_switch_metadata_changed_incrementally(self):
        """Test handle_switch_metadata_changed incrementally.

        A second metadata change with an extra pair must replace the stored
        list with the new, longer one.
        """
        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid
        switch.metadata = {"ignored_loops": [[1, 2]]}
        self.loop_manager.ignored_loops = {}

        assert dpid not in self.loop_manager.ignored_loops
        await self.loop_manager.handle_switch_metadata_changed(switch)
        assert self.loop_manager.ignored_loops[dpid] == [[1, 2]]

        switch.metadata = {"ignored_loops": [[1, 2], [3, 4]]}
        await self.loop_manager.handle_switch_metadata_changed(switch)
        assert self.loop_manager.ignored_loops[dpid] == [[1, 2], [3, 4]]

    async def test_handle_switch_metadata_changed_removed(self):
        """Test handle_switch_metadata_changed removed.

        When the switch metadata no longer has "ignored_loops", the stored
        entry for that dpid must be dropped.
        """
        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid
        switch.metadata = {"some_key": "some_value"}
        self.loop_manager.ignored_loops[dpid] = [[1, 2]]

        assert dpid in self.loop_manager.ignored_loops
        await self.loop_manager.handle_switch_metadata_changed(switch)
        assert dpid not in self.loop_manager.ignored_loops
250