test_main.TestMain.test_on_evc_redeployed_link()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 11
nop 1
dl 0
loc 14
ccs 8
cts 8
cp 1
crap 1
rs 9.85
c 0
b 0
f 0
1
"""Test Main methods."""
2
3 1
from unittest.mock import AsyncMock, MagicMock, patch
4
5 1
import pytest
6 1
from napps.kytos.telemetry_int import utils
7 1
from napps.kytos.telemetry_int.exceptions import EVCError, ProxyPortShared
8 1
from napps.kytos.telemetry_int.main import Main
9
10 1
from kytos.core.common import EntityStatus
11 1
from kytos.core.events import KytosEvent
12 1
from kytos.lib.helpers import get_controller_mock, get_test_client
13
14
15 1
class TestMain:
16
    """Tests for the Main class."""
17
18 1
    def setup_method(self):
        """Build a fresh controller mock, NApp and API client for each test.

        ``kytos.core.helpers.run_on_thread`` is replaced by an identity
        function. The patcher is kept on the instance so that
        ``teardown_method`` can undo it instead of leaking it into later tests.
        """
        self._run_on_thread_patcher = patch(
            "kytos.core.helpers.run_on_thread", lambda x: x
        )
        self._run_on_thread_patcher.start()
        controller = get_controller_mock()
        self.napp = Main(controller)
        self.api_client = get_test_client(controller, self.napp)
        self.base_endpoint = "kytos/telemetry_int/v1"

    def teardown_method(self):
        """Stop the ``run_on_thread`` patch started by ``setup_method``."""
        self._run_on_thread_patcher.stop()
26
27 1 View Code Duplication
    async def test_enable_telemetry(self, monkeypatch) -> None:
        """POST /evc/enable on a not-enabled EVC answers 201 with its id."""
        cookie = 0xA800000000000001
        evc_id = utils.get_id_from_cookie(cookie)

        main_api = AsyncMock()
        main_api.get_evcs.return_value = {
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
        }
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)
        self.napp.int_manager = AsyncMock()

        response = await self.api_client.post(
            f"{self.base_endpoint}/evc/enable", json={"evc_ids": [evc_id]}
        )

        int_manager = self.napp.int_manager
        assert int_manager.enable_int.call_count == 1
        assert int_manager._remove_int_flows_by_cookies.call_count == 1
        assert response.status_code == 201
        assert response.json() == [evc_id]
49
50 1 View Code Duplication
    async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None:
        """PATCH /evc/redeploy on an enabled EVC answers 201 with its id."""
        cookie = 0xA800000000000001
        evc_id = utils.get_id_from_cookie(cookie)

        main_api = AsyncMock()
        main_api.get_evc.return_value = {
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
        }
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)
        self.napp.int_manager = AsyncMock()

        response = await self.api_client.patch(
            f"{self.base_endpoint}/evc/redeploy", json={"evc_ids": [evc_id]}
        )

        assert self.napp.int_manager.redeploy_int.call_count == 1
        assert response.status_code == 201
        assert response.json() == [evc_id]
71
72 1
    async def test_redeploy_telemetry_not_enabled(self, monkeypatch) -> None:
        """PATCH /evc/redeploy on a not-enabled EVC answers 409."""
        cookie = 0xA800000000000001
        evc_id = utils.get_id_from_cookie(cookie)

        main_api, manager_api = AsyncMock(), AsyncMock()
        main_api.get_evc.return_value = {
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
        }
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.managers.int.api", manager_api
        )

        int_manager = self.napp.int_manager
        int_manager._validate_map_enable_evcs = MagicMock()
        int_manager._remove_int_flows_by_cookies = AsyncMock()
        int_manager.install_int_flows = AsyncMock()

        response = await self.api_client.patch(
            f"{self.base_endpoint}/evc/redeploy", json={"evc_ids": [evc_id]}
        )

        assert response.status_code == 409
        assert "isn't enabled" in response.json()["description"]
