test_main.TestMain.test_get_enabled_evcs()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 21
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 16
nop 2
dl 0
loc 21
ccs 13
cts 13
cp 1
crap 1
rs 9.6
c 0
b 0
f 0
1
"""Test Main methods."""
2
3 1
from unittest.mock import AsyncMock, MagicMock, patch
4
5 1
import pytest
6 1
from napps.kytos.telemetry_int import utils
7 1
from napps.kytos.telemetry_int.exceptions import EVCError, ProxyPortShared
8 1
from napps.kytos.telemetry_int.main import Main
9
10 1
from kytos.core.common import EntityStatus
11 1
from kytos.core.events import KytosEvent
12 1
from kytos.lib.helpers import get_controller_mock, get_test_client
13
14
15 1
class TestMain:
16
    """Tests for the Main class."""
17
18 1
    def setup_method(self):
19
        """Setup."""
20 1
        patch("kytos.core.helpers.run_on_thread", lambda x: x).start()
21
        # pylint: disable=import-outside-toplevel
22 1
        controller = get_controller_mock()
23 1
        self.napp = Main(controller)
24 1
        self.api_client = get_test_client(controller, self.napp)
25 1
        self.base_endpoint = "kytos/telemetry_int/v1"
26
27 1
    async def test_enable_telemetry(self, monkeypatch) -> None:
28
        """Test enable telemetry."""
29 1
        api_mock, flow = AsyncMock(), MagicMock()
30 1
        flow.cookie = 0xA800000000000001
31 1
        monkeypatch.setattr(
32
            "napps.kytos.telemetry_int.main.api",
33
            api_mock,
34
        )
35
36 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
37 1
        api_mock.get_evcs.return_value = {
38
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
39
        }
40
41 1
        self.napp.int_manager = AsyncMock()
42
43 1
        endpoint = f"{self.base_endpoint}/evc/enable"
44 1
        response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]})
45 1
        assert self.napp.int_manager.enable_int.call_count == 1
46
47 1
        enable_int_args = self.napp.int_manager.enable_int.call_args
48
        # evcs arg
49 1
        assert evc_id in enable_int_args[0][0]
50
        # force arg, False by default
51 1
        assert not enable_int_args[0][1]
52
53 1
        assert self.napp.int_manager._remove_int_flows_by_cookies.call_count == 1
54 1
        assert response.status_code == 201
55 1
        assert response.json() == [evc_id]
56
57 1
        assert self.napp.int_manager._remove_int_flows_by_cookies.call_count == 1
58 1
        assert response.status_code == 201
59 1
        assert response.json() == [evc_id]
60
61 1 View Code Duplication
    async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
62
        """Test redeploy telemetry enabled."""
63 1
        api_mock, flow = AsyncMock(), MagicMock()
64 1
        flow.cookie = 0xA800000000000001
65 1
        monkeypatch.setattr(
66
            "napps.kytos.telemetry_int.main.api",
67
            api_mock,
68
        )
69
70 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
71 1
        api_mock.get_evc.return_value = {
72
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
73
        }
74
75 1
        self.napp.int_manager = AsyncMock()
76
77 1
        endpoint = f"{self.base_endpoint}/evc/redeploy"
78 1
        response = await self.api_client.patch(endpoint, json={"evc_ids": [evc_id]})
79 1
        assert self.napp.int_manager.redeploy_int.call_count == 1
80 1
        assert response.status_code == 201
81 1
        assert response.json() == [evc_id]
82
83 1
    async def test_redeploy_telemetry_not_enabled(self, monkeypatch) -> None:
84
        """Test redeploy telemetry not enabled."""
85 1
        api_mock, flow, api_mngr_mock = AsyncMock(), MagicMock(), AsyncMock()
86 1
        flow.cookie = 0xA800000000000001
87 1
        monkeypatch.setattr(
88
            "napps.kytos.telemetry_int.main.api",
89
            api_mock,
90
        )
