test_main.TestMain.test_on_failover_link_down()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 1
dl 0
loc 6
ccs 5
cts 5
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Test Main methods."""
2
3 1
from unittest.mock import AsyncMock, MagicMock, patch
4
5 1
import pytest
6 1
from napps.kytos.telemetry_int import utils
7 1
from napps.kytos.telemetry_int.exceptions import EVCError, ProxyPortShared
8 1
from napps.kytos.telemetry_int.main import Main
9
10 1
from kytos.core.common import EntityStatus
11 1
from kytos.core.events import KytosEvent
12 1
from kytos.lib.helpers import get_controller_mock, get_test_client
13
14
15 1
class TestMain:
16
    """Tests for the Main class."""
17
18 1
    def setup_method(self):
19
        """Setup."""
20 1
        patch("kytos.core.helpers.run_on_thread", lambda x: x).start()
21
        # pylint: disable=import-outside-toplevel
22 1
        controller = get_controller_mock()
23 1
        self.napp = Main(controller)
24 1
        self.api_client = get_test_client(controller, self.napp)
25 1
        self.base_endpoint = "kytos/telemetry_int/v1"
26
27 1 View Code Duplication
    async def test_enable_telemetry(self, monkeypatch) -> None:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
28
        """Test enable telemetry."""
29 1
        api_mock, flow = AsyncMock(), MagicMock()
30 1
        flow.cookie = 0xA800000000000001
31 1
        monkeypatch.setattr(
32
            "napps.kytos.telemetry_int.main.api",
33
            api_mock,
34
        )
35
36 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
37 1
        api_mock.get_evcs.return_value = {
38
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
39
        }
40
41 1
        self.napp.int_manager = AsyncMock()
42
43 1
        endpoint = f"{self.base_endpoint}/evc/enable"
44 1
        response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]})
45 1
        assert self.napp.int_manager.enable_int.call_count == 1
46
47 1
        enable_int_args = self.napp.int_manager.enable_int.call_args
48
        # evcs arg
49 1
        assert evc_id in enable_int_args[0][0]
50
        # assert the other args
51 1
        assert enable_int_args[1] == {
52
            "force": False,
53
            "proxy_port_enabled": None,
54
            "set_proxy_port_metadata": True,
55
        }
56
57 1
        assert response.status_code == 201
58 1
        assert response.json() == [evc_id]
59
60 1 View Code Duplication
    async def test_enable_telemetry_wrong_types(self, monkeypatch) -> None:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
61
        """Test enable telemetry wrong types."""
62 1
        api_mock, flow = AsyncMock(), MagicMock()
63 1
        flow.cookie = 0xA800000000000001
64 1
        monkeypatch.setattr(
65
            "napps.kytos.telemetry_int.main.api",
66
            api_mock,
67
        )
68 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
69 1
        api_mock.get_evcs.return_value = {
70
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
71
        }
72
73 1
        endpoint = f"{self.base_endpoint}/evc/enable"
74 1
        response = await self.api_client.post(
75
            endpoint, json={"evc_ids": [evc_id], "proxy_port_enabled": 1}
76
        )
77 1
        assert response.status_code == 400
78 1
        assert (
79
            "1 is not of type 'boolean' for field proxy_port_enabled"
80
            in response.json()["description"]
81
        )
82
83 1
        endpoint = f"{self.base_endpoint}/evc/enable"
84 1
        response = await self.api_client.post(
85
            endpoint, json={"evc_ids": [evc_id], "force": 2}
86
        )
87 1
        assert response.status_code == 400
88 1
        assert (
89
            "2 is not of type 'boolean' for field force"
90
            in response.json()["description"]
91
        )
92
93 1 View Code Duplication
    async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
94
        """Test redeploy telemetry enabled."""
95 1
        api_mock, flow = AsyncMock(), MagicMock()
96 1
        flow.cookie = 0xA800000000000001
97 1
        monkeypatch.setattr(
98
            "napps.kytos.telemetry_int.main.api",
99
            api_mock,
100
        )
101
102 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
103 1
        api_mock.get_evc.return_value = {
104
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
105
        }
106
107 1
        self.napp.int_manager = AsyncMock()
108
109 1
        endpoint = f"{self.base_endpoint}/evc/redeploy"
110 1
        response = await self.api_client.patch(endpoint, json={"evc_ids": [evc_id]})
111 1
        assert self.napp.int_manager.redeploy_int.call_count == 1
112 1
        assert response.status_code == 201
113 1
        assert response.json() == [evc_id]
114
115 1
    async def test_redeploy_telemetry_not_enabled(self, monkeypatch) -> None:
116
        """Test redeploy telemetry not enabled."""
117 1
        api_mock, flow, api_mngr_mock = AsyncMock(), MagicMock(), AsyncMock()
118 1
        flow.cookie = 0xA800000000000001
119 1
        monkeypatch.setattr(
120
            "napps.kytos.telemetry_int.main.api",
121
            api_mock,
122
        )
123 1
        monkeypatch.setattr(
124
            "napps.kytos.telemetry_int.managers.int.api",
125
            api_mngr_mock,
126
        )
127
128 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
129 1
        api_mock.get_evc.return_value = {
130
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
131
        }
132
133 1
        self.napp.int_manager._validate_map_enable_evcs = MagicMock()
134 1
        self.napp.int_manager._remove_int_flows_by_cookies = AsyncMock()
135 1
        self.napp.int_manager.install_int_flows = AsyncMock()
136 1
        endpoint = f"{self.base_endpoint}/evc/redeploy"
137 1
        response = await self.api_client.patch(endpoint, json={"evc_ids": [evc_id]})
138 1
        assert response.status_code == 409
139 1
        assert "isn't enabled" in response.json()["description"]
140
141 1
    @pytest.mark.parametrize("route", ["/evc/enable", "/evc/disable"])
142 1
    async def test_en_dis_openapi_validation(self, route: str) -> None:
143
        """Test OpenAPI enable/disable basic validation."""
144 1
        endpoint = f"{self.base_endpoint}{route}"
145
        # wrong evc_ids payload data type
146 1
        response = await self.api_client.post(endpoint, json={"evc_ids": 1})
147 1
        assert response.status_code == 400
148 1
        assert "evc_ids" in response.json()["description"]
149
150 1 View Code Duplication
    async def test_disable_telemetry(self, monkeypatch) -> None:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
151
        """Test disable telemetry."""
152 1
        api_mock, flow = AsyncMock(), MagicMock()
153 1
        flow.cookie = 0xA800000000000001
154 1
        monkeypatch.setattr(
155
            "napps.kytos.telemetry_int.main.api",
156
            api_mock,
157
        )
158
159 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
160 1
        api_mock.get_evcs.return_value = {
161
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
162
        }
163
164 1
        self.napp.int_manager = AsyncMock()
165
166 1
        endpoint = f"{self.base_endpoint}/evc/disable"
167 1
        response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]})
168 1
        assert self.napp.int_manager.disable_int.call_count == 1
169 1
        assert response.status_code == 200
170 1
        assert response.json() == [evc_id]
171
172 1
    async def test_get_enabled_evcs(self, monkeypatch) -> None:
173
        """Test get enabled evcs."""
174 1
        api_mock, flow = AsyncMock(), MagicMock()
175 1
        flow.cookie = 0xA800000000000001
176 1
        monkeypatch.setattr(
177
            "napps.kytos.telemetry_int.main.api",
178
            api_mock,
179
        )
180
181 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
182 1
        api_mock.get_evcs.return_value = {
183
            evc_id: {"metadata": {"telemetry": {"enabled": True}}},
184
        }
185
186 1
        endpoint = f"{self.base_endpoint}/evc"
187 1
        response = await self.api_client.get(endpoint)
188 1
        assert api_mock.get_evcs.call_args[1] == {"metadata.telemetry.enabled": "true"}
189 1
        assert response.status_code == 200
190 1
        data = response.json()
191 1
        assert len(data) == 1
192 1
        assert evc_id in data
193
194 1
    async def test_get_evc_compare(self, monkeypatch) -> None:
195
        """Test get evc compre ok case."""
196 1
        api_mock, flow = AsyncMock(), MagicMock()
197 1
        flow.cookie = 0xA800000000000001
198 1
        evc_id = "1"
199 1
        monkeypatch.setattr(
200
            "napps.kytos.telemetry_int.main.api",
201
            api_mock,
202
        )
203
204 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
205 1
        api_mock.get_evcs.return_value = {
206
            evc_id: {
207
                "id": evc_id,
208
                "name": "evc",
209
                "metadata": {"telemetry": {"enabled": True}},
210
            },
211
        }
212 1
        api_mock.get_stored_flows.side_effect = [
213
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
214
            {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]},
215
        ]
216
217 1
        endpoint = f"{self.base_endpoint}/evc/compare"
218 1
        response = await self.api_client.get(endpoint)
219 1
        assert response.status_code == 200
220 1
        data = response.json()
221 1
        assert len(data) == 0
222
223 1
    async def test_get_evc_compare_wrong_metadata(self, monkeypatch) -> None:
224
        """Test get evc compre wrong_metadata_has_int_flows case."""
225 1
        api_mock, flow = AsyncMock(), MagicMock()
226 1
        flow.cookie = 0xA800000000000001
227 1
        evc_id = "1"
228 1
        monkeypatch.setattr(
229
            "napps.kytos.telemetry_int.main.api",
230
            api_mock,
231
        )
232
233 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
234 1
        api_mock.get_evcs.return_value = {
235
            evc_id: {"id": evc_id, "name": "evc", "metadata": {}},
236
        }
237 1
        api_mock.get_stored_flows.side_effect = [
238
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
239
            {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]},
240
        ]
241
242 1
        endpoint = f"{self.base_endpoint}/evc/compare"
243 1
        response = await self.api_client.get(endpoint)
244 1
        assert response.status_code == 200
245 1
        data = response.json()
246 1
        assert len(data) == 1
247 1
        assert data[0]["id"] == evc_id
248 1
        assert data[0]["compare_reason"] == ["wrong_metadata_has_int_flows"]
249 1
        assert data[0]["name"] == "evc"
250
251 1
    async def test_get_evc_compare_missing_some_int_flows(self, monkeypatch) -> None:
252
        """Test get evc compre missing_some_int_flows case."""
253 1
        api_mock, flow = AsyncMock(), MagicMock()
254 1
        flow.cookie = 0xA800000000000001
255 1
        evc_id = "1"
256 1
        monkeypatch.setattr(
257
            "napps.kytos.telemetry_int.main.api",
258
            api_mock,
259
        )
260
261 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
262 1
        api_mock.get_evcs.return_value = {
263
            evc_id: {
264
                "id": evc_id,
265
                "name": "evc",
266
                "metadata": {"telemetry": {"enabled": True}},
267
            },
268
        }
269 1
        api_mock.get_stored_flows.side_effect = [
270
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
271
            {
272
                flow.cookie: [
273
                    {"id": "some_2", "match": {"in_port": 1}},
274
                    {"id": "some_3", "match": {"in_port": 1}},
275
                ]
276
            },
277
        ]
278
279 1
        endpoint = f"{self.base_endpoint}/evc/compare"
280 1
        response = await self.api_client.get(endpoint)
281 1
        assert response.status_code == 200
282 1
        data = response.json()
283 1
        assert len(data) == 1
284 1
        assert data[0]["id"] == evc_id
285 1
        assert data[0]["compare_reason"] == ["missing_some_int_flows"]
286 1
        assert data[0]["name"] == "evc"
287
288 1
    async def test_delete_proxy_port_metadata(self, monkeypatch) -> None:
289
        """Test delete proxy_port metadata."""
290 1
        api_mock = AsyncMock()
291 1
        monkeypatch.setattr(
292
            "napps.kytos.telemetry_int.main.api",
293
            api_mock,
294
        )
295 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
296 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port"
297 1
        self.napp.controller.get_interface_by_id = MagicMock()
298 1
        pp = MagicMock()
299 1
        pp.evc_ids = set()
300 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
301 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
302 1
        intf_mock = MagicMock()
303 1
        intf_mock.metadata = {"proxy_port": port_number}
304 1
        self.napp.controller.get_interface_by_id = MagicMock()
305 1
        self.napp.controller.get_interface_by_id.return_value = intf_mock
306 1
        response = await self.api_client.delete(endpoint)
307 1
        assert response.status_code == 200
308 1
        data = response.json()
309 1
        assert data == "Operation successful"
310 1
        assert api_mock.delete_proxy_port_metadata.call_count == 1
311
312 1
    async def test_delete_proxy_port_metadata_force(self, monkeypatch) -> None:
313
        """Test delete proxy_port metadata force."""
314 1
        api_mock = AsyncMock()
315 1
        monkeypatch.setattr(
316
            "napps.kytos.telemetry_int.main.api",
317
            api_mock,
318
        )
319 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
320 1
        src_id = "00:00:00:00:00:00:00:01:7"
321 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port"
322 1
        self.napp.controller.get_interface_by_id = MagicMock()
323 1
        pp = MagicMock()
324 1
        pp.evc_ids = set(["some_id"])
325 1
        self.napp.int_manager.unis_src[intf_id] = src_id
326 1
        self.napp.int_manager.srcs_pp[src_id] = pp
327 1
        intf_mock = MagicMock()
328 1
        intf_mock.metadata = {"proxy_port": port_number}
329 1
        self.napp.controller.get_interface_by_id = MagicMock()
330 1
        self.napp.controller.get_interface_by_id.return_value = intf_mock
331 1
        response = await self.api_client.delete(endpoint)
332 1
        assert response.status_code == 409
333 1
        data = response.json()["description"]
334 1
        assert "is in use on 1" in data
335 1
        assert not api_mock.delete_proxy_port_metadata.call_count
336
337 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port?force=true"
338 1
        response = await self.api_client.delete(endpoint)
339 1
        assert response.status_code == 200
340 1
        data = response.json()
341 1
        assert "Operation successful" in data
342 1
        assert api_mock.delete_proxy_port_metadata.call_count == 1
343
344 1
    async def test_delete_proxy_port_metadata_early_ret(self, monkeypatch) -> None:
345
        """Test delete proxy_port metadata early ret."""
346 1
        api_mock = AsyncMock()
347 1
        monkeypatch.setattr(
348
            "napps.kytos.telemetry_int.main.api",
349
            api_mock,
350
        )
351 1
        intf_id = "00:00:00:00:00:00:00:01:1"
352 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port"
353 1
        self.napp.controller.get_interface_by_id = MagicMock()
354 1
        pp = MagicMock()
355 1
        pp.evc_ids = set(["some_id"])
356 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
357 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
358 1
        intf_mock = MagicMock()
359 1
        intf_mock.metadata = {}
360 1
        self.napp.controller.get_interface_by_id = MagicMock()
361 1
        self.napp.controller.get_interface_by_id.return_value = intf_mock
362 1
        response = await self.api_client.delete(endpoint)
363 1
        assert response.status_code == 200
364 1
        data = response.json()
365 1
        assert "Operation successful" in data
366 1
        assert not api_mock.delete_proxy_port_metadata.call_count
367
368 1
    async def test_add_proxy_port_metadata(self, monkeypatch) -> None:
369
        """Test add proxy_port metadata."""
370 1
        api_mock = AsyncMock()
371 1
        api_mock.get_evcs.return_value = {}
372 1
        monkeypatch.setattr(
373
            "napps.kytos.telemetry_int.main.api",
374
            api_mock,
375
        )
376 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
377 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"
378 1
        self.napp.controller.get_interface_by_id = MagicMock()
379 1
        pp = MagicMock()
380 1
        pp.status = EntityStatus.UP
381 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
382 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
383 1
        response = await self.api_client.post(endpoint)
384 1
        assert response.status_code == 200
385 1
        data = response.json()
386 1
        assert data == "Operation successful"
387 1
        assert api_mock.add_proxy_port_metadata.call_count == 1
388
389 1
    async def test_add_proxy_port_metadata_early_ret(self, monkeypatch) -> None:
390
        """Test add proxy_port metadata early ret."""
391 1
        api_mock = AsyncMock()
392 1
        monkeypatch.setattr(
393
            "napps.kytos.telemetry_int.main.api",
394
            api_mock,
395
        )
396 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
397 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"
398 1
        intf_mock = MagicMock()
399 1
        intf_mock.metadata = {"proxy_port": port_number}
400 1
        self.napp.controller.get_interface_by_id = MagicMock()
401 1
        self.napp.controller.get_interface_by_id.return_value = intf_mock
402 1
        response = await self.api_client.post(endpoint)
403 1
        assert response.status_code == 200
404 1
        data = response.json()
405 1
        assert data == "Operation successful"
406 1
        assert not api_mock.add_proxy_port_metadata.call_count
407
408 1
    async def test_add_proxy_port_metadata_conflict(self, monkeypatch) -> None:
409
        """Test add proxy_port metadata conflict."""
410 1
        api_mock = AsyncMock()
411 1
        api_mock.get_evcs.return_value = {}
412 1
        monkeypatch.setattr(
413
            "napps.kytos.telemetry_int.main.api",
414
            api_mock,
415
        )
416 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
417 1
        endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}"
418 1
        self.napp.controller.get_interface_by_id = MagicMock()
419 1
        pp = MagicMock()
420 1
        pp.status = EntityStatus.UP
421 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
422 1
        self.napp.int_manager.get_proxy_port_or_raise.side_effect = ProxyPortShared(
423
            "no_evc_id", "boom"
424
        )
425 1
        response = await self.api_client.post(endpoint)
426 1
        assert response.status_code == 409
427
428 1
    async def test_add_proxy_port_metadata_force(self, monkeypatch) -> None:
429
        """Test add proxy_port metadata force."""
430 1
        api_mock = AsyncMock()
431 1
        api_mock.get_evcs.return_value = {}
432 1
        monkeypatch.setattr(
433
            "napps.kytos.telemetry_int.main.api",
434
            api_mock,
435
        )
436 1
        intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7
437 1
        force = "true"
438 1
        endpoint = (
439
            f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}?force={force}"
440
        )
441 1
        self.napp.controller.get_interface_by_id = MagicMock()
442 1
        pp = MagicMock()
443
        # despite proxy port down, with force true the request shoudl succeed
444 1
        pp.status = EntityStatus.DOWN
445 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
446 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
447 1
        response = await self.api_client.post(endpoint)
448 1
        assert response.status_code == 200
449 1
        data = response.json()
450 1
        assert data == "Operation successful"
451 1
        assert api_mock.add_proxy_port_metadata.call_count == 1
452
453 1
        force = "false"
454 1
        endpoint = (
455
            f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}?force={force}"
456
        )
457 1
        response = await self.api_client.post(endpoint)
458 1
        assert response.status_code == 409
459 1
        assert "isn't UP" in response.json()["description"]
460
461 1
    async def test_list_proxy_port(self) -> None:
462
        """Test list proxy port."""
463 1
        endpoint = f"{self.base_endpoint}/uni/proxy_port"
464 1
        response = await self.api_client.get(endpoint)
465 1
        assert response.status_code == 200
466 1
        data = response.json()
467 1
        assert not data
468
469 1
        sw1, intf_mock = MagicMock(), MagicMock()
470 1
        intf_mock.metadata = {"proxy_port": 1}
471 1
        intf_mock.status.value = "UP"
472 1
        intf_mock.id = "1"
473 1
        sw1.interfaces = {"intf1": intf_mock}
474 1
        self.napp.controller.switches = {"sw1": sw1}
475 1
        response = await self.api_client.get(endpoint)
476 1
        assert response.status_code == 200
477 1
        data = response.json()
478 1
        expected = [
479
            {
480
                "proxy_port": {
481
                    "port_number": 1,
482
                    "status": "DOWN",
483
                    "status_reason": ["UNI interface 1 not found"],
484
                },
485
                "uni": {"id": "1", "status": "UP", "status_reason": []},
486
            }
487
        ]
488 1
        assert data == expected
489
490 1
        pp = MagicMock()
491 1
        self.napp.int_manager.get_proxy_port_or_raise = MagicMock()
492 1
        self.napp.int_manager.get_proxy_port_or_raise.return_value = pp
493 1
        pp.status.value = "UP"
494
495 1
        response = await self.api_client.get(endpoint)
496 1
        assert response.status_code == 200
497 1
        data = response.json()
498 1
        expected = [
499
            {
500
                "proxy_port": {
501
                    "port_number": 1,
502
                    "status": "UP",
503
                    "status_reason": [],
504
                },
505
                "uni": {"id": "1", "status": "UP", "status_reason": []},
506
            }
507
        ]
508 1
        assert data == expected
509
510 1
    async def test_on_table_enabled(self) -> None:
511
        """Test on_table_enabled."""
512 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3}
513 1
        await self.napp.on_table_enabled(
514
            KytosEvent(content={"telemetry_int": {"evpl": 22, "epl": 33}})
515
        )
516 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 22, "epl": 33}
517 1
        assert self.napp.controller.buffers.app.aput.call_count == 1
518
519 1
    async def test_on_table_enabled_no_group(self) -> None:
520
        """Test on_table_enabled no group."""
521 1
        await self.napp.on_table_enabled(
522
            KytosEvent(content={"mef_eline": {"evpl": 22, "epl": 33}})
523
        )
524 1
        assert not self.napp.controller.buffers.app.aput.call_count
525
526 1
    async def test_on_evc_deployed(self) -> None:
527
        """Test on_evc_deployed."""
528 1
        content = {"metadata": {"telemetry_request": {}}, "id": "some_id"}
529 1
        self.napp.int_manager.redeploy_int = AsyncMock()
530 1
        self.napp.int_manager.enable_int = AsyncMock()
531 1
        await self.napp.on_evc_deployed(KytosEvent(content=content))
532 1
        assert self.napp.int_manager.enable_int.call_count == 1
533 1
        assert self.napp.int_manager.redeploy_int.call_count == 0
534
535 1
        content = {"metadata": {"telemetry": {"enabled": True}}, "id": "some_id"}
536 1
        await self.napp.on_evc_deployed(KytosEvent(content=content))
537 1
        assert self.napp.int_manager.enable_int.call_count == 1
538 1
        assert self.napp.int_manager.redeploy_int.call_count == 1
539
540 1
    async def test_on_evc_deployed_error(self, monkeypatch) -> None:
541
        """Test on_evc_deployed error."""
542 1
        content = {"metadata": {"telemetry_request": {}}, "id": "some_id"}
543 1
        self.napp.int_manager.enable_int = AsyncMock()
544 1
        self.napp.int_manager.enable_int.side_effect = EVCError("no_id", "boom")
545 1
        log_mock = MagicMock()
546 1
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock)
547 1
        api_mock = AsyncMock()
548 1
        monkeypatch.setattr(
549
            "napps.kytos.telemetry_int.main.api",
550
            api_mock,
551
        )
552 1
        await self.napp.on_evc_deployed(KytosEvent(content=content))
553 1
        assert log_mock.error.call_count == 1
554 1
        assert api_mock.add_evcs_metadata.call_count == 1
555
556 1
    async def test_on_evc_deleted(self) -> None:
557
        """Test on_evc_deleted."""
558 1
        content = {"metadata": {"telemetry": {"enabled": True}}, "id": "some_id"}
559 1
        self.napp.int_manager.disable_int = AsyncMock()
560 1
        await self.napp.on_evc_deleted(KytosEvent(content=content))
561 1
        assert self.napp.int_manager.disable_int.call_count == 1
562
563 1
    async def test_on_uni_active_updated(self, monkeypatch) -> None:
564
        """Test on UNI active updated."""
565 1
        api_mock = AsyncMock()
566 1
        monkeypatch.setattr(
567
            "napps.kytos.telemetry_int.main.api",
568
            api_mock,
569
        )
570 1
        content = {
571
            "metadata": {"telemetry": {"enabled": True}},
572
            "id": "some_id",
573
            "active": True,
574
        }
575 1
        await self.napp.on_uni_active_updated(KytosEvent(content=content))
576 1
        assert api_mock.add_evcs_metadata.call_count == 1
577 1
        args = api_mock.add_evcs_metadata.call_args[0][1]
578 1
        assert args["telemetry"]["status"] == "UP"
579
580 1
        content["active"] = False
581 1
        await self.napp.on_uni_active_updated(KytosEvent(content=content))
582 1
        assert api_mock.add_evcs_metadata.call_count == 2
583 1
        args = api_mock.add_evcs_metadata.call_args[0][1]
584 1
        assert args["telemetry"]["status"] == "DOWN"
585
586 1
    async def test_on_evc_undeployed(self) -> None:
587
        """Test on_evc_undeployed."""
588 1
        content = {
589
            "enabled": False,
590
            "metadata": {"telemetry": {"enabled": False}},
591
            "id": "some_id",
592
        }
593 1
        self.napp.int_manager.remove_int_flows = AsyncMock()
594 1
        await self.napp.on_evc_undeployed(KytosEvent(content=content))
595 1
        assert self.napp.int_manager.remove_int_flows.call_count == 0
596
597 1
        content["metadata"]["telemetry"]["enabled"] = True
598 1
        await self.napp.on_evc_undeployed(KytosEvent(content=content))
599 1
        assert self.napp.int_manager.remove_int_flows.call_count == 1
600
601 1
    async def test_on_evc_redeployed_link(self) -> None:
602
        """Test on redeployed_link_down|redeployed_link_up."""
603 1
        content = {
604
            "enabled": True,
605
            "metadata": {"telemetry": {"enabled": False}},
606
            "id": "some_id",
607
        }
608 1
        self.napp.int_manager.redeploy_int = AsyncMock()
609 1
        await self.napp.on_evc_redeployed_link(KytosEvent(content=content))
610 1
        assert self.napp.int_manager.redeploy_int.call_count == 0
611
612 1
        content["metadata"]["telemetry"]["enabled"] = True
613 1
        await self.napp.on_evc_redeployed_link(KytosEvent(content=content))
614 1
        assert self.napp.int_manager.redeploy_int.call_count == 1
615
616 1
    async def test_on_evc_redeployed_link_error(self, monkeypatch) -> None:
617
        """Test on redeployed_link_down|redeployed_link_up error."""
618 1
        content = {
619
            "enabled": True,
620
            "metadata": {"telemetry": {"enabled": True}},
621
            "id": "some_id",
622
        }
623 1
        log_mock = MagicMock()
624 1
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock)
625 1
        self.napp.int_manager.redeploy_int = AsyncMock()
626 1
        self.napp.int_manager.redeploy_int.side_effect = EVCError("no_id", "boom")
627 1
        await self.napp.on_evc_redeployed_link(KytosEvent(content=content))
628 1
        assert log_mock.error.call_count == 1
629
630 1
    async def test_on_evc_error_redeployed_link_down(self) -> None:
631
        """Test error_redeployed_link_down."""
632 1
        content = {
633
            "enabled": True,
634
            "metadata": {"telemetry": {"enabled": False}},
635
            "id": "some_id",
636
        }
637 1
        self.napp.int_manager.remove_int_flows = AsyncMock()
638 1
        await self.napp.on_evc_error_redeployed_link_down(KytosEvent(content=content))
639 1
        assert self.napp.int_manager.remove_int_flows.call_count == 0
640
641 1
        content["metadata"]["telemetry"]["enabled"] = True
642 1
        await self.napp.on_evc_error_redeployed_link_down(KytosEvent(content=content))
643 1
        assert self.napp.int_manager.remove_int_flows.call_count == 1
644
645 1
    async def test_on_link_down(self) -> None:
646
        """Test on link_down."""
647 1
        self.napp.int_manager.handle_pp_link_down = AsyncMock()
648 1
        await self.napp.on_link_down(KytosEvent(content={"link": MagicMock()}))
649 1
        assert self.napp.int_manager.handle_pp_link_down.call_count == 1
650
651 1
    async def test_on_link_up(self) -> None:
652
        """Test on link_up."""
653 1
        self.napp.int_manager.handle_pp_link_up = AsyncMock()
654 1
        await self.napp.on_link_up(KytosEvent(content={"link": MagicMock()}))
655 1
        assert self.napp.int_manager.handle_pp_link_up.call_count == 1
656
657 1
    async def test_on_table_enabled_error(self, monkeypatch) -> None:
658
        """Test on_table_enabled error case."""
659 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3}
660 1
        log_mock = MagicMock()
661 1
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock)
662 1
        await self.napp.on_table_enabled(
663
            KytosEvent(content={"telemetry_int": {"invalid": 1}})
664
        )
665 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3}
666 1
        assert log_mock.error.call_count == 1
667 1
        assert not self.napp.controller.buffers.app.aput.call_count
668
669 1
    async def test_on_flow_mod_error(self, monkeypatch) -> None:
670
        """Test on_flow_mod_error."""
671 1
        api_mock_main, api_mock_int, flow = AsyncMock(), AsyncMock(), MagicMock()
672 1
        flow.cookie = 0xA800000000000001
673 1
        monkeypatch.setattr(
674
            "napps.kytos.telemetry_int.main.api",
675
            api_mock_main,
676
        )
677 1
        monkeypatch.setattr(
678
            "napps.kytos.telemetry_int.managers.int.api",
679
            api_mock_int,
680
        )
681 1
        cookie = utils.get_id_from_cookie(flow.cookie)
682 1
        api_mock_main.get_evc.return_value = {
683
            cookie: {"metadata": {"telemetry": {"enabled": True}}}
684
        }
685 1
        api_mock_int.get_stored_flows.return_value = {cookie: [MagicMock()]}
686 1
        self.napp.int_manager._remove_int_flows_by_cookies = AsyncMock()
687
688 1
        event = KytosEvent(content={"flow": flow, "error_command": "add"})
689 1
        await self.napp.on_flow_mod_error(event)
690
691 1
        assert api_mock_main.get_evc.call_count == 1
692 1
        assert api_mock_int.get_stored_flows.call_count == 1
693 1
        assert api_mock_int.add_evcs_metadata.call_count == 1
694 1
        assert self.napp.int_manager._remove_int_flows_by_cookies.call_count == 1
695
696 1
    async def test_on_mef_eline_evcs_loaded(self):
697
        """Test on_mef_eline_evcs_loaded."""
698 1
        evcs = {"1": {}, "2": {}}
699 1
        event = KytosEvent(content=evcs)
700 1
        self.napp.int_manager = MagicMock()
701 1
        await self.napp.on_mef_eline_evcs_loaded(event)
702 1
        self.napp.int_manager.load_uni_src_proxy_ports.assert_called_with(evcs)
703
704 1
    async def test_on_intf_metadata_remove(self):
705
        """Test on_intf_metadata_removed."""
706 1
        intf = MagicMock()
707 1
        event = KytosEvent(content={"interface": intf})
708 1
        self.napp.int_manager = MagicMock()
709 1
        await self.napp.on_intf_metadata_removed(event)
710 1
        self.napp.int_manager.handle_pp_metadata_removed.assert_called_with(intf)
711
712 1
    async def test_on_intf_metadata_added(self):
713
        """Test on_intf_metadata_added."""
714 1
        intf = MagicMock()
715 1
        event = KytosEvent(content={"interface": intf})
716 1
        self.napp.int_manager = MagicMock()
717 1
        await self.napp.on_intf_metadata_added(event)
718 1
        self.napp.int_manager.handle_pp_metadata_added.assert_called_with(intf)
719
720 1
    async def test_on_failover_deployed(self):
721
        """Test on_failover_deployed."""
722 1
        event = KytosEvent(content={})
723 1
        self.napp.int_manager = MagicMock()
724 1
        await self.napp.on_failover_deployed(event)
725 1
        self.napp.int_manager.handle_failover_flows.assert_called()
726
727 1
    async def test_on_failover_link_down(self):
728
        """Test on_failover_link_down."""
729 1
        event = KytosEvent(content={})
730 1
        self.napp.int_manager = MagicMock()
731 1
        await self.napp.on_failover_link_down(event)
732 1
        self.napp.int_manager.handle_failover_flows.assert_called()
733
734 1
    async def test_on_failover_old_path(self):
735
        """Test on_failover_old_path."""
736 1
        event = KytosEvent(content={})
737 1
        self.napp.int_manager = MagicMock()
738 1
        await self.napp.on_failover_old_path(event)
739
        self.napp.int_manager.handle_failover_flows.assert_called()
740