97
98 1
    @pytest.mark.parametrize("route", ["/evc/enable", "/evc/disable"])
    async def test_en_dis_openapi_validation(self, route: str) -> None:
        """A wrongly typed ``evc_ids`` payload is rejected with 400."""
        # evc_ids is sent as an int, which is the wrong data type
        bad_payload = {"evc_ids": 1}
        response = await self.api_client.post(
            f"{self.base_endpoint}{route}", json=bad_payload
        )
        assert response.status_code == 400
        assert "evc_ids" in response.json()["description"]
106
107 1 View Code Duplication
    async def test_disable_telemetry(self, monkeypatch) -> None:
        """POST /evc/disable on an enabled EVC answers 200 with its id."""
        cookie = 0xA800000000000001
        evc_id = utils.get_id_from_cookie(cookie)

        main_api = AsyncMock()
        main_api.get_evcs.return_value = {
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
        }
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)
        self.napp.int_manager = AsyncMock()

        response = await self.api_client.post(
            f"{self.base_endpoint}/evc/disable", json={"evc_ids": [evc_id]}
        )

        assert self.napp.int_manager.disable_int.call_count == 1
        assert response.status_code == 200
        assert response.json() == [evc_id]
128
129 1
    async def test_get_enabled_evcs(self, monkeypatch) -> None:
        """GET /evc queries enabled EVCs and returns the single mocked one."""
        cookie = 0xA800000000000001
        evc_id = utils.get_id_from_cookie(cookie)

        main_api = AsyncMock()
        main_api.get_evcs.return_value = {
            evc_id: {"metadata": {"telemetry": {"enabled": True}}},
        }
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)

        response = await self.api_client.get(f"{self.base_endpoint}/evc")

        # the keyword arguments of the get_evcs call carry the metadata filter
        expected_kwargs = {"metadata.telemetry.enabled": "true"}
        assert main_api.get_evcs.call_args.kwargs == expected_kwargs
        assert response.status_code == 200
        evcs = response.json()
        assert len(evcs) == 1
        assert evc_id in evcs
150
151 1
    async def test_get_evc_compare(self, monkeypatch) -> None:
        """Test get evc compare ok case.

        The EVC has telemetry enabled and each of the two ``get_stored_flows``
        results holds one flow for its cookie, so the endpoint returns an
        empty list.
        """
        api_mock, flow = AsyncMock(), MagicMock()
        flow.cookie = 0xA800000000000001
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.main.api",
            api_mock,
        )

        evc_id = utils.get_id_from_cookie(flow.cookie)
        api_mock.get_evcs.return_value = {
            evc_id: {
                "id": evc_id,
                "name": "evc",
                "metadata": {"telemetry": {"enabled": True}},
            },
        }
        # one result per successive get_stored_flows call
        api_mock.get_stored_flows.side_effect = [
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
            {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]},
        ]

        endpoint = f"{self.base_endpoint}/evc/compare"
        response = await self.api_client.get(endpoint)
        assert response.status_code == 200
        data = response.json()
        assert len(data) == 0
179
180 1
    async def test_get_evc_compare_wrong_metadata(self, monkeypatch) -> None:
        """Test get evc compare wrong_metadata_has_int_flows case.

        The EVC metadata is empty while stored flows are returned for its
        cookie, so the EVC is reported with ``wrong_metadata_has_int_flows``.
        """
        api_mock, flow = AsyncMock(), MagicMock()
        flow.cookie = 0xA800000000000001
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.main.api",
            api_mock,
        )

        evc_id = utils.get_id_from_cookie(flow.cookie)
        api_mock.get_evcs.return_value = {
            evc_id: {"id": evc_id, "name": "evc", "metadata": {}},
        }
        # one result per successive get_stored_flows call
        api_mock.get_stored_flows.side_effect = [
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
            {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]},
        ]

        endpoint = f"{self.base_endpoint}/evc/compare"
        response = await self.api_client.get(endpoint)
        assert response.status_code == 200
        data = response.json()
        assert len(data) == 1
        assert data[0]["id"] == evc_id
        assert data[0]["compare_reason"] == ["wrong_metadata_has_int_flows"]
        assert data[0]["name"] == "evc"
