TestLoopManager.test_set_loop_detected()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 30
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 22
nop 1
dl 0
loc 30
ccs 22
cts 22
cp 1
crap 1
rs 9.352
c 0
b 0
f 0
1
"""Test LoopManager methods."""
2 1
from datetime import timedelta
3 1
from unittest.mock import AsyncMock, MagicMock, patch
4
5 1
import pytest
6 1
from httpx import Response
7
8 1
from kytos.lib.helpers import get_interface_mock, get_switch_mock
9
10 1
from kytos.core.helpers import now
11 1
from napps.kytos.of_lldp.managers.loop_manager import LoopManager
12
13
14 1
class TestLoopManager:
15
    """Tests for LoopManager."""
16
17 1
    def setup_method(self):
18
        """Execute steps before each tests."""
19 1
        controller = MagicMock()
20 1
        self.loop_manager = LoopManager(controller)
21
22 1
    async def test_process_if_looped(self):
23
        """Test process_if_looped."""
24 1
        dpid = "00:00:00:00:00:00:00:01"
25 1
        switch = get_switch_mock(dpid, 0x04)
26 1
        intf_a = get_interface_mock("s1-eth1", 1, switch)
27 1
        intf_b = get_interface_mock("s1-eth2", 2, switch)
28 1
        self.loop_manager.ignored_loops = {}
29 1
        self.loop_manager.publish_loop_actions = AsyncMock()
30 1
        self.loop_manager.apublish_loop_state = AsyncMock()
31 1
        assert await self.loop_manager.process_if_looped(intf_a, intf_b)
32 1
        assert self.loop_manager.publish_loop_actions.call_count == 1
33 1
        assert self.loop_manager.apublish_loop_state.call_count == 1
34
35 1
    async def test_publish_loop_state(self):
36
        """Test publish_loop_state."""
37 1
        dpid = "00:00:00:00:00:00:00:01"
38 1
        switch = get_switch_mock(dpid, 0x04)
39 1
        intf_a = get_interface_mock("s1-eth1", 1, switch)
40 1
        intf_b = get_interface_mock("s1-eth2", 2, switch)
41 1
        state = "detected"
42 1
        self.loop_manager.controller.buffers.app.aput = AsyncMock()
43 1
        await self.loop_manager.apublish_loop_state(intf_a, intf_b, state)
44 1
        assert self.loop_manager.controller.buffers.app.aput.call_count == 1
45
46 1
    async def test_publish_loop_actions(self):
47
        """Test publish_loop_actions."""
48 1
        dpid = "00:00:00:00:00:00:00:01"
49 1
        switch = get_switch_mock(dpid, 0x04)
50 1
        intf_a = get_interface_mock("s1-eth1", 1, switch)
51 1
        intf_b = get_interface_mock("s1-eth2", 2, switch)
52 1
        self.loop_manager.controller.buffers.app.aput = AsyncMock()
53 1
        await self.loop_manager.publish_loop_actions(intf_a, intf_b)
54 1
        assert self.loop_manager.controller.buffers.app.aput.call_count == len(
55
            set(self.loop_manager.actions)
56
        )
57
58 1
    @pytest.mark.parametrize("dpid_a,port_a,dpid_b,port_b,expected", [
59
        ("00:00:00:00:00:00:00:01", 6, "00:00:00:00:00:00:00:01", 7, True),
60
        ("00:00:00:00:00:00:00:01", 1, "00:00:00:00:00:00:00:01", 2, True),
61
        ("00:00:00:00:00:00:00:01", 7, "00:00:00:00:00:00:00:01", 7, True),
62
        ("00:00:00:00:00:00:00:01", 8, "00:00:00:00:00:00:00:01", 1, False),
63
        ("00:00:00:00:00:00:00:01", 1, "00:00:00:00:00:00:00:02", 2, False),
64
        ("00:00:00:00:00:00:00:01", 2, "00:00:00:00:00:00:00:02", 1, False),
65
    ])
66 1
    def test_is_looped(self, dpid_a, port_a, dpid_b, port_b, expected):
67
        """Test is_looped cases."""
68 1
        assert self.loop_manager.is_looped(
69
            dpid_a, port_a, dpid_b, port_b
70
        ) == expected
71
72 1
    def test_is_loop_ignored(self):
73
        """Test is_loop_ignored."""
74
75 1
        dpid = "00:00:00:00:00:00:00:01"
76 1
        port_a = 1
77 1
        port_b = 2
78 1
        self.loop_manager.ignored_loops[dpid] = [[port_a, port_b]]
79
80 1
        assert self.loop_manager.is_loop_ignored(
81
            dpid, port_a=port_a, port_b=port_b
82
        )
