Passed
Push — master ( 4bf6f4...81fe6d )
by Vinicius
02:40 queued 17s
created

TestINTManager.test__remove_int_flows()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 2
dl 0
loc 9
ccs 8
cts 8
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Test INTManager"""
2
3 1
import pytest
4
5 1
from unittest.mock import AsyncMock, MagicMock
6 1
from napps.kytos.telemetry_int.exceptions import ProxyPortSameSourceIntraEVC
7 1
from napps.kytos.telemetry_int.managers.int import INTManager
8 1
from napps.kytos.telemetry_int import exceptions
9 1
from kytos.core.common import EntityStatus
10
11 1
from kytos.lib.helpers import (
12
    get_interface_mock,
13
    get_controller_mock,
14
    get_switch_mock,
15
)
16
17
18 1
class TestINTManager:
19
    """TestINTManager."""
20
21 1
    def test_get_proxy_port_or_raise(self) -> None:
        """Exercise get_proxy_port_or_raise through each failure, then success.

        The mocks are completed one step at a time, so each call is expected
        to fail on the next missing dependency until the last call succeeds.
        """
        dpid = "00:00:00:00:00:00:00:01"
        uni_id = f"{dpid}:1"
        evc_id = "3766c105686749"

        switch = get_switch_mock(dpid, 0x04)
        uni = get_interface_mock("s1-eth1", 1, switch)
        uni.metadata = {}
        controller = get_controller_mock()
        manager = INTManager(controller)

        # Step 1: the mocked interface and switch have not been associated
        # with the controller yet, so the interface lookup fails.
        with pytest.raises(exceptions.ProxyPortNotFound) as exc:
            manager.get_proxy_port_or_raise(uni_id, evc_id)
        assert f"interface {uni_id} not found" in str(exc)

        # Step 2: the interface resolves, but has no proxy_port metadata.
        def resolve_uni(_intf_id):
            return uni

        controller.get_interface_by_id = resolve_uni
        with pytest.raises(exceptions.ProxyPortNotFound) as exc:
            manager.get_proxy_port_or_raise(uni_id, evc_id)
        assert f"proxy_port metadata not found in {uni_id}" in str(exc)

        # Step 3: proxy_port metadata is set, but the destination interface
        # has not been mocked yet.
        uni.metadata = {"proxy_port": 5}
        with pytest.raises(exceptions.ProxyPortDestNotFound) as exc:
            manager.get_proxy_port_or_raise(uni_id, evc_id)
        assert "destination interface not found" in str(exc)

        # Step 4: every dependency is mocked, so a ProxyPort is returned.
        proxy_src = get_interface_mock("s1-eth5", 5, switch)
        proxy_src.metadata = {"looped": {"port_numbers": [5, 6]}}

        def resolve_port(_port_no):
            return proxy_src

        uni.switch.get_interface_by_port_no = resolve_port
        proxy_port = manager.get_proxy_port_or_raise(uni_id, evc_id)
        assert proxy_port.source == proxy_src
55
56 1
    def test_load_uni_src_proxy_port(self) -> None:
        """Check that load_uni_src_proxy_ports fills unis_src and srcs_pp.

        Three EVCs share the same two UNIs; only two of them have telemetry
        enabled, and only those two are expected in the proxy ports' evc_ids.
        """
        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)

        # UNIs on ports 1 and 2, each pointing at a proxy port via metadata.
        uni_a = get_interface_mock("s1-eth1", 1, switch)
        uni_a.metadata = {"proxy_port": 3}
        uni_z = get_interface_mock("s1-eth2", 2, switch)
        uni_z.metadata = {"proxy_port": 5}

        uni_a_id, uni_z_id = f"{dpid}:1", f"{dpid}:2"
        src_a_id, src_z_id = f"{dpid}:3", f"{dpid}:5"

        # Proxy port interfaces: ports 3/4 and 5/6 carry "looped" metadata.
        src_a = get_interface_mock("s1-eth3", 3, switch)
        src_a.metadata = {"looped": {"port_numbers": [3, 4]}}
        dst_a = get_interface_mock("s1-eth4", 4, switch)
        src_z = get_interface_mock("s1-eth5", 5, switch)
        src_z.metadata = {"looped": {"port_numbers": [5, 6]}}
        dst_z = get_interface_mock("s1-eth6", 6, switch)

        by_port = {1: uni_a, 2: uni_z, 3: src_a, 4: dst_a, 5: src_z, 6: dst_z}
        by_id = {uni_a_id: uni_a, uni_z_id: uni_z}

        controller = get_controller_mock()
        switch.get_interface_by_port_no = lambda port_no: by_port[port_no]
        controller.get_interface_by_id = lambda intf_id: by_id[intf_id]

        telemetry_by_evc = (
            ("3766c105686749", True),
            ("3766c105686748", True),
            ("3766c105686747", False),
        )
        evcs = {
            evc_id: {
                "metadata": {"telemetry": {"enabled": enabled}},
                "uni_a": {"interface_id": uni_a_id},
                "uni_z": {"interface_id": uni_z_id},
            }
            for evc_id, enabled in telemetry_by_evc
        }

        manager = INTManager(controller)
        manager.load_uni_src_proxy_ports(evcs)

        # Each UNI maps to the id of its proxy port source interface.
        assert len(manager.unis_src) == 2
        assert manager.unis_src[uni_a_id] == src_a_id
        assert manager.unis_src[uni_z_id] == src_z_id

        # Each source id maps to a proxy port with the expected endpoints.
        assert len(manager.srcs_pp) == 2
        assert manager.srcs_pp[src_a_id].source == src_a
        assert manager.srcs_pp[src_a_id].destination == dst_a
        assert manager.srcs_pp[src_z_id].source == src_z
        assert manager.srcs_pp[src_z_id].destination == dst_z

        # Only the EVCs with telemetry enabled are tracked on the proxy ports.
        enabled_evc_ids = {"3766c105686749", "3766c105686748"}
        assert manager.srcs_pp[src_a_id].evc_ids == enabled_evc_ids
        assert manager.srcs_pp[src_z_id].evc_ids == enabled_evc_ids
135
136 1
    async def test_handle_pp_link_down(self, monkeypatch):
        """Test handle_pp_link_down removes INT flows and marks telemetry DOWN."""
        evc_id = "3766c105686748"
        api_mock, link_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
        link_mock.endpoint_a.id = "some_intf_id"
        pp_mock.evc_ids = {evc_id}

        int_manager = INTManager(MagicMock())
        # The link's endpoint_a is registered as a proxy port source.
        int_manager.srcs_pp[link_mock.endpoint_a.id] = pp_mock

        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
        api_mock.get_evcs.return_value = {evc_id: {}}
        int_manager.remove_int_flows = AsyncMock()

        await int_manager.handle_pp_link_down(link_mock)

        # EVCs are fetched once, filtered by telemetry enabled and UP.
        assert api_mock.get_evcs.call_count == 1
        assert api_mock.get_evcs.call_args[1] == {
            "metadata.telemetry.enabled": "true",
            "metadata.telemetry.status": "UP",
        }

        # Flows are removed once, with metadata recording the DOWN reason.
        assert int_manager.remove_int_flows.call_count == 1
        args = int_manager.remove_int_flows.call_args[0]
        assert evc_id in args[0]
        assert "telemetry" in args[1]
        telemetry = args[1]["telemetry"]
        assert telemetry["enabled"]
        assert telemetry["status"] == "DOWN"
        assert telemetry["status_reason"] == ["proxy_port_down"]
        assert "status_updated_at" in telemetry
165
166 1
    async def test_handle_pp_link_up(self, monkeypatch):
        """Test handle_pp_link_up installs INT flows and marks telemetry UP."""
        evc_id = "3766c105686748"
        uni_a_id, uni_z_id = "1", "2"
        src_a_id, src_z_id = "3", "5"

        int_manager = INTManager(MagicMock())
        api_mock, link_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
        link_mock.endpoint_a.id = "3"
        link_mock.status = EntityStatus.UP
        link_mock.status_reason = []
        pp_mock.status = EntityStatus.UP
        pp_mock.evc_ids = {evc_id}

        # Both proxy port sources share one mock; each UNI maps to its source.
        int_manager.srcs_pp[src_a_id] = pp_mock
        int_manager.srcs_pp[src_z_id] = pp_mock
        int_manager.unis_src[uni_a_id] = src_a_id
        int_manager.unis_src[uni_z_id] = src_z_id

        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
        api_mock.get_evcs.return_value = {
            evc_id: {
                "active": True,
                "archived": False,
                "uni_a": {"interface_id": uni_a_id},
                "uni_z": {"interface_id": uni_z_id},
            }
        }
        int_manager.install_int_flows = AsyncMock()
        int_manager._validate_map_enable_evcs = MagicMock()

        await int_manager.handle_pp_link_up(link_mock)

        # EVCs are fetched once, filtered by telemetry enabled and DOWN.
        assert api_mock.get_evcs.call_count == 1
        assert api_mock.get_evcs.call_args[1] == {
            "metadata.telemetry.enabled": "true",
            "metadata.telemetry.status": "DOWN",
        }

        assert int_manager.install_int_flows.call_count == 1
        args = int_manager.install_int_flows.call_args[0]
        assert "telemetry" in args[1]
        telemetry_dict = args[1]["telemetry"]
        expected_keys = ["enabled", "status", "status_reason", "status_updated_at"]
        # Iterating a dict yields its keys, so sorted() needs no list()/keys().
        assert sorted(telemetry_dict) == sorted(expected_keys)
        assert telemetry_dict["enabled"]
        assert telemetry_dict["status"] == "UP"
        assert not telemetry_dict["status_reason"]
210
211 1
    async def test_handle_pp_metadata_removed(self, monkeypatch):
        """Test handle_pp_metadata_removed removes flows and marks telemetry DOWN."""
        evc_id = "3766c105686748"
        source_id = "some_source_id"
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
        intf_mock.id = "some_intf_id"
        pp_mock.evc_ids = {evc_id}

        int_manager = INTManager(MagicMock())
        int_manager.unis_src[intf_mock.id] = source_id
        int_manager.srcs_pp[source_id] = pp_mock

        # Precondition: the interface mock carries no proxy_port metadata.
        assert "proxy_port" not in intf_mock.metadata
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
        api_mock.get_evcs.return_value = {evc_id: {}}
        int_manager.remove_int_flows = AsyncMock()

        await int_manager.handle_pp_metadata_removed(intf_mock)

        # EVCs are fetched once, filtered by telemetry enabled and UP.
        assert api_mock.get_evcs.call_count == 1
        assert api_mock.get_evcs.call_args[1] == {
            "metadata.telemetry.enabled": "true",
            "metadata.telemetry.status": "UP",
        }

        # Flows are removed once, with metadata recording the DOWN reason.
        assert int_manager.remove_int_flows.call_count == 1
        args = int_manager.remove_int_flows.call_args[0]
        assert evc_id in args[0]
        assert "telemetry" in args[1]
        telemetry = args[1]["telemetry"]
        assert telemetry["enabled"]
        assert telemetry["status"] == "DOWN"
        assert telemetry["status_reason"] == ["proxy_port_metadata_removed"]
        assert "status_updated_at" in telemetry
243
244 1
    async def test_handle_pp_metadata_added(self, monkeypatch):
        """Test handle_pp_metadata_added disables and re-enables INT once."""
        evc_id = "3766c105686748"
        source_id, source_port = "some_source_id", 2
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
        intf_mock.id = "some_intf_id"
        intf_mock.metadata = {"proxy_port": source_port}
        pp_mock.evc_ids = {evc_id}

        int_manager = INTManager(MagicMock())
        int_manager.unis_src[intf_mock.id] = source_id
        int_manager.srcs_pp[source_id] = pp_mock

        # Precondition: the interface carries proxy_port metadata.
        assert "proxy_port" in intf_mock.metadata
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
        api_mock.get_evcs.return_value = {evc_id: {}}
        int_manager.disable_int = AsyncMock()
        int_manager.enable_int = AsyncMock()

        await int_manager.handle_pp_metadata_added(intf_mock)

        # EVCs are fetched once, filtered only by telemetry enabled.
        assert api_mock.get_evcs.call_count == 1
        assert api_mock.get_evcs.call_args[1] == {"metadata.telemetry.enabled": "true"}
        assert int_manager.disable_int.call_count == 1
        assert int_manager.enable_int.call_count == 1
268
269 1
    async def test_handle_pp_metadata_added_no_change(self, monkeypatch):
        """Test handle_pp_metadata_added when the proxy port source is unchanged."""
        evc_id = "3766c105686748"
        src_id, src_port = "some_source_id", 2

        manager = INTManager(MagicMock())
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
        same_source = MagicMock()
        intf_mock.id = "some_intf_id"
        intf_mock.metadata = {"proxy_port": src_port}
        pp_mock.evc_ids = {evc_id}
        manager.unis_src[intf_mock.id] = src_id
        manager.srcs_pp[src_id] = pp_mock

        # The current proxy port source and the one resolved from the new
        # metadata are the same interface object.
        pp_mock.source = same_source
        intf_mock.switch.get_interface_by_port_no.return_value = same_source

        assert "proxy_port" in intf_mock.metadata
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
        api_mock.get_evcs.return_value = {evc_id: {}}
        manager.disable_int = AsyncMock()
        manager.enable_int = AsyncMock()

        await manager.handle_pp_metadata_added(intf_mock)

        # With no change, no EVCs are fetched and INT is neither disabled
        # nor enabled.
        assert not api_mock.get_evcs.call_count
        assert not manager.disable_int.call_count
        assert not manager.enable_int.call_count
296
297 1
    async def test_handle_pp_metadata_added_no_affected(self, monkeypatch):
        """Test handle_pp_metadata_added when no EVCs are affected."""
        evc_id = "3766c105686748"
        source_id, source_port = "some_source_id", 2
        api_mock, intf_mock, pp_mock = AsyncMock(), MagicMock(), MagicMock()
        intf_mock.id = "some_intf_id"
        intf_mock.metadata = {"proxy_port": source_port}
        pp_mock.evc_ids = {evc_id}

        int_manager = INTManager(MagicMock())
        int_manager.unis_src[intf_mock.id] = source_id
        int_manager.srcs_pp[source_id] = pp_mock

        # Precondition: the interface carries proxy_port metadata.
        assert "proxy_port" in intf_mock.metadata
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)

        # Simulate the EVC query returning no EVCs at all.
        api_mock.get_evcs.return_value = {}
        int_manager.disable_int = AsyncMock()
        int_manager.enable_int = AsyncMock()

        await int_manager.handle_pp_metadata_added(intf_mock)

        # EVCs are still fetched once, but INT is neither disabled nor enabled.
        assert api_mock.get_evcs.call_count == 1
        assert api_mock.get_evcs.call_args[1] == {
            "metadata.telemetry.enabled": "true",
        }
        assert not int_manager.disable_int.call_count
        assert not int_manager.enable_int.call_count
325
326 1
    async def test_disable_int_metadata(self, monkeypatch) -> None:
        """Test the metadata arguments disable_int passes to add_evcs_metadata."""
        controller = MagicMock()
        api_mock = AsyncMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)

        int_manager = INTManager(controller)
        int_manager._remove_int_flows_by_cookies = AsyncMock()
        await int_manager.disable_int({}, False)

        assert api_mock.add_evcs_metadata.call_count == 1
        args = api_mock.add_evcs_metadata.call_args[0]
        # First positional argument is the (empty) EVCs dict passed in.
        assert args[0] == {}
        assert "telemetry" in args[1]
        telemetry_dict = args[1]["telemetry"]
        expected_keys = ["enabled", "status", "status_reason", "status_updated_at"]
        # Iterating a dict yields its keys, so sorted() needs no list()/keys().
        assert sorted(telemetry_dict) == sorted(expected_keys)

        assert not telemetry_dict["enabled"]
        assert telemetry_dict["status"] == "DOWN"
        assert telemetry_dict["status_reason"] == ["disabled"]

        # Third positional argument mirrors the False given to disable_int.
        assert args[2] is False
349
350 1
    async def test_enable_int_metadata(self, monkeypatch) -> None:
        """Test the metadata arguments enable_int passes to add_evcs_metadata."""
        controller = MagicMock()
        api_mock = AsyncMock()
        stored_flows_mock = AsyncMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.utils.get_found_stored_flows", stored_flows_mock
        )

        int_manager = INTManager(controller)
        int_manager.remove_int_flows = AsyncMock()
        evcs = {
            "3766c105686749": {
                "active": True,
                "uni_a": MagicMock(),
                "uni_z": MagicMock(),
            }
        }
        # Stub validation, flow building, bookkeeping and flow sending so that
        # only the calls made by enable_int itself are observed.
        int_manager._validate_map_enable_evcs = MagicMock()
        int_manager._validate_map_enable_evcs.return_value = evcs
        int_manager.flow_builder.build_int_flows = MagicMock()
        int_manager.flow_builder.build_int_flows.return_value = {
            0xAA3766C105686749: [MagicMock()]
        }
        int_manager._add_pps_evc_ids = MagicMock()
        int_manager._send_flows = AsyncMock()

        await int_manager.enable_int(evcs, False)

        assert stored_flows_mock.call_count == 1
        assert api_mock.add_evcs_metadata.call_count == 3
        # call_args holds the most recent add_evcs_metadata call.
        args = api_mock.add_evcs_metadata.call_args[0]
        assert "telemetry" in args[1]
        telemetry_dict = args[1]["telemetry"]
        expected_keys = ["enabled", "status", "status_reason", "status_updated_at"]
        # Iterating a dict yields its keys, so sorted() needs no list()/keys().
        assert sorted(telemetry_dict) == sorted(expected_keys)
        assert int_manager._send_flows.call_count == 1

        assert telemetry_dict["enabled"] is True
        assert telemetry_dict["status"] == "UP"
        assert telemetry_dict["status_reason"] == []
392
393 1
    async def test_redeploy_int(self, monkeypatch) -> None:
        """Check that redeploy_int removes and then installs INT flows once."""
        api_mock, stored_flows_mock = AsyncMock(), AsyncMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.managers.int.api", api_mock)
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.utils.get_found_stored_flows", stored_flows_mock
        )

        manager = INTManager(MagicMock())
        manager._remove_int_flows_by_cookies = AsyncMock()
        manager._install_int_flows = AsyncMock()

        dpid = "00:00:00:00:00:00:00:01"
        telemetry_on = {"telemetry": {"enabled": True}}
        evcs = {
            "3766c105686749": {
                "metadata": telemetry_on,
                "uni_a": {"interface_id": f"{dpid}:1"},
                "uni_z": {"interface_id": f"{dpid}:2"},
            }
        }
        # Validation is stubbed out; only the redeploy calls are observed.
        manager._validate_map_enable_evcs = MagicMock()

        await manager.redeploy_int(evcs)

        assert stored_flows_mock.call_count == 1
        assert manager._remove_int_flows_by_cookies.call_count == 1
        assert api_mock.get_stored_flows.call_count == 1
        assert manager._install_int_flows.call_count == 1
425
426 1
    def test_validate_intra_evc_different_proxy_ports(self) -> None:
        """Check _validate_intra_evc_different_proxy_ports rejects a shared source."""
        proxy_a, proxy_z = MagicMock(), MagicMock()
        manager = INTManager(MagicMock())
        evc = {
            "id": "some_id",
            "uni_a": {
                "proxy_port": proxy_a,
                "interface_id": "00:00:00:00:00:00:00:01:1",
            },
            "uni_z": {
                "proxy_port": proxy_z,
                "interface_id": "00:00:00:00:00:00:00:01:2",
            },
        }

        # With untouched mocks the two proxy ports have different sources,
        # so validation passes without raising.
        manager._validate_intra_evc_different_proxy_ports(evc)

        # Pointing both proxy ports at the same source must be rejected.
        shared_source = MagicMock()
        proxy_a.source = shared_source
        proxy_z.source = shared_source
        with pytest.raises(ProxyPortSameSourceIntraEVC):
            manager._validate_intra_evc_different_proxy_ports(evc)
442
443 1
    async def test__remove_int_flows_by_cookies(
        self, inter_evc_evpl_flows_data
    ) -> None:
        """Check the flows returned by _remove_int_flows_by_cookies."""
        controller = get_controller_mock()
        controller._buffers.app.aput = AsyncMock()
        manager = INTManager(controller)
        assert len(inter_evc_evpl_flows_data) == 3

        removed = await manager._remove_int_flows_by_cookies(
            inter_evc_evpl_flows_data
        )

        assert len(removed) == 3
        # Every returned flow carries a full cookie mask and table_id 0xFF.
        full_mask = 0xFFFFFFFFFFFFFFFF
        every_flow = (flow for flows in removed.values() for flow in flows)
        for flow in every_flow:
            assert "cookie_mask" in flow
            assert flow["cookie_mask"] == full_mask
            assert flow["table_id"] == 0xFF
        assert controller._buffers.app.aput.call_count == 3
459
460 1
    async def test__remove_int_flows(self, inter_evc_evpl_flows_data) -> None:
        """Check _remove_int_flows returns three entries and does three aputs."""
        aput_mock = AsyncMock()
        controller = get_controller_mock()
        controller._buffers.app.aput = aput_mock
        manager = INTManager(controller)
        assert len(inter_evc_evpl_flows_data) == 3

        removed = await manager._remove_int_flows(inter_evc_evpl_flows_data)

        assert len(removed) == 3
        assert controller._buffers.app.aput.call_count == 3
469
470 1
    async def test__install_int_flows(self, inter_evc_evpl_flows_data, monkeypatch):
        """Check _install_int_flows does three aputs and never sleeps."""
        # asyncio.sleep is replaced so any call to it can be counted.
        fake_sleep = AsyncMock()
        monkeypatch.setattr("asyncio.sleep", fake_sleep)
        controller = get_controller_mock()
        controller._buffers.app.aput = AsyncMock()
        manager = INTManager(controller)
        assert len(inter_evc_evpl_flows_data) == 3

        installed = await manager._install_int_flows(inter_evc_evpl_flows_data)

        assert len(installed) == 3
        assert controller._buffers.app.aput.call_count == 3
        assert fake_sleep.call_count == 0
482
483 1 View Code Duplication
    def test__add_pps_evc_ids(self):
        """Check _add_pps_evc_ids adds the EVC id to the proxy port of each UNI."""
        dpid = "00:00:00:00:00:00:00:01"
        evc_id = "3766c105686749"
        evc = {
            "metadata": {"telemetry": {"enabled": True}},
            "uni_a": {"interface_id": f"{dpid}:1"},
            "uni_z": {"interface_id": f"{dpid}:2"},
        }
        manager = INTManager(get_controller_mock())

        # Every proxy port lookup returns the same mocked proxy port.
        proxy_port = MagicMock()
        manager.get_proxy_port_or_raise = MagicMock(return_value=proxy_port)

        manager._add_pps_evc_ids({evc_id: evc})

        # One lookup and one add per UNI.
        assert manager.get_proxy_port_or_raise.call_count == 2
        assert proxy_port.evc_ids.add.call_count == 2
        proxy_port.evc_ids.add.assert_called_with(evc_id)
506
507 1 View Code Duplication
    def test__discard_pps_evc_ids(self):
        """Check _discard_pps_evc_ids discards the EVC id for each UNI."""
        dpid = "00:00:00:00:00:00:00:01"
        evc_id = "3766c105686749"
        evc = {
            "metadata": {"telemetry": {"enabled": True}},
            "uni_a": {"interface_id": f"{dpid}:1"},
            "uni_z": {"interface_id": f"{dpid}:2"},
        }
        manager = INTManager(get_controller_mock())

        # Every proxy port lookup returns the same mocked proxy port.
        proxy_port = MagicMock()
        manager.get_proxy_port_or_raise = MagicMock(return_value=proxy_port)

        manager._discard_pps_evc_ids({evc_id: evc})

        # One lookup and one discard per UNI.
        assert manager.get_proxy_port_or_raise.call_count == 2
        assert proxy_port.evc_ids.discard.call_count == 2
        proxy_port.evc_ids.discard.assert_called_with(evc_id)
530
531 1
    def test_validate_evc_stored_flows(self) -> None:
        """Check _validate_evcs_stored_flows for active and inactive EVCs."""
        evc_id = "3766c105686749"
        cookie = 0xAA3766C105686749
        manager = INTManager(MagicMock())
        evcs = {
            evc_id: {
                "active": True,
                "uni_a": MagicMock(),
                "uni_z": MagicMock(),
            }
        }

        # An active EVC with at least one stored flow validates cleanly.
        manager._validate_evcs_stored_flows(evcs, {cookie: [MagicMock()]})

        # An active EVC with an empty flow list, or with no entry at all,
        # must be rejected.
        for missing_flows in ({cookie: []}, {}):
            with pytest.raises(exceptions.FlowsNotFound):
                manager._validate_evcs_stored_flows(evcs, missing_flows)

        # Once the EVC is inactive, missing flows no longer raise.
        evcs[evc_id]["active"] = False
        manager._validate_evcs_stored_flows(evcs, {})
553
554 1
    async def test__send_flows(self) -> None:
        """Check _send_flows only puts to the buffer when there are flows."""
        controller = get_controller_mock()
        aput_mock = AsyncMock()
        controller._buffers.app.aput = aput_mock
        manager = INTManager(controller)

        # A switch with an empty flow list produces no aput call.
        await manager._send_flows({"dpid": []}, "install")
        controller._buffers.app.aput.assert_not_called()

        # A switch with at least one flow produces an aput call.
        await manager._send_flows({"dpid": [MagicMock()]}, "install")
        controller._buffers.app.aput.assert_called()
566