207
208 1
    async def test_get_evc_compare_missing_some_int_flows(self, monkeypatch) -> None:
        """Test get evc compare missing_some_int_flows case.

        The two ``get_stored_flows`` results hold a different number of flows
        for the cookie (one vs. two), so the EVC is reported with
        ``missing_some_int_flows``.
        """
        api_mock, flow = AsyncMock(), MagicMock()
        flow.cookie = 0xA800000000000001
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.main.api",
            api_mock,
        )

        evc_id = utils.get_id_from_cookie(flow.cookie)
        api_mock.get_evcs.return_value = {
            evc_id: {
                "id": evc_id,
                "name": "evc",
                "metadata": {"telemetry": {"enabled": True}},
            },
        }
        # one result per successive get_stored_flows call
        api_mock.get_stored_flows.side_effect = [
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
            {
                flow.cookie: [
                    {"id": "some_2", "match": {"in_port": 1}},
                    {"id": "some_3", "match": {"in_port": 1}},
                ]
            },
        ]

        endpoint = f"{self.base_endpoint}/evc/compare"
        response = await self.api_client.get(endpoint)
        assert response.status_code == 200
        data = response.json()
        assert len(data) == 1
        assert data[0]["id"] == evc_id
        assert data[0]["compare_reason"] == ["missing_some_int_flows"]
        assert data[0]["name"] == "evc"
244
245 1
    async def test_delete_proxy_port_metadata(self, monkeypatch) -> None:
        """Test delete proxy_port metadata.

        The interface carries ``proxy_port`` metadata and the proxy port has
        no EVC ids, so the metadata is deleted once and 200 is returned.
        """
        api_mock = AsyncMock()
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.main.api",
            api_mock,
        )
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port"
        pp = MagicMock()
        pp.evc_ids = set()
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
        intf_mock = MagicMock()
        intf_mock.metadata = {"proxy_port": port_number}
        self.napp.controller.get_interface_by_id = MagicMock()
        self.napp.controller.get_interface_by_id.return_value = intf_mock
        response = await self.api_client.delete(endpoint)
        assert response.status_code == 200
        data = response.json()
        assert data == "Operation successful"
        assert api_mock.delete_proxy_port_metadata.call_count == 1
268
269 1
    async def test_delete_proxy_port_metadata_force(self, monkeypatch) -> None:
        """Test delete proxy_port metadata force.

        With an EVC id registered on the proxy port, the plain DELETE answers
        409 and deletes nothing; repeating it with ``force=true`` answers 200
        and deletes the metadata once.
        """
        api_mock = AsyncMock()
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.main.api",
            api_mock,
        )
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
        src_id = "00:00:00:00:00:00:00:01:7"
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port"
        pp = MagicMock()
        pp.evc_ids = {"some_id"}
        self.napp.int_manager.unis_src[intf_id] = src_id
        self.napp.int_manager.srcs_pp[src_id] = pp
        intf_mock = MagicMock()
        intf_mock.metadata = {"proxy_port": port_number}
        self.napp.controller.get_interface_by_id = MagicMock()
        self.napp.controller.get_interface_by_id.return_value = intf_mock
        response = await self.api_client.delete(endpoint)
        assert response.status_code == 409
        data = response.json()["description"]
        assert "is in use on 1" in data
        assert not api_mock.delete_proxy_port_metadata.call_count

        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port?force=true"
        response = await self.api_client.delete(endpoint)
        assert response.status_code == 200
        data = response.json()
        assert "Operation successful" in data
        assert api_mock.delete_proxy_port_metadata.call_count == 1