83 1
        assert self.loop_manager.is_loop_ignored(
84
            dpid, port_a=port_b, port_b=port_a
85
        )
86
87 1
        assert not self.loop_manager.is_loop_ignored(
88
            dpid, port_a + 20, port_b
89
        )
90
91 1
        dpid = "00:00:00:00:00:00:00:02"
92 1
        assert not self.loop_manager.is_loop_ignored(dpid, port_a, port_b)
93
94 1
    @patch("napps.kytos.of_lldp.managers.loop_manager.log")
95 1
    async def test_handle_log_action(self, mock_log):
96
        """Test handle_log_action."""
97
98 1
        dpid = "00:00:00:00:00:00:00:01"
99 1
        switch = get_switch_mock(dpid, 0x04)
100 1
        intf_a = get_interface_mock("s1-eth1", 1, switch)
101 1
        intf_b = get_interface_mock("s1-eth2", 2, switch)
102
103 1
        await self.loop_manager.handle_log_action(intf_a, intf_b)
104 1
        mock_log.warning.call_count = 1
105 1
        assert self.loop_manager.loop_counter[dpid][(1, 2)] == 0
106 1
        await self.loop_manager.handle_log_action(intf_a, intf_b)
107 1
        mock_log.warning.call_count = 1
108 1
        assert self.loop_manager.loop_counter[dpid][(1, 2)] == 1
109
110 1
    @patch("napps.kytos.of_lldp.managers.loop_manager.log")
111 1
    async def test_handle_disable_action(self, mock_log, monkeypatch):
112
        """Test handle_disable_action."""
113
114 1
        dpid = "00:00:00:00:00:00:00:01"
115 1
        switch = get_switch_mock(dpid, 0x04)
116 1
        intf_a = get_interface_mock("s1-eth1", 1, switch)
117 1
        intf_b = get_interface_mock("s1-eth2", 2, switch)
118
119 1
        aclient_mock, awith_mock = AsyncMock(), MagicMock()
120 1
        aclient_mock.post.return_value = Response(200, json={},
121
                                                  request=MagicMock())
122 1
        awith_mock.return_value.__aenter__.return_value = aclient_mock
123 1
        monkeypatch.setattr("httpx.AsyncClient", awith_mock)
124
125 1
        await self.loop_manager.handle_disable_action(intf_a, intf_b)
126 1
        assert aclient_mock.post.call_count == 1
127 1
        assert mock_log.info.call_count == 1
128
129 1
    @patch("napps.kytos.of_lldp.managers.loop_manager.log")
130 1
    async def test_handle_loop_stopped(self, mock_log, monkeypatch):
131
        """Test handle_loop_stopped."""
132
133 1
        dpid = "00:00:00:00:00:00:00:01"
134 1
        switch = get_switch_mock(dpid, 0x04)
135 1
        intf_a = get_interface_mock("s1-eth1", 1, switch)
136 1
        intf_b = get_interface_mock("s1-eth2", 2, switch)
137
138 1
        aclient_mock, awith_mock = AsyncMock(), MagicMock()
139 1
        aclient_mock.post.return_value = Response(200, json={},
140
                                                  request=MagicMock())
141 1
        awith_mock.return_value.__aenter__.return_value = aclient_mock
142 1
        monkeypatch.setattr("httpx.AsyncClient", awith_mock)
143
144 1
        self.loop_manager.loop_state[dpid][(1, 2)] = {"state": "detected"}
145 1
        self.loop_manager.actions = ["log", "disable"]
146 1
        await self.loop_manager.handle_loop_stopped(intf_a, intf_b)
147 1
        assert intf_a.remove_metadata.call_count == 1
148 1
        intf_a.remove_metadata.assert_called_with("looped")
149 1
        assert "log" in self.loop_manager.actions
150 1
        assert "disable" in self.loop_manager.actions
151 1
        assert mock_log.info.call_count == 2
152 1
        assert self.loop_manager.loop_state[dpid][(1, 2)]["state"] == "stopped"
153
154 1
    async def test_set_loop_detected(self):
155
        """Test set_loop_detected."""
156
157 1
        dpid = "00:00:00:00:00:00:00:01"
158 1
        switch = get_switch_mock(dpid, 0x04)
159 1
        intf_a = get_interface_mock("s1-eth1", 1, switch)
160 1
        intf_b = get_interface_mock("s1-eth2", 2, switch)
161
162 1
        port_pair = [intf_a.port_number, intf_b.port_number]
163 1
        await self.loop_manager.set_loop_detected(intf_a, port_pair)
164 1
        assert intf_a.extend_metadata.call_count == 1
165
166 1
        tuple_pair = tuple(port_pair)
167 1
        loop_state = self.loop_manager.loop_state