91 1
        monkeypatch.setattr(
92
            "napps.kytos.telemetry_int.managers.int.api",
93
            api_mngr_mock,
94
        )
95
96 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
97 1
        api_mock.get_evc.return_value = {
98
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
99
        }
100
101 1
        self.napp.int_manager._validate_map_enable_evcs = MagicMock()
102 1
        self.napp.int_manager._remove_int_flows_by_cookies = AsyncMock()
103 1
        self.napp.int_manager.install_int_flows = AsyncMock()
104 1
        endpoint = f"{self.base_endpoint}/evc/redeploy"
105 1
        response = await self.api_client.patch(endpoint, json={"evc_ids": [evc_id]})
106 1
        assert response.status_code == 409
107 1
        assert "isn't enabled" in response.json()["description"]
108
109 1
    @pytest.mark.parametrize("route", ["/evc/enable", "/evc/disable"])
110 1
    async def test_en_dis_openapi_validation(self, route: str) -> None:
111
        """Test OpenAPI enable/disable basic validation."""
112 1
        endpoint = f"{self.base_endpoint}{route}"
113
        # wrong evc_ids payload data type
114 1
        response = await self.api_client.post(endpoint, json={"evc_ids": 1})
115 1
        assert response.status_code == 400
116 1
        assert "evc_ids" in response.json()["description"]
117
118 1 View Code Duplication
    async def test_disable_telemetry(self, monkeypatch) -> None:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
119
        """Test disable telemetry."""
120 1
        api_mock, flow = AsyncMock(), MagicMock()
121 1
        flow.cookie = 0xA800000000000001
122 1
        monkeypatch.setattr(
123
            "napps.kytos.telemetry_int.main.api",
124
            api_mock,
125
        )
126
127 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
128 1
        api_mock.get_evcs.return_value = {
129
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
130
        }
131
132 1
        self.napp.int_manager = AsyncMock()
133
134 1
        endpoint = f"{self.base_endpoint}/evc/disable"
135 1
        response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]})
136 1
        assert self.napp.int_manager.disable_int.call_count == 1
137 1
        assert response.status_code == 200
138 1
        assert response.json() == [evc_id]
139
140 1
    async def test_get_enabled_evcs(self, monkeypatch) -> None:
141
        """Test get enabled evcs."""
142 1
        api_mock, flow = AsyncMock(), MagicMock()
143 1
        flow.cookie = 0xA800000000000001
144 1
        monkeypatch.setattr(
145
            "napps.kytos.telemetry_int.main.api",
146
            api_mock,
147
        )
148
149 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
150 1
        api_mock.get_evcs.return_value = {
151
            evc_id: {"metadata": {"telemetry": {"enabled": True}}},
152
        }
153
154 1
        endpoint = f"{self.base_endpoint}/evc"
155 1
        response = await self.api_client.get(endpoint)
156 1
        assert api_mock.get_evcs.call_args[1] == {"metadata.telemetry.enabled": "true"}
157 1
        assert response.status_code == 200
158 1
        data = response.json()
159 1
        assert len(data) == 1
160 1
        assert evc_id in data
161
162 1
    async def test_get_evc_compare(self, monkeypatch) -> None:
163
        """Test get evc compre ok case."""
164 1
        api_mock, flow = AsyncMock(), MagicMock()
165 1
        flow.cookie = 0xA800000000000001
166 1
        evc_id = "1"
167 1
        monkeypatch.setattr(
168
            "napps.kytos.telemetry_int.main.api",
169
            api_mock,
170
        )
171
172 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
173 1
        api_mock.get_evcs.return_value = {
174
            evc_id: {
175
                "id": evc_id,
176
                "name": "evc",
177
                "metadata": {"telemetry": {"enabled": True}},
178
            },
179
        }
180 1
        api_mock.get_stored_flows.side_effect = [
181
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
182
            {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]},
183
        ]
184
185 1
        endpoint = f"{self.base_endpoint}/evc/compare"
186 1
        response = await self.api_client.get(endpoint)
187 1
        assert response.status_code == 200