300
301 1
    async def test_delete_proxy_port_metadata_early_ret(self, monkeypatch) -> None:
        """Test delete proxy_port metadata early ret.

        The interface has no ``proxy_port`` metadata, so the endpoint answers
        200 without calling ``delete_proxy_port_metadata``.
        """
        api_mock = AsyncMock()
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.main.api",
            api_mock,
        )
        intf_id = "00:00:00:00:00:00:00:01:1"
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port"
        pp = MagicMock()
        pp.evc_ids = {"some_id"}
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
        intf_mock = MagicMock()
        intf_mock.metadata = {}
        self.napp.controller.get_interface_by_id = MagicMock()
        self.napp.controller.get_interface_by_id.return_value = intf_mock
        response = await self.api_client.delete(endpoint)
        assert response.status_code == 200
        data = response.json()
        assert "Operation successful" in data
        assert not api_mock.delete_proxy_port_metadata.call_count
324
325 1
    async def test_add_proxy_port_metadata(self, monkeypatch) -> None:
        """POST of a proxy port whose status is UP stores the metadata once."""
        main_api = AsyncMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)

        proxy_port = MagicMock()
        proxy_port.status = EntityStatus.UP
        self.napp.controller.get_interface_by_id = MagicMock()
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock(
            return_value=proxy_port
        )

        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
        response = await self.api_client.post(
            f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"
        )

        assert response.status_code == 200
        assert response.json() == "Operation successful"
        assert main_api.add_proxy_port_metadata.call_count == 1
344
345 1
    async def test_add_proxy_port_metadata_early_ret(self, monkeypatch) -> None:
        """POST of an already-set proxy port answers 200 without storing it."""
        main_api = AsyncMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)

        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
        # the interface metadata already holds the requested port number
        interface = MagicMock()
        interface.metadata = {"proxy_port": port_number}
        self.napp.controller.get_interface_by_id = MagicMock(
            return_value=interface
        )

        response = await self.api_client.post(
            f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"
        )

        assert response.status_code == 200
        assert response.json() == "Operation successful"
        assert main_api.add_proxy_port_metadata.call_count == 0
363
364 1
    async def test_add_proxy_port_metadata_conflict(self, monkeypatch) -> None:
        """Test add proxy_port metadata conflict.

        ``get_proxy_port_or_raise`` raises ``ProxyPortShared``, and the
        endpoint answers 409.
        """
        api_mock = AsyncMock()
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.main.api",
            api_mock,
        )
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"
        self.napp.controller.get_interface_by_id = MagicMock()
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
        self.napp.int_manager.get_proxy_port_or_raise.side_effect = ProxyPortShared(
            "no_evc_id", "boom"
        )
        response = await self.api_client.post(endpoint)
        assert response.status_code == 409
382
383 1
    async def test_add_proxy_port_metadata_force(self, monkeypatch) -> None:
        """A DOWN proxy port is accepted with force=true, refused otherwise."""
        main_api = AsyncMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)

        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
        base_url = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"

        proxy_port = MagicMock()
        proxy_port.status = EntityStatus.DOWN
        self.napp.controller.get_interface_by_id = MagicMock()
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock(
            return_value=proxy_port
        )

        # even though the proxy port is DOWN, force=true makes the request succeed
        response = await self.api_client.post(f"{base_url}?force=true")
        assert response.status_code == 200
        assert response.json() == "Operation successful"
        assert main_api.add_proxy_port_metadata.call_count == 1

        # the same request with force=false is refused
        response = await self.api_client.post(f"{base_url}?force=false")
        assert response.status_code == 409
        assert "isn't UP" in response.json()["description"]
