Passed
Pull Request — master (#163)
by Vinicius
04:26
created

test_int_manager   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 725
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 556
dl 0
loc 725
ccs 474
cts 474
cp 1
rs 8.96
c 0
b 0
f 0
wmc 43

25 Methods

Rating   Name   Duplication   Size   Complexity  
A TestINTManager.test_handle_pp_link_down() 0 29 1
B TestINTManager.test_load_uni_src_proxy_port() 0 78 1
B TestINTManager.test_get_proxy_port_or_raise() 0 34 6
B TestINTManager.test_handle_pp_link_up() 0 44 1
A TestINTManager.test_handle_pp_metadata_removed() 0 26 1
A TestINTManager.test_handle_pp_metadata_added_evcs_with_no_pp() 0 28 1
A TestINTManager.test_handle_pp_metadata_added() 0 23 1
A TestINTManager.test_handle_pp_metadata_added_exc_port_shared() 0 29 1
A TestINTManager.test_disable_int_metadata() 0 23 1
A TestINTManager.test_handle_pp_metadata_added_no_affected() 0 29 1
A TestINTManager.test_handle_pp_metadata_added_no_change() 0 27 1
A TestINTManager.test_validate_evc_stored_flows() 0 22 3
A TestINTManager.test_validate_dedicated_proxy_port_evcs_existing() 0 13 2
A TestINTManager.test_validate_proxy_ports_symmetry_intra_evc() 0 32 4
A TestINTManager.test_enable_int_metadata() 0 43 1
A TestINTManager.test__remove_int_flows() 0 9 1
A TestINTManager.test__remove_int_flows_by_cookies() 0 17 3
A TestINTManager.test__add_pps_evc_ids() 0 23 1
A TestINTManager.test_validate_proxy_ports_symmetry_inter_evc() 0 38 4
A TestINTManager.test__install_int_flows() 0 12 1
A TestINTManager.test_redeploy_int() 0 32 1
A TestINTManager.test__discard_pps_evc_ids() 0 23 1
A TestINTManager.test__send_flows() 0 12 1
A TestINTManager.test_validate_dedicated_proxy_port_evcs() 0 16 2
A TestINTManager.test_validate_intra_evc_different_proxy_ports() 0 16 2

How to fix   Complexity   

Complexity

Complex classes like test_int_manager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Test INTManager"""
2
3 1
import pytest
4
5 1
from unittest.mock import AsyncMock, MagicMock
6 1
from napps.kytos.telemetry_int.exceptions import ProxyPortSameSourceIntraEVC
7 1
from napps.kytos.telemetry_int.exceptions import ProxyPortShared
8 1
from napps.kytos.telemetry_int.managers.int import INTManager
9 1
from napps.kytos.telemetry_int import exceptions
10 1
from kytos.core.common import EntityStatus
11
12 1
from kytos.lib.helpers import (
13
    get_interface_mock,
14
    get_controller_mock,
15
    get_switch_mock,
16
)
17
18
19 1
class TestINTManager:
20
    """TestINTManager."""
21
22 1
    def test_get_proxy_port_or_raise(self) -> None:
23
        """Test proxy_port_or_raise."""
24 1
        dpid_a = "00:00:00:00:00:00:00:01"
25 1
        mock_switch_a = get_switch_mock(dpid_a, 0x04)
26 1
        mock_interface_a = get_interface_mock("s1-eth1", 1, mock_switch_a)
27 1
        mock_interface_a.metadata = {}
28 1
        intf_id = f"{dpid_a}:1"
29 1
        controller = get_controller_mock()
30 1
        evc_id = "3766c105686749"
31 1
        int_manager = INTManager(controller)
32
33
        # Initially the mocked interface and switch hasn't been associated in the ctrllr
34 1
        with pytest.raises(exceptions.ProxyPortNotFound) as exc:
35 1
            int_manager.get_proxy_port_or_raise(intf_id, evc_id)
36 1
        assert f"interface {intf_id} not found" in str(exc)
37
38
        # Now, proxy_port still hasn't been set yet
39 1
        controller.get_interface_by_id = lambda x: mock_interface_a
40 1
        with pytest.raises(exceptions.ProxyPortMetadataNotFound) as exc:
41 1
            int_manager.get_proxy_port_or_raise(intf_id, evc_id)
42 1
        assert f"metadata not found in {intf_id}" in str(exc)
43
44
        # Now, destination interface hasn't been mocked yet
45 1
        mock_interface_a.metadata = {"proxy_port": 5}
46 1
        with pytest.raises(exceptions.ProxyPortDestNotFound) as exc:
47 1
            int_manager.get_proxy_port_or_raise(intf_id, evc_id)
48 1
        assert "isn't looped" in str(exc)
49
50 1
        mock_interface_b = get_interface_mock("s1-eth5", 5, mock_switch_a)
51 1
        mock_interface_b.metadata = {"looped": {"port_numbers": [5, 6]}}
52 1
        mock_interface_a.switch.get_interface_by_port_no = lambda x: mock_interface_b
53
        # Now all dependencies have been mocked and it should get the ProxyPort
54 1
        pp = int_manager.get_proxy_port_or_raise(intf_id, evc_id)
55 1
        assert pp.source == mock_interface_b
56
57 1
    def test_load_uni_src_proxy_port(self) -> None:
58
        """Test test_load_uni_src_proxy_port."""
59 1
        dpid_a = "00:00:00:00:00:00:00:01"
60 1
        mock_switch_a = get_switch_mock(dpid_a, 0x04)
61 1
        mock_interface_a = get_interface_mock("s1-eth1", 1, mock_switch_a)
62 1
        mock_interface_a.metadata = {"proxy_port": 3}
63 1
        mock_interface_z = get_interface_mock("s1-eth2", 2, mock_switch_a)
64 1
        mock_interface_z.metadata = {"proxy_port": 5}
65 1
        intf_id_a = f"{dpid_a}:1"
66 1
        intf_id_z = f"{dpid_a}:2"
67 1
        intf_id_a_1 = f"{dpid_a}:3"
68 1
        intf_id_z_1 = f"{dpid_a}:5"
69
70 1
        mock_interface_a_1 = get_interface_mock("s1-eth3", 3, mock_switch_a)
71 1
        mock_interface_a_1.metadata = {"looped": {"port_numbers": [3, 4]}}
72 1
        mock_interface_a_2 = get_interface_mock("s1-eth4", 4, mock_switch_a)
73 1
        mock_interface_z_1 = get_interface_mock("s1-eth5", 5, mock_switch_a)
74 1
        mock_interface_z_1.metadata = {"looped": {"port_numbers": [5, 6]}}
75 1
        mock_interface_z_2 = get_interface_mock("s1-eth6", 6, mock_switch_a)
76
77 1
        def get_interface_by_port_no(port_no):
78 1
            data = {
79
                1: mock_interface_a,
80
                2: mock_interface_z,
81
                3: mock_interface_a_1,
82
                4: mock_interface_a_2,
83
                5: mock_interface_z_1,
84
                6: mock_interface_z_2,
85
            }
86 1
            return data[port_no]
87
88 1
        def get_interface_by_id(intf_id):
89 1
            data = {
90
                intf_id_a: mock_interface_a,
91
                intf_id_z: mock_interface_z,
92
            }
93 1
            return data[intf_id]
94
95 1
        controller = get_controller_mock()
96 1
        mock_switch_a.get_interface_by_port_no = get_interface_by_port_no
97 1
        controller.get_interface_by_id = get_interface_by_id
98
99 1
        evcs = {
100
            "3766c105686749": {
101
                "metadata": {"telemetry": {"enabled": True}},
102
                "uni_a": {"interface_id": intf_id_a},
103
                "uni_z": {"interface_id": intf_id_z},
104
            },
105
            "3766c105686748": {
106
                "metadata": {"telemetry": {"enabled": True}},
107
                "uni_a": {"interface_id": intf_id_a},
108
                "uni_z": {"interface_id": intf_id_z},
109
            },
110
            "3766c105686747": {
111
                "metadata": {"telemetry": {"enabled": False}},
112
                "uni_a": {"interface_id": intf_id_a},
113
                "uni_z": {"interface_id": intf_id_z},
114
            },
115
        }
116 1
        int_manager = INTManager(controller)
117 1
        int_manager.load_uni_src_proxy_ports(evcs)
118 1
        assert len(int_manager.unis_src) == 2
119 1
        assert int_manager.unis_src[intf_id_a] == intf_id_a_1
120 1
        assert int_manager.unis_src[intf_id_z] == intf_id_z_1
121
122 1
        assert len(int_manager.srcs_pp) == 2
123 1
        assert int_manager.srcs_pp[intf_id_a_1].source == mock_interface_a_1
124 1
        assert int_manager.srcs_pp[intf_id_a_1].destination == mock_interface_a_2
125 1
        assert int_manager.srcs_pp[intf_id_z_1].source == mock_interface_z_1
126 1
        assert int_manager.srcs_pp[intf_id_z_1].destination == mock_interface_z_2
127
128 1
        assert int_manager.srcs_pp[intf_id_a_1].evc_ids == {
129
            "3766c105686749",
130
            "3766c105686748",
131
        }
132 1
        assert int_manager.srcs_pp[intf_id_z_1].evc_ids == {
133
            "3766c105686749",
134
            "3766c105686748",
135
        }
136
137 1
    async def test_handle_pp_link_down(self, monkeypatch):
138
        """Test test_handle_pp_link_down."""
139 1
        int_manager = INTManager(MagicMock())
140 1
        api_mock, link_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
141 1
        link_mock.endpoint_a.id = "some_intf_id"
142 1
        evc_id = "3766c105686748"
143 1
        int_manager.srcs_pp[link_mock.endpoint_a.id] = pp_mock
144 1
        pp_mock.evc_ids = {evc_id}
145
146 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
147 1
        api_mock.get_evcs.return_value = {evc_id: {}}
148 1
        int_manager.remove_int_flows = AsyncMock()
149
150 1
        await int_manager.handle_pp_link_down(link_mock)
151 1
        assert api_mock.get_evcs.call_count == 1
152 1
        assert api_mock.get_evcs.call_count == 1
153 1
        assert api_mock.get_evcs.call_args[1] == {
154
            "metadata.telemetry.enabled": "true",
155
            "metadata.telemetry.status": "UP",
156
        }
157 1
        assert int_manager.remove_int_flows.call_count == 1
158 1
        args = int_manager.remove_int_flows.call_args[0]
159 1
        assert evc_id in args[0]
160 1
        assert "telemetry" in args[1]
161 1
        telemetry = args[1]["telemetry"]
162 1
        assert telemetry["enabled"]
163 1
        assert telemetry["status"] == "DOWN"
164 1
        assert telemetry["status_reason"] == ["proxy_port_down"]
165 1
        assert "status_updated_at" in telemetry
166
167 1
    async def test_handle_pp_link_up(self, monkeypatch):
168
        """Test handle_pp_link_up."""
169 1
        int_manager = INTManager(MagicMock())
170 1
        api_mock, link_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
171 1
        link_mock.endpoint_a.id = "3"
172 1
        pp_mock.status = EntityStatus.UP
173 1
        link_mock.status = EntityStatus.UP
174 1
        link_mock.status_reason = []
175 1
        evc_id = "3766c105686748"
176 1
        uni_a_id, uni_z_id = "1", "2"
177 1
        src_a_id, src_z_id = "3", "5"
178 1
        int_manager.srcs_pp[src_a_id] = pp_mock
179 1
        int_manager.srcs_pp[src_z_id] = pp_mock
180 1
        int_manager.unis_src[uni_a_id] = src_a_id
181 1
        int_manager.unis_src[uni_z_id] = src_z_id
182 1
        pp_mock.evc_ids = {evc_id}
183
184 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
185 1
        api_mock.get_evcs.return_value = {
186
            evc_id: {
187
                "active": True,
188
                "archived": False,
189
                "uni_a": {"interface_id": uni_a_id},
190
                "uni_z": {"interface_id": uni_z_id},
191
            }
192
        }
193 1
        int_manager.install_int_flows = AsyncMock()
194 1
        int_manager._validate_map_enable_evcs = MagicMock()
195
196 1
        await int_manager.handle_pp_link_up(link_mock)
197 1
        assert api_mock.get_evcs.call_count == 1
198 1
        assert api_mock.get_evcs.call_args[1] == {
199
            "metadata.telemetry.enabled": "true",
200
            "metadata.telemetry.status": "DOWN",
201
        }
202 1
        assert int_manager.install_int_flows.call_count == 1
203 1
        args = int_manager.install_int_flows.call_args[0]
204 1
        assert "telemetry" in args[1]
205 1
        telemetry_dict = args[1]["telemetry"]
206 1
        expected_keys = ["enabled", "status", "status_reason", "status_updated_at"]
207 1
        assert sorted(list(telemetry_dict.keys())) == sorted(expected_keys)
208 1
        assert telemetry_dict["enabled"]
209 1
        assert telemetry_dict["status"] == "UP"
210 1
        assert not telemetry_dict["status_reason"]
211
212 1
    async def test_handle_pp_metadata_removed(self, monkeypatch):
213
        """Test handle_pp_metadata_removed."""
214 1
        int_manager = INTManager(MagicMock())
215 1
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
216 1
        intf_mock.id = "some_intf_id"
217 1
        source_id = "some_source_id"
218 1
        evc_id = "3766c105686748"
219 1
        int_manager.unis_src[intf_mock.id] = source_id
220 1
        int_manager.srcs_pp[source_id] = pp_mock
221 1
        pp_mock.evc_ids = {evc_id}
222
223 1
        assert "proxy_port" not in intf_mock.metadata
224 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
225 1
        api_mock.get_evcs.return_value = {evc_id: {}}
226 1
        int_manager.disable_int = AsyncMock()
227
228 1
        await int_manager.handle_pp_metadata_removed(intf_mock)
229 1
        assert api_mock.get_evcs.call_count == 1
230 1
        assert api_mock.get_evcs.call_count == 1
231 1
        assert api_mock.get_evcs.call_args[1] == {
232
            "metadata.telemetry.enabled": "true",
233
            "metadata.telemetry.status": "UP",
234
        }
235 1
        assert int_manager.disable_int.call_count == 1
236 1
        args = int_manager.disable_int.call_args[0]
237 1
        assert evc_id in args[0]
238
239 1
    async def test_handle_pp_metadata_added(self, monkeypatch):
240
        """Test handle_pp_metadata_added."""
241 1
        int_manager = INTManager(MagicMock())
242 1
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
243 1
        intf_mock.id = "some_intf_id"
244 1
        intf_mock.metadata = {"proxy_port": 2}
245 1
        evc_id = "3766c105686748"
246 1
        pp_mock.evc_ids = {evc_id}
247 1
        int_manager.get_proxy_port_or_raise = MagicMock()
248 1
        int_manager.get_proxy_port_or_raise.return_value = pp_mock
249
250 1
        assert "proxy_port" in intf_mock.metadata
251 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
252 1
        api_mock.get_evcs.return_value = {evc_id: {}}
253 1
        int_manager.disable_int = AsyncMock()
254 1
        int_manager.enable_int = AsyncMock()
255
256 1
        await int_manager.handle_pp_metadata_added(intf_mock)
257 1
        assert api_mock.get_evcs.call_count == 1
258 1
        assert api_mock.get_evcs.call_count == 1
259 1
        assert api_mock.get_evcs.call_args[1] == {"metadata.telemetry.enabled": "true"}
260 1
        assert int_manager.disable_int.call_count == 1
261 1
        assert int_manager.enable_int.call_count == 1
262
263 1
    async def test_handle_pp_metadata_added_evcs_with_no_pp(self, monkeypatch):
264
        """Test handle_pp_metadata_added with existing evcs with no pp."""
265 1
        int_manager = INTManager(MagicMock())
266 1
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
267 1
        intf_mock.id = "some_intf_id"
268 1
        intf_mock.metadata = {"proxy_port": 2}
269 1
        evc_id = "3766c105686748"
270 1
        pp_mock.evc_ids = {}
271 1
        int_manager.get_proxy_port_or_raise = MagicMock()
272 1
        int_manager.get_proxy_port_or_raise.return_value = pp_mock
273
274 1
        assert "proxy_port" in intf_mock.metadata
275 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
276 1
        api_mock.get_evcs.return_value = {
277
            evc_id: {
278
                "uni_a": {"interface_id": "some_intf_id"},
279
                "uni_z": {"interface_id": "another_intf_id"},
280
            }
281
        }
282 1
        int_manager.disable_int = AsyncMock()
283 1
        int_manager.enable_int = AsyncMock()
284
285 1
        await int_manager.handle_pp_metadata_added(intf_mock)
286 1
        assert api_mock.get_evcs.call_count == 1
287 1
        assert api_mock.get_evcs.call_count == 1
288 1
        assert api_mock.get_evcs.call_args[1] == {"metadata.telemetry.enabled": "true"}
289 1
        assert int_manager.disable_int.call_count == 1
290 1
        assert int_manager.enable_int.call_count == 1
291
292 1
    async def test_handle_pp_metadata_added_no_change(self, monkeypatch):
293
        """Test handle_pp_metadata_added no change."""
294 1
        int_manager = INTManager(MagicMock())
295 1
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
296 1
        intf_mock.id = "some_intf_id"
297 1
        source_id, source_port = "some_source_id", 2
298 1
        source_intf = MagicMock()
299 1
        intf_mock.metadata = {"proxy_port": source_port}
300 1
        evc_id = "3766c105686748"
301 1
        int_manager.unis_src[intf_mock.id] = source_id
302 1
        int_manager.srcs_pp[source_id] = pp_mock
303 1
        pp_mock.evc_ids = {evc_id}
304
305
        # Simulating that the current and new proxy_port source are the same
306 1
        pp_mock.source = source_intf
307 1
        intf_mock.switch.get_interface_by_port_no.return_value = source_intf
308
309 1
        assert "proxy_port" in intf_mock.metadata
310 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
311 1
        api_mock.get_evcs.return_value = {evc_id: {}}
312 1
        int_manager.disable_int = AsyncMock()
313 1
        int_manager.enable_int = AsyncMock()
314
315 1
        await int_manager.handle_pp_metadata_added(intf_mock)
316 1
        assert not api_mock.get_evcs.call_count
317 1
        assert not int_manager.disable_int.call_count
318 1
        assert not int_manager.enable_int.call_count
319
320 1
    async def test_handle_pp_metadata_added_no_affected(self, monkeypatch):
321
        """Test handle_pp_metadata_added no affected evcs."""
322 1
        int_manager = INTManager(MagicMock())
323 1
        int_manager.get_proxy_port_or_raise = MagicMock()
324 1
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
325 1
        intf_mock.id = "some_intf_id"
326 1
        source_id, source_port = "some_source_id", 2
327 1
        intf_mock.metadata = {"proxy_port": source_port}
328 1
        evc_id = "3766c105686748"
329 1
        int_manager.unis_src[intf_mock.id] = source_id
330 1
        int_manager.srcs_pp[source_id] = pp_mock
331 1
        pp_mock.evc_ids = {evc_id}
332
333 1
        assert "proxy_port" in intf_mock.metadata
334 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
335
336
        # Simulating returning no EVCs that were enabled and UP
337 1
        api_mock.get_evcs.return_value = {}
338 1
        int_manager.disable_int = AsyncMock()
339 1
        int_manager.enable_int = AsyncMock()
340
341 1
        await int_manager.handle_pp_metadata_added(intf_mock)
342 1
        assert api_mock.get_evcs.call_count == 1
343 1
        assert api_mock.get_evcs.call_count == 1
344 1
        assert api_mock.get_evcs.call_args[1] == {
345
            "metadata.telemetry.enabled": "true",
346
        }
347 1
        assert not int_manager.disable_int.call_count
348 1
        assert not int_manager.enable_int.call_count
349
350 1
    async def test_handle_pp_metadata_added_exc_port_shared(self, monkeypatch):
351
        """Test handle_pp_metadata_added exception port shared."""
352 1
        log_mock = MagicMock()
353 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.log", log_mock)
354 1
        int_manager = INTManager(MagicMock())
355 1
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
356 1
        intf_mock.id = "some_intf_id"
357 1
        intf_mock.metadata = {"proxy_port": 2}
358 1
        evc_id = "3766c105686748"
359 1
        int_manager.get_proxy_port_or_raise = MagicMock()
360 1
        pp_mock.evc_ids = {evc_id}
361 1
        int_manager.get_proxy_port_or_raise.return_value = pp_mock
362
363 1
        assert "proxy_port" in intf_mock.metadata
364 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
365 1
        api_mock.get_evcs.return_value = {evc_id: {}}
366 1
        int_manager.disable_int = AsyncMock()
367 1
        int_manager.enable_int = AsyncMock()
368 1
        int_manager.enable_int.side_effect = ProxyPortShared(evc_id, "shared")
369
370 1
        await int_manager.handle_pp_metadata_added(intf_mock)
371 1
        assert api_mock.get_evcs.call_count == 1
372 1
        assert api_mock.get_evcs.call_count == 1
373 1
        assert api_mock.get_evcs.call_args[1] == {"metadata.telemetry.enabled": "true"}
374 1
        assert int_manager.disable_int.call_count == 1
375 1
        assert int_manager.enable_int.call_count == 1
376
377 1
        assert api_mock.add_evcs_metadata.call_count == 1
378 1
        assert log_mock.error.call_count == 1
379
380 1
    async def test_disable_int_metadata(self, monkeypatch) -> None:
381
        """Test disable INT metadata args."""
382 1
        controller = MagicMock()
383 1
        api_mock = AsyncMock()
384 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
385
386 1
        int_manager = INTManager(controller)
387 1
        int_manager._remove_int_flows_by_cookies = AsyncMock()
388 1
        await int_manager.disable_int({}, False)
389
390 1
        assert api_mock.add_evcs_metadata.call_count == 1
391 1
        args = api_mock.add_evcs_metadata.call_args[0]
392 1
        assert args[0] == {}
393 1
        assert "telemetry" in args[1]
394 1
        telemetry_dict = args[1]["telemetry"]
395 1
        expected_keys = ["enabled", "status", "status_reason", "status_updated_at"]
396 1
        assert sorted(list(telemetry_dict.keys())) == sorted(expected_keys)
397
398 1
        assert not telemetry_dict["enabled"]
399 1
        assert telemetry_dict["status"] == "DOWN"
400 1
        assert telemetry_dict["status_reason"] == ["disabled"]
401
402 1
        assert args[2] is False
403
404 1
    async def test_enable_int_metadata(self, monkeypatch) -> None:
405
        """Test enable INT metadata args."""
406 1
        controller = MagicMock()
407 1
        api_mock = AsyncMock()
408 1
        stored_flows_mock = AsyncMock()
409 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
410 1
        monkeypatch.setattr(
411
            "napps.kytos.telemetry_int.utils.get_found_stored_flows", stored_flows_mock
412
        )
413
414 1
        int_manager = INTManager(controller)
415 1
        int_manager._remove_int_flows_by_cookies = AsyncMock()
416 1
        evcs = {
417
            "3766c105686749": {
418
                "active": True,
419
                "uni_a": MagicMock(),
420
                "uni_z": MagicMock(),
421
            }
422
        }
423 1
        int_manager._validate_map_enable_evcs = MagicMock()
424 1
        int_manager._validate_map_enable_evcs.return_value = evcs
425 1
        int_manager.flow_builder.build_int_flows = MagicMock()
426 1
        int_manager.flow_builder.build_int_flows.return_value = {
427
            0xAA3766C105686749: [MagicMock()]
428
        }
429 1
        int_manager._add_pps_evc_ids = MagicMock()
430 1
        int_manager._send_flows = AsyncMock()
431
432 1
        await int_manager.enable_int(evcs, False)
433
434 1
        assert stored_flows_mock.call_count == 1
435 1
        assert int_manager._remove_int_flows_by_cookies.call_count == 1
436 1
        assert api_mock.add_evcs_metadata.call_count == 3
437 1
        args = api_mock.add_evcs_metadata.call_args[0]
438 1
        assert "telemetry" in args[1]
439 1
        telemetry_dict = args[1]["telemetry"]
440 1
        expected_keys = ["enabled", "status", "status_reason", "status_updated_at"]
441 1
        assert sorted(list(telemetry_dict.keys())) == sorted(expected_keys)
442 1
        assert int_manager._send_flows.call_count == 1
443
444 1
        assert telemetry_dict["enabled"] is True
445 1
        assert telemetry_dict["status"] == "UP"
446 1
        assert telemetry_dict["status_reason"] == []
447
448 1
    async def test_redeploy_int(self, monkeypatch) -> None:
449
        """Test redeploy int."""
450 1
        controller = MagicMock()
451 1
        api_mock = AsyncMock()
452 1
        stored_flows_mock = AsyncMock()
453 1
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
454 1
        monkeypatch.setattr(
455
            "napps.kytos.telemetry_int.utils.get_found_stored_flows", stored_flows_mock
456
        )
457
458 1
        int_manager = INTManager(controller)
459 1
        int_manager._remove_int_flows_by_cookies = AsyncMock()
460 1
        int_manager._install_int_flows = AsyncMock()
461
462 1
        dpid_a = "00:00:00:00:00:00:00:01"
463 1
        intf_id_a = f"{dpid_a}:1"
464 1
        intf_id_z = f"{dpid_a}:2"
465 1
        evc_id = "3766c105686749"
466 1
        evcs = {
467
            evc_id: {
468
                "metadata": {"telemetry": {"enabled": True}},
469
                "uni_a": {"interface_id": intf_id_a},
470
                "uni_z": {"interface_id": intf_id_z},
471
            }
472
        }
473 1
        int_manager._validate_map_enable_evcs = MagicMock()
474 1
        await int_manager.redeploy_int(evcs)
475
476 1
        assert stored_flows_mock.call_count == 1
477 1
        assert int_manager._remove_int_flows_by_cookies.call_count == 1
478 1
        assert api_mock.get_stored_flows.call_count == 1
479 1
        assert int_manager._install_int_flows.call_count == 1
480
481 1
    def test_validate_intra_evc_different_proxy_ports(self) -> None:
482
        """Test _validate_intra_evc_different_proxy_ports."""
483 1
        pp_a, pp_z, controller = MagicMock(), MagicMock(), MagicMock()
484 1
        evc = {
485
            "id": "some_id",
486
            "uni_a": {"proxy_port": pp_a, "interface_id": "00:00:00:00:00:00:00:01:1"},
487
            "uni_z": {"proxy_port": pp_z, "interface_id": "00:00:00:00:00:00:00:01:2"},
488
        }
489
490 1
        int_manager = INTManager(controller)
491 1
        int_manager._validate_intra_evc_different_proxy_ports(evc)
492
493 1
        source = MagicMock()
494 1
        pp_a.source, pp_z.source = source, source
495 1
        with pytest.raises(ProxyPortSameSourceIntraEVC):
496 1
            int_manager._validate_intra_evc_different_proxy_ports(evc)
497
498 1
    def test_validate_dedicated_proxy_port_evcs(self) -> None:
499
        """Test _validate_intra_evc_different_proxy_ports."""
500 1
        pp_a, pp_z, controller = MagicMock(), MagicMock(), MagicMock()
501 1
        evc = {
502
            "id": "some_id",
503
            "uni_a": {"proxy_port": pp_a, "interface_id": "00:00:00:00:00:00:00:01:1"},
504
            "uni_z": {"proxy_port": pp_z, "interface_id": "00:00:00:00:00:00:00:01:2"},
505
        }
506
507 1
        int_manager = INTManager(controller)
508 1
        int_manager._validate_dedicated_proxy_port_evcs({evc["id"]: evc})
509
510 1
        source = MagicMock()
511 1
        pp_a.source, pp_z.source = source, source
512 1
        with pytest.raises(ProxyPortShared):
513 1
            int_manager._validate_dedicated_proxy_port_evcs({evc["id"]: evc})
514
515 1
    def test_validate_proxy_ports_symmetry_inter_evc(self) -> None:
516
        """Test _validate_proxy_ports_symmetry for inter evc."""
517 1
        evc = {
518
            "id": "some_id",
519
            "uni_a": {"interface_id": "00:00:00:00:00:00:00:01:1"},
520
            "uni_z": {"interface_id": "00:00:00:00:00:00:00:03:1"},
521
        }
522
523 1
        controller = MagicMock()
524 1
        int_manager = INTManager(controller)
525
526
        # no proxy ports case
527 1
        int_manager._validate_proxy_ports_symmetry(evc)
528
529
        # one proxy port, asymmetric case
530 1
        pp_a = MagicMock()
531 1
        evc["uni_a"]["proxy_port"] = pp_a
532 1
        with pytest.raises(exceptions.ProxyPortAsymmetric):
533 1
            int_manager._validate_proxy_ports_symmetry(evc)
534
535
        # one proxy port, still asymmetric case
536 1
        pp_z = MagicMock()
537 1
        evc["uni_a"].pop("proxy_port")
538 1
        evc["uni_z"]["proxy_port"] = pp_z
539 1
        with pytest.raises(exceptions.ProxyPortAsymmetric):
540 1
            int_manager._validate_proxy_ports_symmetry(evc)
541
542
        # symmetric case
543 1
        evc["uni_a"]["proxy_port"] = pp_a
544 1
        int_manager._validate_proxy_ports_symmetry(evc)
545
546
        # cover ProxyPortRequired for inter EVC with metadata
547 1
        evc["uni_a"].pop("proxy_port")
548 1
        evc["uni_z"].pop("proxy_port")
549 1
        evc["metadata"] = {"proxy_port_enabled": True}
550 1
        with pytest.raises(exceptions.ProxyPortRequired) as exc:
551 1
            int_manager._validate_proxy_ports_symmetry(evc)
552 1
        assert "proxy_port_enabled" in str(exc)
553
554 1
    def test_validate_proxy_ports_symmetry_intra_evc(self) -> None:
555
        """Test _validate_proxy_ports_symmetry intra evc."""
556 1
        evc = {
557
            "id": "some_id",
558
            "uni_a": {"interface_id": "00:00:00:00:00:00:00:01:1"},
559
            "uni_z": {"interface_id": "00:00:00:00:00:00:00:01:2"},
560
        }
561
562 1
        controller = MagicMock()
563 1
        int_manager = INTManager(controller)
564
565
        # no proxy ports case
566 1
        with pytest.raises(exceptions.ProxyPortRequired) as exc:
567 1
            int_manager._validate_proxy_ports_symmetry(evc)
568 1
        assert "intra-EVC must use proxy ports" in str(exc)
569
570
        # one proxy port, asymmetric case
571 1
        pp_a = MagicMock()
572 1
        evc["uni_a"]["proxy_port"] = pp_a
573 1
        with pytest.raises(exceptions.ProxyPortAsymmetric):
574 1
            int_manager._validate_proxy_ports_symmetry(evc)
575
576
        # one proxy port, still asymmetric case
577 1
        pp_z = MagicMock()
578 1
        evc["uni_a"].pop("proxy_port")
579 1
        evc["uni_z"]["proxy_port"] = pp_z
580 1
        with pytest.raises(exceptions.ProxyPortAsymmetric):
581 1
            int_manager._validate_proxy_ports_symmetry(evc)
582
583
        # symmetric case
584 1
        evc["uni_a"]["proxy_port"] = pp_a
585 1
        int_manager._validate_proxy_ports_symmetry(evc)
586
587 1
    def test_validate_dedicated_proxy_port_evcs_existing(self) -> None:
588
        """Test _validate_intra_evc_different_proxy_ports existing."""
589 1
        pp_a, pp_z, controller = MagicMock(), MagicMock(), MagicMock()
590 1
        evc = {
591
            "id": "some_id",
592
            "uni_a": {"proxy_port": pp_a, "interface_id": "00:00:00:00:00:00:00:01:1"},
593
            "uni_z": {"proxy_port": pp_z, "interface_id": "00:00:00:00:00:00:00:01:2"},
594
        }
595
596 1
        int_manager = INTManager(controller)
597 1
        int_manager.unis_src["00:00:00:00:00:00:00:01:3"] = pp_a.source.id
598 1
        with pytest.raises(ProxyPortShared):
599 1
            int_manager._validate_dedicated_proxy_port_evcs({evc["id"]: evc})
600
601 1
    async def test__remove_int_flows_by_cookies(
602
        self, inter_evc_evpl_flows_data
603
    ) -> None:
604
        """test _remove_int_flows_by_cookies."""
605 1
        controller = get_controller_mock()
606 1
        controller._buffers.app.aput = AsyncMock()
607 1
        int_manager = INTManager(controller)
608 1
        assert len(inter_evc_evpl_flows_data) == 3
609 1
        res = await int_manager._remove_int_flows_by_cookies(inter_evc_evpl_flows_data)
610 1
        assert len(res) == 3
611 1
        for flows in res.values():
612 1
            for flow in flows:
613 1
                assert "cookie_mask" in flow
614 1
                assert flow["cookie_mask"] == int(0xFFFFFFFFFFFFFFFF)
615 1
                assert flow["table_id"] == 0xFF
616 1
                assert flow["owner"] == "telemetry_int"
617 1
        assert controller._buffers.app.aput.call_count == 3
618
619 1
    async def test__remove_int_flows(self, inter_evc_evpl_flows_data) -> None:
620
        """test _remove_int_flows."""
621 1
        controller = get_controller_mock()
622 1
        controller._buffers.app.aput = AsyncMock()
623 1
        int_manager = INTManager(controller)
624 1
        assert len(inter_evc_evpl_flows_data) == 3
625 1
        res = await int_manager._remove_int_flows(inter_evc_evpl_flows_data)
626 1
        assert len(res) == 3
627 1
        assert controller._buffers.app.aput.call_count == 3
628
629 1
    async def test__install_int_flows(self, inter_evc_evpl_flows_data, monkeypatch):
630
        """test__install_int_flows."""
631 1
        sleep_mock = AsyncMock()
632 1
        monkeypatch.setattr("asyncio.sleep", sleep_mock)
633 1
        controller = get_controller_mock()
634 1
        controller._buffers.app.aput = AsyncMock()
635 1
        int_manager = INTManager(controller)
636 1
        assert len(inter_evc_evpl_flows_data) == 3
637 1
        res = await int_manager._install_int_flows(inter_evc_evpl_flows_data)
638 1
        assert len(res) == 3
639 1
        assert controller._buffers.app.aput.call_count == 3
640 1
        assert sleep_mock.call_count == 0
641
642 1
    def test__add_pps_evc_ids(self):
643
        """test_add_pps_evc_ids."""
644 1
        dpid_a = "00:00:00:00:00:00:00:01"
645 1
        intf_id_a = f"{dpid_a}:1"
646 1
        intf_id_z = f"{dpid_a}:2"
647 1
        evc_id = "3766c105686749"
648 1
        evcs = {
649
            evc_id: {
650
                "metadata": {"telemetry": {"enabled": True}},
651
                "uni_a": {"interface_id": intf_id_a},
652
                "uni_z": {"interface_id": intf_id_z},
653
            }
654
        }
655 1
        controller = get_controller_mock()
656 1
        int_manager = INTManager(controller)
657 1
        pp = MagicMock()
658 1
        mock = MagicMock()
659 1
        int_manager.get_proxy_port_or_raise = mock
660 1
        mock.return_value = pp
661 1
        int_manager._add_pps_evc_ids(evcs)
662 1
        assert int_manager.get_proxy_port_or_raise.call_count == 2
663 1
        assert pp.evc_ids.add.call_count == 2
664 1
        pp.evc_ids.add.assert_called_with(evc_id)
665
666 1
    def test__discard_pps_evc_ids(self):
667
        """test_discard_pps_evc_ids."""
668 1
        dpid_a = "00:00:00:00:00:00:00:01"
669 1
        intf_id_a = f"{dpid_a}:1"
670 1
        intf_id_z = f"{dpid_a}:2"
671 1
        evc_id = "3766c105686749"
672 1
        evcs = {
673
            evc_id: {
674
                "metadata": {"telemetry": {"enabled": True}},
675
                "uni_a": {"interface_id": intf_id_a},
676
                "uni_z": {"interface_id": intf_id_z},
677
            }
678
        }
679 1
        controller = get_controller_mock()
680 1
        int_manager = INTManager(controller)
681 1
        pp = MagicMock()
682 1
        int_manager.unis_src[intf_id_a] = "a"
683 1
        int_manager.unis_src[intf_id_z] = "z"
684 1
        int_manager.srcs_pp[int_manager.unis_src[intf_id_a]] = pp
685 1
        int_manager.srcs_pp[int_manager.unis_src[intf_id_z]] = pp
686 1
        int_manager._discard_pps_evc_ids(evcs)
687 1
        assert pp.evc_ids.discard.call_count == 2
688 1
        pp.evc_ids.discard.assert_called_with(evc_id)
689
690 1
    def test_validate_evc_stored_flows(self) -> None:
691
        """Test validate evc stored flows."""
692 1
        controller = MagicMock()
693 1
        int_manager = INTManager(controller)
694 1
        evcs = {
695
            "3766c105686749": {
696
                "active": True,
697
                "uni_a": MagicMock(),
698
                "uni_z": MagicMock(),
699
            }
700
        }
701 1
        stored_flows = {0xAA3766C105686749: [MagicMock()]}
702 1
        int_manager._validate_evcs_stored_flows(evcs, stored_flows)
703
704 1
        with pytest.raises(exceptions.FlowsNotFound):
705 1
            int_manager._validate_evcs_stored_flows(evcs, {0xAA3766C105686749: []})
706
707 1
        with pytest.raises(exceptions.FlowsNotFound):
708 1
            int_manager._validate_evcs_stored_flows(evcs, {})
709
710 1
        evcs["3766c105686749"]["active"] = False
711 1
        int_manager._validate_evcs_stored_flows(evcs, {})
712
713 1
    async def test__send_flows(self) -> None:
714
        """Test _send_flows."""
715 1
        controller = get_controller_mock()
716 1
        controller._buffers.app.aput = AsyncMock()
717 1
        int_manager = INTManager(controller)
718 1
        switch_flows = {"dpid": []}
719 1
        await int_manager._send_flows(switch_flows, "install")
720 1
        controller._buffers.app.aput.assert_not_called()
721
722 1
        switch_flows = {"dpid": [MagicMock()]}
723 1
        await int_manager._send_flows(switch_flows, "install")
724
        controller._buffers.app.aput.assert_called()
725