188 1
        data = response.json()
189 1
        assert len(data) == 0
190
191 1
    async def test_get_evc_compare_wrong_metadata(self, monkeypatch) -> None:
192
        """Test get evc compre wrong_metadata_has_int_flows case."""
193 1
        api_mock, flow = AsyncMock(), MagicMock()
194 1
        flow.cookie = 0xA800000000000001
195 1
        evc_id = "1"
196 1
        monkeypatch.setattr(
197
            "napps.kytos.telemetry_int.main.api",
198
            api_mock,
199
        )
200
201 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
202 1
        api_mock.get_evcs.return_value = {
203
            evc_id: {"id": evc_id, "name": "evc", "metadata": {}},
204
        }
205 1
        api_mock.get_stored_flows.side_effect = [
206
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
207
            {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]},
208
        ]
209
210 1
        endpoint = f"{self.base_endpoint}/evc/compare"
211 1
        response = await self.api_client.get(endpoint)
212 1
        assert response.status_code == 200
213 1
        data = response.json()
214 1
        assert len(data) == 1
215 1
        assert data[0]["id"] == evc_id
216 1
        assert data[0]["compare_reason"] == ["wrong_metadata_has_int_flows"]
217 1
        assert data[0]["name"] == "evc"
218
219 1
    async def test_get_evc_compare_missing_some_int_flows(self, monkeypatch) -> None:
220
        """Test get evc compre missing_some_int_flows case."""
221 1
        api_mock, flow = AsyncMock(), MagicMock()
222 1
        flow.cookie = 0xA800000000000001
223 1
        evc_id = "1"
224 1
        monkeypatch.setattr(
225
            "napps.kytos.telemetry_int.main.api",
226
            api_mock,
227
        )
228
229 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
230 1
        api_mock.get_evcs.return_value = {
231
            evc_id: {
232
                "id": evc_id,
233
                "name": "evc",
234
                "metadata": {"telemetry": {"enabled": True}},
235
            },
236
        }
237 1
        api_mock.get_stored_flows.side_effect = [
238
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
239
            {
240
                flow.cookie: [
241
                    {"id": "some_2", "match": {"in_port": 1}},
242
                    {"id": "some_3", "match": {"in_port": 1}},
243
                ]
244
            },
245
        ]
246
247 1
        endpoint = f"{self.base_endpoint}/evc/compare"
248 1
        response = await self.api_client.get(endpoint)
249 1
        assert response.status_code == 200
250 1
        data = response.json()
251 1
        assert len(data) == 1
252 1
        assert data[0]["id"] == evc_id
253 1
        assert data[0]["compare_reason"] == ["missing_some_int_flows"]
254 1
        assert data[0]["name"] == "evc"
255
256 1
    async def test_delete_proxy_port_metadata(self, monkeypatch) -> None:
257
        """Test delete proxy_port metadata."""
258 1
        api_mock = AsyncMock()
259 1
        monkeypatch.setattr(
260
            "napps.kytos.telemetry_int.main.api",
261
            api_mock,
262
        )
263 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
264 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port"
265 1
        self.napp.controller.get_interface_by_id = MagicMock()
266 1
        pp = MagicMock()
267 1
        pp.evc_ids = set()
268 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
269 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
270 1
        intf_mock = MagicMock()
271 1
        intf_mock.metadata = {"proxy_port": port_number}
272 1
        self.napp.controller.get_interface_by_id = MagicMock()
273 1
        self.napp.controller.get_interface_by_id.return_value = intf_mock
274 1
        response = await self.api_client.delete(endpoint)
275 1
        assert response.status_code == 200
276 1
        data = response.json()
277 1
        assert data == "Operation successful"
278 1
        assert api_mock.delete_proxy_port_metadata.call_count == 1
279
280 1
    async def test_delete_proxy_port_metadata_force(self, monkeypatch) -> None:
281
        """Test delete proxy_port metadata force."""
282 1
        api_mock = AsyncMock()