414
415 1
    async def test_list_proxy_port(self) -> None:
        """GET /uni/proxy_port: empty list, then DOWN and UP proxy port views."""

        def expected_listing(pp_status, pp_reason):
            """Build the expected one-entry response body."""
            return [
                {
                    "proxy_port": {
                        "port_number": 1,
                        "status": pp_status,
                        "status_reason": pp_reason,
                    },
                    "uni": {"id": "1", "status": "UP", "status_reason": []},
                }
            ]

        url = f"{self.base_endpoint}/uni/proxy_port"

        # nothing has been registered on the controller yet
        response = await self.api_client.get(url)
        assert response.status_code == 200
        assert not response.json()

        # one switch exposing one interface that carries proxy_port metadata
        switch, interface = MagicMock(), MagicMock()
        interface.metadata = {"proxy_port": 1}
        interface.status.value = "UP"
        interface.id = "1"
        switch.interfaces = {"intf1": interface}
        self.napp.controller.switches = {"sw1": switch}

        response = await self.api_client.get(url)
        assert response.status_code == 200
        assert response.json() == expected_listing(
            "DOWN", ["UNI interface 1 not found"]
        )

        # now the proxy port lookup is mocked and reports an UP status
        proxy_port = MagicMock()
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock(
            return_value=proxy_port
        )
        proxy_port.status.value = "UP"

        response = await self.api_client.get(url)
        assert response.status_code == 200
        assert response.json() == expected_listing("UP", [])
463
464 1
    async def test_on_table_enabled(self) -> None:
        """A telemetry_int table event updates table_group and emits one event."""
        builder = self.napp.int_manager.flow_builder
        assert builder.table_group == {"evpl": 2, "epl": 3}

        new_tables = {"evpl": 22, "epl": 33}
        event = KytosEvent(content={"telemetry_int": new_tables})
        await self.napp.on_table_enabled(event)

        assert self.napp.int_manager.flow_builder.table_group == {
            "evpl": 22,
            "epl": 33,
        }
        assert self.napp.controller.buffers.app.aput.call_count == 1
472
473 1
    async def test_on_table_enabled_no_group(self) -> None:
        """An event without a telemetry_int key emits nothing."""
        event = KytosEvent(content={"mef_eline": {"evpl": 22, "epl": 33}})
        await self.napp.on_table_enabled(event)
        assert self.napp.controller.buffers.app.aput.call_count == 0
479
480 1
    async def test_on_evc_deployed(self) -> None:
        """telemetry_request metadata enables INT; enabled telemetry redeploys."""
        int_manager = self.napp.int_manager
        int_manager.redeploy_int = AsyncMock()
        int_manager.enable_int = AsyncMock()

        # first event: only a telemetry_request is present
        requested = {"metadata": {"telemetry_request": {}}, "id": "some_id"}
        await self.napp.on_evc_deployed(KytosEvent(content=requested))
        assert int_manager.enable_int.call_count == 1
        assert int_manager.redeploy_int.call_count == 0

        # second event: telemetry is already enabled
        enabled = {"metadata": {"telemetry": {"enabled": True}}, "id": "some_id"}
        await self.napp.on_evc_deployed(KytosEvent(content=enabled))
        assert int_manager.enable_int.call_count == 1
        assert int_manager.redeploy_int.call_count == 1
493
494 1
    async def test_on_evc_deployed_error(self, monkeypatch) -> None:
        """An EVCError raised by enable_int is logged once as an error."""
        logger = MagicMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", logger)
        self.napp.int_manager.enable_int = AsyncMock(
            side_effect=EVCError("no_id", "boom")
        )

        event = KytosEvent(
            content={"metadata": {"telemetry_request": {}}, "id": "some_id"}
        )
        await self.napp.on_evc_deployed(event)

        assert logger.error.call_count == 1
503
504 1
    async def test_on_evc_deleted(self) -> None:
        """Deleting an EVC with telemetry enabled calls disable_int once."""
        self.napp.int_manager.disable_int = AsyncMock()
        event = KytosEvent(
            content={"metadata": {"telemetry": {"enabled": True}}, "id": "some_id"}
        )
        await self.napp.on_evc_deleted(event)
        assert self.napp.int_manager.disable_int.call_count == 1
510
511 1
    async def test_on_uni_active_updated(self, monkeypatch) -> None:
        """The ``active`` flag is stored as telemetry status UP or DOWN."""
        main_api = AsyncMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)

        content = {
            "metadata": {"telemetry": {"enabled": True}},
            "id": "some_id",
            "active": True,
        }
        # the same content dict is reused; only its "active" flag is flipped
        steps = ((True, "UP"), (False, "DOWN"))
        for expected_calls, (active, status) in enumerate(steps, start=1):
            content["active"] = active
            await self.napp.on_uni_active_updated(KytosEvent(content=content))
            assert main_api.add_evcs_metadata.call_count == expected_calls
            # second positional argument of the latest add_evcs_metadata call
            metadata = main_api.add_evcs_metadata.call_args[0][1]
            assert metadata["telemetry"]["status"] == status