168 1
        assert loop_state[dpid][tuple_pair]["state"] == "detected"
169 1
        detected_at = loop_state[dpid][tuple_pair]["detected_at"]
170 1
        assert detected_at
171 1
        updated_at = loop_state[dpid][tuple_pair]["updated_at"]
172 1
        assert updated_at
173
174
        # if it's called again updated_at should be udpated
175 1
        await self.loop_manager.set_loop_detected(intf_a, port_pair)
176 1
        assert intf_a.extend_metadata.call_count == 1
177 1
        assert loop_state[dpid][tuple_pair]["detected_at"] == detected_at
178 1
        assert loop_state[dpid][tuple_pair]["updated_at"] >= updated_at
179
180
        # force a different initial state to ensure it would overwrite
181 1
        self.loop_manager.loop_state[dpid][tuple_pair]["state"] = "stopped"
182 1
        await self.loop_manager.set_loop_detected(intf_a, port_pair)
183 1
        assert intf_a.extend_metadata.call_count == 2
184
185 1
    def test_get_stopped_loops(self):
186
        """Test get_stopped_loops."""
187 1
        dpid = "00:00:00:00:00:00:00:01"
188 1
        port_pairs = [(1, 2), (3, 3)]
189
190 1
        delta = now() - timedelta(minutes=1)
191 1
        looped_entry = {
192
            "state": "detected",
193
            "updated_at": delta.strftime("%Y-%m-%dT%H:%M:%S"),
194
        }
195 1
        for port_pair in port_pairs:
196 1
            self.loop_manager.loop_state[dpid][port_pair] = looped_entry
197 1
        assert self.loop_manager.get_stopped_loops() == {dpid: port_pairs}
198
199 1
    async def test_handle_topology_loaded(self):
200
        """Test handle_topology loaded."""
201 1
        dpid = "00:00:00:00:00:00:00:01"
202 1
        switch = get_switch_mock(dpid, 0x04)
203 1
        mock_topo = MagicMock()
204 1
        switch.metadata = {"ignored_loops": [[1, 2]]}
205 1
        mock_topo.switches = {dpid: switch}
206
207 1
        self.loop_manager.ignored_loops = {}
208 1
        assert dpid not in self.loop_manager.ignored_loops
209 1
        await self.loop_manager.handle_topology_loaded(mock_topo)
210 1
        assert self.loop_manager.ignored_loops[dpid] == [[1, 2]]
211
212 1
    async def test_handle_switch_metadata_changed_added(self):
213
        """Test handle_switch_metadata_changed added."""
214 1
        dpid = "00:00:00:00:00:00:00:01"
215 1
        switch = get_switch_mock(dpid, 0x04)
216 1
        switch.metadata = {"ignored_loops": [[1, 2]]}
217
218 1
        self.loop_manager.ignored_loops = {}
219 1
        assert dpid not in self.loop_manager.ignored_loops
220 1
        await self.loop_manager.handle_switch_metadata_changed(switch)
221 1
        assert self.loop_manager.ignored_loops[dpid] == [[1, 2]]
222
223 1
    async def test_handle_switch_metadata_changed_incrementally(self):
224
        """Test handle_switch_metadata_changed incrementally."""
225 1
        dpid = "00:00:00:00:00:00:00:01"
226 1
        switch = get_switch_mock(dpid, 0x04)
227 1
        switch.id = dpid
228 1
        switch.metadata = {"ignored_loops": [[1, 2]]}
229 1
        self.loop_manager.ignored_loops = {}
230
231 1
        assert dpid not in self.loop_manager.ignored_loops
232 1
        await self.loop_manager.handle_switch_metadata_changed(switch)
233 1
        assert self.loop_manager.ignored_loops[dpid] == [[1, 2]]
234
235 1
        switch.metadata = {"ignored_loops": [[1, 2], [3, 4]]}
236 1
        await self.loop_manager.handle_switch_metadata_changed(switch)
237 1
        assert self.loop_manager.ignored_loops[dpid] == [[1, 2], [3, 4]]
238
239 1
    async def test_handle_switch_metadata_changed_removed(self):
240
        """Test handle_switch_metadata_changed removed."""
241 1
        dpid = "00:00:00:00:00:00:00:01"
242 1
        switch = get_switch_mock(dpid, 0x04)
243 1
        switch.id = dpid
244 1
        switch.metadata = {"some_key": "some_value"}
245 1
        self.loop_manager.ignored_loops[dpid] = [[1, 2]]
246
247 1
        assert dpid in self.loop_manager.ignored_loops
248 1
        await self.loop_manager.handle_switch_metadata_changed(switch)
249
        assert dpid not in self.loop_manager.ignored_loops
250