283 1
        monkeypatch.setattr(
284
            "napps.kytos.telemetry_int.main.api",
285
            api_mock,
286
        )
287 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
288 1
        src_id = "00:00:00:00:00:00:00:01:7"
289 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port"
290 1
        self.napp.controller.get_interface_by_id = MagicMock()
291 1
        pp = MagicMock()
292 1
        pp.evc_ids = set(["some_id"])
293 1
        self.napp.int_manager.unis_src[intf_id] = src_id
294 1
        self.napp.int_manager.srcs_pp[src_id] = pp
295 1
        intf_mock = MagicMock()
296 1
        intf_mock.metadata = {"proxy_port": port_number}
297 1
        self.napp.controller.get_interface_by_id = MagicMock()
298 1
        self.napp.controller.get_interface_by_id.return_value = intf_mock
299 1
        response = await self.api_client.delete(endpoint)
300 1
        assert response.status_code == 409
301 1
        data = response.json()["description"]
302 1
        assert "is in use on 1" in data
303 1
        assert not api_mock.delete_proxy_port_metadata.call_count
304
305 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port?force=true"
306 1
        response = await self.api_client.delete(endpoint)
307 1
        assert response.status_code == 200
308 1
        data = response.json()
309 1
        assert "Operation successful" in data
310 1
        assert api_mock.delete_proxy_port_metadata.call_count == 1
311
312 1
    async def test_delete_proxy_port_metadata_early_ret(self, monkeypatch) -> None:
313
        """Test delete proxy_port metadata early ret."""
314 1
        api_mock = AsyncMock()
315 1
        monkeypatch.setattr(
316
            "napps.kytos.telemetry_int.main.api",
317
            api_mock,
318
        )
319 1
        intf_id = "00:00:00:00:00:00:00:01:1"
320 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port"
321 1
        self.napp.controller.get_interface_by_id = MagicMock()
322 1
        pp = MagicMock()
323 1
        pp.evc_ids = set(["some_id"])
324 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
325 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
326 1
        intf_mock = MagicMock()
327 1
        intf_mock.metadata = {}
328 1
        self.napp.controller.get_interface_by_id = MagicMock()
329 1
        self.napp.controller.get_interface_by_id.return_value = intf_mock
330 1
        response = await self.api_client.delete(endpoint)
331 1
        assert response.status_code == 200
332 1
        data = response.json()
333 1
        assert "Operation successful" in data
334 1
        assert not api_mock.delete_proxy_port_metadata.call_count
335
336 1
    async def test_add_proxy_port_metadata(self, monkeypatch) -> None:
337
        """Test add proxy_port metadata."""
338 1
        api_mock = AsyncMock()
339 1
        api_mock.get_evcs.return_value = {}
340 1
        monkeypatch.setattr(
341
            "napps.kytos.telemetry_int.main.api",
342
            api_mock,
343
        )
344 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
345 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"
346 1
        self.napp.controller.get_interface_by_id = MagicMock()
347 1
        pp = MagicMock()
348 1
        pp.status = EntityStatus.UP
349 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
350 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
351 1
        response = await self.api_client.post(endpoint)
352 1
        assert response.status_code == 200
353 1
        data = response.json()
354 1
        assert data == "Operation successful"
355 1
        assert api_mock.add_proxy_port_metadata.call_count == 1
356
357 1
    async def test_add_proxy_port_metadata_early_ret(self, monkeypatch) -> None:
358
        """Test add proxy_port metadata early ret."""
359 1
        api_mock = AsyncMock()
360 1
        monkeypatch.setattr(
361
            "napps.kytos.telemetry_int.main.api",
362
            api_mock,
363
        )
364 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
365 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"
366 1
        intf_mock = MagicMock()
367 1
        intf_mock.metadata = {"proxy_port": port_number}
368 1
        self.napp.controller.get_interface_by_id = MagicMock()
369 1
        self.napp.controller.get_interface_by_id.return_value = intf_mock
370 1
        response = await self.api_client.post(endpoint)