533
534 1
    async def test_on_evc_undeployed(self) -> None:
        """INT flows are removed on undeploy only when telemetry is enabled."""
        self.napp.int_manager.remove_int_flows = AsyncMock()
        telemetry = {"enabled": False}
        content = {
            "enabled": False,
            "metadata": {"telemetry": telemetry},
            "id": "some_id",
        }
        # (telemetry enabled flag, cumulative remove_int_flows calls expected)
        for enabled, expected_calls in ((False, 0), (True, 1)):
            telemetry["enabled"] = enabled
            await self.napp.on_evc_undeployed(KytosEvent(content=content))
            assert self.napp.int_manager.remove_int_flows.call_count == expected_calls
548
549 1
    async def test_on_evc_redeployed_link(self) -> None:
        """redeploy_int is called only when the EVC has telemetry enabled."""
        self.napp.int_manager.redeploy_int = AsyncMock()
        telemetry = {"enabled": False}
        content = {
            "enabled": True,
            "metadata": {"telemetry": telemetry},
            "id": "some_id",
        }
        # (telemetry enabled flag, cumulative redeploy_int calls expected)
        for enabled, expected_calls in ((False, 0), (True, 1)):
            telemetry["enabled"] = enabled
            await self.napp.on_evc_redeployed_link(KytosEvent(content=content))
            assert self.napp.int_manager.redeploy_int.call_count == expected_calls
563
564 1
    async def test_on_evc_redeployed_link_error(self, monkeypatch) -> None:
        """An EVCError raised by redeploy_int is logged once as an error."""
        logger = MagicMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", logger)
        self.napp.int_manager.redeploy_int = AsyncMock(
            side_effect=EVCError("no_id", "boom")
        )

        event = KytosEvent(
            content={
                "enabled": True,
                "metadata": {"telemetry": {"enabled": True}},
                "id": "some_id",
            }
        )
        await self.napp.on_evc_redeployed_link(event)

        assert logger.error.call_count == 1
577
578 1
    async def test_on_evc_error_redeployed_link_down(self) -> None:
        """INT flows are removed only when the EVC has telemetry enabled."""
        self.napp.int_manager.remove_int_flows = AsyncMock()
        telemetry = {"enabled": False}
        content = {
            "enabled": True,
            "metadata": {"telemetry": telemetry},
            "id": "some_id",
        }
        # (telemetry enabled flag, cumulative remove_int_flows calls expected)
        for enabled, expected_calls in ((False, 0), (True, 1)):
            telemetry["enabled"] = enabled
            event = KytosEvent(content=content)
            await self.napp.on_evc_error_redeployed_link_down(event)
            assert self.napp.int_manager.remove_int_flows.call_count == expected_calls
592
593 1
    async def test_on_link_down(self) -> None:
        """A link down event is handed to handle_pp_link_down once."""
        handler = AsyncMock()
        self.napp.int_manager.handle_pp_link_down = handler
        event = KytosEvent(content={"link": MagicMock()})
        await self.napp.on_link_down(event)
        assert self.napp.int_manager.handle_pp_link_down.call_count == 1
598
599 1
    async def test_on_link_up(self) -> None:
        """A link up event is handed to handle_pp_link_up once."""
        handler = AsyncMock()
        self.napp.int_manager.handle_pp_link_up = handler
        event = KytosEvent(content={"link": MagicMock()})
        await self.napp.on_link_up(event)
        assert self.napp.int_manager.handle_pp_link_up.call_count == 1