371 1
        assert response.status_code == 200
372 1
        data = response.json()
373 1
        assert data == "Operation successful"
374 1
        assert not api_mock.add_proxy_port_metadata.call_count
375
376 1
    async def test_add_proxy_port_metadata_conflict(self, monkeypatch) -> None:
377
        """Test add proxy_port metadata conflict."""
378 1
        api_mock = AsyncMock()
379 1
        api_mock.get_evcs.return_value = {}
380 1
        monkeypatch.setattr(
381
            "napps.kytos.telemetry_int.main.api",
382
            api_mock,
383
        )
384 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
385 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"
386 1
        self.napp.controller.get_interface_by_id = MagicMock()
387 1
        pp = MagicMock()
388 1
        pp.status = EntityStatus.UP
389 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
390 1
        self.napp.int_manager.get_proxy_port_or_raise.side_effect = ProxyPortShared(
391
            "no_evc_id", "boom"
392
        )
393 1
        response = await self.api_client.post(endpoint)
394 1
        assert response.status_code == 409
395
396 1
    async def test_add_proxy_port_metadata_force(self, monkeypatch) -> None:
397
        """Test add proxy_port metadata force."""
398 1
        api_mock = AsyncMock()
399 1
        api_mock.get_evcs.return_value = {}
400 1
        monkeypatch.setattr(
401
            "napps.kytos.telemetry_int.main.api",
402
            api_mock,
403
        )
404 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
405 1
        force = "true"
406 1
        endpoint = (
407
            f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}?force={force}"
408
        )
409 1
        self.napp.controller.get_interface_by_id = MagicMock()
410 1
        pp = MagicMock()
411
        # despite proxy port down, with force true the request shoudl succeed
412 1
        pp.status = EntityStatus.DOWN
413 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
414 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
415 1
        response = await self.api_client.post(endpoint)
416 1
        assert response.status_code == 200
417 1
        data = response.json()
418 1
        assert data == "Operation successful"
419 1
        assert api_mock.add_proxy_port_metadata.call_count == 1
420
421 1
        force = "false"
422 1
        endpoint = (
423
            f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}?force={force}"
424
        )
425 1
        response = await self.api_client.post(endpoint)
426 1
        assert response.status_code == 409
427 1
        assert "isn't UP" in response.json()["description"]
428
429 1
    async def test_list_proxy_port(self) -> None:
430
        """Test list proxy port."""
431 1
        endpoint = f"{self.base_endpoint}/uni/proxy_port"
432 1
        response = await self.api_client.get(endpoint)
433 1
        assert response.status_code == 200
434 1
        data = response.json()
435 1
        assert not data
436
437 1
        sw1, intf_mock = MagicMock(), MagicMock()
438 1
        intf_mock.metadata = {"proxy_port": 1}
439 1
        intf_mock.status.value = "UP"
440 1
        intf_mock.id = "1"
441 1
        sw1.interfaces = {"intf1": intf_mock}
442 1
        self.napp.controller.switches = {"sw1": sw1}
443 1
        response = await self.api_client.get(endpoint)
444 1
        assert response.status_code == 200
445 1
        data = response.json()
446 1
        expected = [
447
            {
448
                "proxy_port": {
449
                    "port_number": 1,
450
                    "status": "DOWN",
451
                    "status_reason": ["UNI interface 1 not found"],
452
                },
453
                "uni": {"id": "1", "status": "UP", "status_reason": []},
454
            }
455
        ]
456 1
        assert data == expected
457
458 1
        pp = MagicMock()
459 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
460 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
461 1
        pp.status.value = "UP"
462
463 1
        response = await self.api_client.get(endpoint)
464 1
        assert response.status_code == 200
465 1
        data = response.json()
466 1
        expected = [
467
            {
468
                "proxy_port": {
469
                    "port_number": 1,
470
                    "status": "UP",
471
                    "status_reason": [],
472
                },
473
                "uni": {"id": "1", "status": "UP", "status_reason": []},
474
            }
475
        ]