604
605 1
    async def test_on_table_enabled_error(self, monkeypatch) -> None:
        """An invalid table group is logged and leaves table_group untouched."""
        builder = self.napp.int_manager.flow_builder
        assert builder.table_group == {"evpl": 2, "epl": 3}

        logger = MagicMock()
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", logger)

        event = KytosEvent(content={"telemetry_int": {"invalid": 1}})
        await self.napp.on_table_enabled(event)

        assert self.napp.int_manager.flow_builder.table_group == {
            "evpl": 2,
            "epl": 3,
        }
        assert logger.error.call_count == 1
        assert self.napp.controller.buffers.app.aput.call_count == 0
616
617 1
    async def test_on_flow_mod_error(self, monkeypatch) -> None:
        """A flow mod error on an enabled EVC removes flows and stores metadata."""
        flow = MagicMock()
        flow.cookie = 0xA800000000000001
        evc_id = utils.get_id_from_cookie(flow.cookie)

        main_api, manager_api = AsyncMock(), AsyncMock()
        main_api.get_evc.return_value = {
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
        }
        manager_api.get_stored_flows.return_value = {evc_id: [MagicMock()]}
        monkeypatch.setattr("napps.kytos.telemetry_int.main.api", main_api)
        monkeypatch.setattr(
            "napps.kytos.telemetry_int.managers.int.api", manager_api
        )
        self.napp.int_manager._remove_int_flows_by_cookies = AsyncMock()

        await self.napp.on_flow_mod_error(
            KytosEvent(content={"flow": flow, "error_command": "add"})
        )

        assert main_api.get_evc.call_count == 1
        assert manager_api.get_stored_flows.call_count == 1
        assert manager_api.add_evcs_metadata.call_count == 1
        assert self.napp.int_manager._remove_int_flows_by_cookies.call_count == 1
643
644 1
    async def test_on_mef_eline_evcs_loaded(self):
        """The loaded EVCs are passed on to load_uni_src_proxy_ports."""
        self.napp.int_manager = MagicMock()
        loaded_evcs = {"1": {}, "2": {}}
        await self.napp.on_mef_eline_evcs_loaded(KytosEvent(content=loaded_evcs))
        loader = self.napp.int_manager.load_uni_src_proxy_ports
        loader.assert_called_with(loaded_evcs)
651
652 1
    async def test_on_intf_metadata_remove(self):
        """The event's interface is passed to handle_pp_metadata_removed."""
        self.napp.int_manager = MagicMock()
        interface = MagicMock()
        await self.napp.on_intf_metadata_removed(
            KytosEvent(content={"interface": interface})
        )
        handler = self.napp.int_manager.handle_pp_metadata_removed
        handler.assert_called_with(interface)
659
660 1
    async def test_on_intf_metadata_added(self):
        """The event's interface is passed to handle_pp_metadata_added."""
        self.napp.int_manager = MagicMock()
        interface = MagicMock()
        await self.napp.on_intf_metadata_added(
            KytosEvent(content={"interface": interface})
        )
        handler = self.napp.int_manager.handle_pp_metadata_added
        handler.assert_called_with(interface)
667
668 1
    async def test_on_failover_deployed(self):
        """on_failover_deployed calls handle_failover_flows."""
        self.napp.int_manager = MagicMock()
        empty_event = KytosEvent(content={})
        await self.napp.on_failover_deployed(empty_event)
        handler = self.napp.int_manager.handle_failover_flows
        handler.assert_called()
674
675 1
    async def test_on_failover_link_down(self):
        """on_failover_link_down calls handle_failover_flows."""
        self.napp.int_manager = MagicMock()
        empty_event = KytosEvent(content={})
        await self.napp.on_failover_link_down(empty_event)
        handler = self.napp.int_manager.handle_failover_flows
        handler.assert_called()
681
682 1
    async def test_on_failover_old_path(self):
        """on_failover_old_path calls handle_failover_flows."""
        self.napp.int_manager = MagicMock()
        empty_event = KytosEvent(content={})
        await self.napp.on_failover_old_path(empty_event)
        handler = self.napp.int_manager.handle_failover_flows
        handler.assert_called()
688