476 1
        assert data == expected
477
478 1
    async def test_on_table_enabled(self) -> None:
479
        """Test on_table_enabled."""
480 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3}
481 1
        await self.napp.on_table_enabled(
482
            KytosEvent(content={"telemetry_int": {"evpl": 22, "epl": 33}})
483
        )
484 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 22, "epl": 33}
485 1
        assert self.napp.controller.buffers.app.aput.call_count == 1
486
487 1
    async def test_on_table_enabled_no_group(self) -> None:
488
        """Test on_table_enabled no group."""
489 1
        await self.napp.on_table_enabled(
490
            KytosEvent(content={"mef_eline": {"evpl": 22, "epl": 33}})
491
        )
492 1
        assert not self.napp.controller.buffers.app.aput.call_count
493
494 1
    async def test_on_evc_deployed(self) -> None:
495
        """Test on_evc_deployed."""
496 1
        content = {"metadata": {"telemetry_request": {}}, "id": "some_id"}
497 1
        self.napp.int_manager.redeploy_int = AsyncMock()
498 1
        self.napp.int_manager.enable_int = AsyncMock()
499 1
        await self.napp.on_evc_deployed(KytosEvent(content=content))
500 1
        assert self.napp.int_manager.enable_int.call_count == 1
501 1
        assert self.napp.int_manager.redeploy_int.call_count == 0
502
503 1
        content = {"metadata": {"telemetry": {"enabled": True}}, "id": "some_id"}
504 1
        await self.napp.on_evc_deployed(KytosEvent(content=content))
505 1
        assert self.napp.int_manager.enable_int.call_count == 1
506 1
        assert self.napp.int_manager.redeploy_int.call_count == 1
507
508 1
    async def test_on_evc_deployed_error(self, monkeypatch) -> None:
509
        """Test on_evc_deployed error."""
510 1
        content = {"metadata": {"telemetry_request": {}}, "id": "some_id"}
511 1
        self.napp.int_manager.enable_int = AsyncMock()
512 1
        self.napp.int_manager.enable_int.side_effect = EVCError("no_id", "boom")
513 1
        log_mock = MagicMock()
514 1
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock)
515 1
        await self.napp.on_evc_deployed(KytosEvent(content=content))
516 1
        assert log_mock.error.call_count == 1
517
518 1
    async def test_on_evc_deleted(self) -> None:
519
        """Test on_evc_deleted."""
520 1
        content = {"metadata": {"telemetry": {"enabled": True}}, "id": "some_id"}
521 1
        self.napp.int_manager.disable_int = AsyncMock()
522 1
        await self.napp.on_evc_deleted(KytosEvent(content=content))
523 1
        assert self.napp.int_manager.disable_int.call_count == 1
524
525 1
    async def test_on_uni_active_updated(self, monkeypatch) -> None:
526
        """Test on UNI active updated."""
527 1
        api_mock = AsyncMock()
528 1
        monkeypatch.setattr(
529
            "napps.kytos.telemetry_int.main.api",
530
            api_mock,
531
        )
532 1
        content = {
533
            "metadata": {"telemetry": {"enabled": True}},
534
            "id": "some_id",
535
            "active": True,
536
        }
537 1
        await self.napp.on_uni_active_updated(KytosEvent(content=content))
538 1
        assert api_mock.add_evcs_metadata.call_count == 1
539 1
        args = api_mock.add_evcs_metadata.call_args[0][1]
540 1
        assert args["telemetry"]["status"] == "UP"
541
542 1
        content["active"] = False
543 1
        await self.napp.on_uni_active_updated(KytosEvent(content=content))
544 1
        assert api_mock.add_evcs_metadata.call_count == 2
545 1
        args = api_mock.add_evcs_metadata.call_args[0][1]
546 1
        assert args["telemetry"]["status"] == "DOWN"
547
548 1
    async def test_on_evc_undeployed(self) -> None:
549
        """Test on_evc_undeployed."""
550 1
        content = {
551
            "enabled": False,
552
            "metadata": {"telemetry": {"enabled": False}},
553
            "id": "some_id",
554
        }
555 1
        self.napp.int_manager.remove_int_flows = AsyncMock()
556 1
        await self.napp.on_evc_undeployed(KytosEvent(content=content))
557 1
        assert self.napp.int_manager.remove_int_flows.call_count == 0
558
559 1
        content["metadata"]["telemetry"]["enabled"] = True
560 1
        await self.napp.on_evc_undeployed(KytosEvent(content=content))
561 1
        assert self.napp.int_manager.remove_int_flows.call_count == 1
562
563 1
    async def test_on_evc_redeployed_link(self) -> None:
564
        """Test on redeployed_link_down|redeployed_link_up."""
565 1
        content = {
566
            "enabled": True,
567
            "metadata": {"telemetry": {"enabled": False}},
568
            "id": "some_id",
569
        }
570 1
        self.napp.int_manager.redeploy_int = AsyncMock()
571 1
        await self.napp.on_evc_redeployed_link(KytosEvent(content=content))
572 1
        assert self.napp.int_manager.redeploy_int.call_count == 0
573
574 1
        content["metadata"]["telemetry"]["enabled"] = True
575 1
        await self.napp.on_evc_redeployed_link(KytosEvent(content=content))
576 1
        assert self.napp.int_manager.redeploy_int.call_count == 1
577
578 1
    async def test_on_evc_redeployed_link_error(self, monkeypatch) -> None:
579
        """Test on redeployed_link_down|redeployed_link_up error."""
580 1
        content = {
581
            "enabled": True,
582
            "metadata": {"telemetry": {"enabled": True}},
583
            "id": "some_id",
584
        }
585 1
        log_mock = MagicMock()
586 1
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock)
587 1
        self.napp.int_manager.redeploy_int = AsyncMock()
588 1
        self.napp.int_manager.redeploy_int.side_effect = EVCError("no_id", "boom")
589 1
        await self.napp.on_evc_redeployed_link(KytosEvent(content=content))
590 1
        assert log_mock.error.call_count == 1
591
592 1
    async def test_on_evc_error_redeployed_link_down(self) -> None:
593
        """Test error_redeployed_link_down."""
594 1
        content = {
595
            "enabled": True,
596
            "metadata": {"telemetry": {"enabled": False}},
597
            "id": "some_id",
598
        }
599 1
        self.napp.int_manager.remove_int_flows = AsyncMock()
600 1
        await self.napp.on_evc_error_redeployed_link_down(KytosEvent(content=content))
601 1
        assert self.napp.int_manager.remove_int_flows.call_count == 0
602
603 1
        content["metadata"]["telemetry"]["enabled"] = True
604 1
        await self.napp.on_evc_error_redeployed_link_down(KytosEvent(content=content))
605 1
        assert self.napp.int_manager.remove_int_flows.call_count == 1
606
607 1
    async def test_on_link_down(self) -> None:
608
        """Test on link_down."""
609 1
        self.napp.int_manager.handle_pp_link_down = AsyncMock()
610 1
        await self.napp.on_link_down(KytosEvent(content={"link": MagicMock()}))
611 1
        assert self.napp.int_manager.handle_pp_link_down.call_count == 1
612
613 1
    async def test_on_link_up(self) -> None:
614
        """Test on link_up."""
615 1
        self.napp.int_manager.handle_pp_link_up = AsyncMock()
616 1
        await self.napp.on_link_up(KytosEvent(content={"link": MagicMock()}))
617 1
        assert self.napp.int_manager.handle_pp_link_up.call_count == 1
618
619 1
    async def test_on_table_enabled_error(self, monkeypatch) -> None:
620
        """Test on_table_enabled error case."""
621 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3}
622 1
        log_mock = MagicMock()
623 1
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock)
624 1
        await self.napp.on_table_enabled(
625
            KytosEvent(content={"telemetry_int": {"invalid": 1}})
626
        )
627 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3}
628 1
        assert log_mock.error.call_count == 1
629 1
        assert not self.napp.controller.buffers.app.aput.call_count
630
631 1
    async def test_on_flow_mod_error(self, monkeypatch) -> None:
632
        """Test on_flow_mod_error."""
633 1
        api_mock_main, api_mock_int, flow = AsyncMock(), AsyncMock(), MagicMock()
634 1
        flow.cookie = 0xA800000000000001
635 1
        monkeypatch.setattr(
636
            "napps.kytos.telemetry_int.main.api",
637
            api_mock_main,
638
        )
639 1
        monkeypatch.setattr(
640
            "napps.kytos.telemetry_int.managers.int.api",
641
            api_mock_int,
642
        )
643 1
        cookie = utils.get_id_from_cookie(flow.cookie)
644 1
        api_mock_main.get_evc.return_value = {
645
            cookie: {"metadata": {"telemetry": {"enabled": True}}}
646
        }
647 1
        api_mock_int.get_stored_flows.return_value = {cookie: [MagicMock()]}
648 1
        self.napp.int_manager._remove_int_flows_by_cookies = AsyncMock()
649
650 1
        event = KytosEvent(content={"flow": flow, "error_command": "add"})
651 1
        await self.napp.on_flow_mod_error(event)
652
653 1
        assert api_mock_main.get_evc.call_count == 1
654 1
        assert api_mock_int.get_stored_flows.call_count == 1
655 1
        assert api_mock_int.add_evcs_metadata.call_count == 1
656 1
        assert self.napp.int_manager._remove_int_flows_by_cookies.call_count == 1
657
658 1
    async def test_on_mef_eline_evcs_loaded(self):
659
        """Test on_mef_eline_evcs_loaded."""
660 1
        evcs = {"1": {}, "2": {}}
661 1
        event = KytosEvent(content=evcs)
662 1
        self.napp.int_manager = MagicMock()
663 1
        await self.napp.on_mef_eline_evcs_loaded(event)
664 1
        self.napp.int_manager.load_uni_src_proxy_ports.assert_called_with(evcs)
665
666 1
    async def test_on_intf_metadata_remove(self):
667
        """Test on_intf_metadata_removed."""
668 1
        intf = MagicMock()
669 1
        event = KytosEvent(content={"interface": intf})
670 1
        self.napp.int_manager = MagicMock()
671 1
        await self.napp.on_intf_metadata_removed(event)
672 1
        self.napp.int_manager.handle_pp_metadata_removed.assert_called_with(intf)
673
674 1
    async def test_on_intf_metadata_added(self):
675
        """Test on_intf_metadata_added."""
676 1
        intf = MagicMock()
677 1
        event = KytosEvent(content={"interface": intf})
678 1
        self.napp.int_manager = MagicMock()
679 1
        await self.napp.on_intf_metadata_added(event)
680 1
        self.napp.int_manager.handle_pp_metadata_added.assert_called_with(intf)
681
682 1
    async def test_on_failover_deployed(self):
683
        """Test on_failover_deployed."""
684 1
        event = KytosEvent(content={})
685 1
        self.napp.int_manager = MagicMock()
686 1
        await self.napp.on_failover_deployed(event)
687 1
        self.napp.int_manager.handle_failover_flows.assert_called()
688
689 1
    async def test_on_failover_link_down(self):
690
        """Test on_failover_link_down."""
691 1
        event = KytosEvent(content={})
692 1
        self.napp.int_manager = MagicMock()
693 1
        await self.napp.on_failover_link_down(event)
694 1
        self.napp.int_manager.handle_failover_flows.assert_called()
695
696 1
    async def test_on_failover_old_path(self):
697
        """Test on_failover_old_path."""
698 1
        event = KytosEvent(content={})
699 1
        self.napp.int_manager = MagicMock()
700 1
        await self.napp.on_failover_old_path(event)
701
        self.napp.int_manager.handle_failover_flows.assert_called()
702