Passed
Push — master ( 9bde40...c0a17f )
by Vinicius
02:56 queued 18s
created

test_main.TestMain.test_on_intf_metadata_added()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 1
dl 0
loc 7
ccs 6
cts 6
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Test Main methods."""
2
3 1
import pytest
4
5 1
from unittest.mock import AsyncMock, MagicMock, patch
6 1
from napps.kytos.telemetry_int.main import Main
7 1
from napps.kytos.telemetry_int import utils
8 1
from kytos.lib.helpers import get_controller_mock, get_test_client
9 1
from kytos.core.events import KytosEvent
10
11
12 1
class TestMain:
13
    """Tests for the Main class."""
14
15 1
    def setup_method(self):
16
        """Setup."""
17 1
        patch("kytos.core.helpers.run_on_thread", lambda x: x).start()
18
        # pylint: disable=import-outside-toplevel
19 1
        controller = get_controller_mock()
20 1
        self.napp = Main(controller)
21 1
        self.api_client = get_test_client(controller, self.napp)
22 1
        self.base_endpoint = "kytos/telemetry_int/v1"
23
24 1 View Code Duplication
    async def test_enable_telemetry(self, monkeypatch) -> None:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
25
        """Test enable telemetry."""
26 1
        api_mock, flow = AsyncMock(), MagicMock()
27 1
        flow.cookie = 0xA800000000000001
28 1
        monkeypatch.setattr(
29
            "napps.kytos.telemetry_int.main.api",
30
            api_mock,
31
        )
32
33 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
34 1
        api_mock.get_evcs.return_value = {
35
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
36
        }
37
38 1
        self.napp.int_manager = AsyncMock()
39
40 1
        endpoint = f"{self.base_endpoint}/evc/enable"
41 1
        response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]})
42 1
        assert self.napp.int_manager.enable_int.call_count == 1
43 1
        assert self.napp.int_manager._remove_int_flows.call_count == 1
44 1
        assert response.status_code == 201
45 1
        assert response.json() == [evc_id]
46
47 1
    async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None:
48
        """Test redeploy telemetry enabled."""
49 1
        api_mock, flow = AsyncMock(), MagicMock()
50 1
        flow.cookie = 0xA800000000000001
51 1
        monkeypatch.setattr(
52
            "napps.kytos.telemetry_int.main.api",
53
            api_mock,
54
        )
55
56 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
57 1
        api_mock.get_evc.return_value = {
58
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
59
        }
60
61 1
        self.napp.int_manager = AsyncMock()
62
63 1
        endpoint = f"{self.base_endpoint}/evc/redeploy"
64 1
        response = await self.api_client.patch(endpoint, json={"evc_ids": [evc_id]})
65 1
        assert self.napp.int_manager.redeploy_int.call_count == 1
66 1
        assert response.status_code == 201
67 1
        assert response.json() == [evc_id]
68
69 1
    async def test_redeploy_telemetry_not_enabled(self, monkeypatch) -> None:
70
        """Test redeploy telemetry not enabled."""
71 1
        api_mock, flow, api_mngr_mock = AsyncMock(), MagicMock(), AsyncMock()
72 1
        flow.cookie = 0xA800000000000001
73 1
        monkeypatch.setattr(
74
            "napps.kytos.telemetry_int.main.api",
75
            api_mock,
76
        )
77 1
        monkeypatch.setattr(
78
            "napps.kytos.telemetry_int.managers.int.api",
79
            api_mngr_mock,
80
        )
81
82 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
83 1
        api_mock.get_evc.return_value = {
84
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
85
        }
86
87 1
        self.napp.int_manager._validate_map_enable_evcs = MagicMock()
88 1
        self.napp.int_manager._remove_int_flows = AsyncMock()
89 1
        self.napp.int_manager.install_int_flows = AsyncMock()
90 1
        endpoint = f"{self.base_endpoint}/evc/redeploy"
91 1
        response = await self.api_client.patch(endpoint, json={"evc_ids": [evc_id]})
92 1
        assert response.status_code == 409
93 1
        assert "isn't enabled" in response.json()["description"]
94
95 1
    @pytest.mark.parametrize("route", ["/evc/enable", "/evc/disable"])
96 1
    async def test_en_dis_openapi_validation(self, route: str) -> None:
97
        """Test OpenAPI enable/disable basic validation."""
98 1
        endpoint = f"{self.base_endpoint}{route}"
99
        # wrong evc_ids payload data type
100 1
        response = await self.api_client.post(endpoint, json={"evc_ids": 1})
101 1
        assert response.status_code == 400
102 1
        assert "evc_ids" in response.json()["description"]
103
104 1
    async def test_disable_telemetry(self, monkeypatch) -> None:
105
        """Test disable telemetry."""
106 1
        api_mock, flow = AsyncMock(), MagicMock()
107 1
        flow.cookie = 0xA800000000000001
108 1
        monkeypatch.setattr(
109
            "napps.kytos.telemetry_int.main.api",
110
            api_mock,
111
        )
112
113 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
114 1
        api_mock.get_evcs.return_value = {
115
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
116
        }
117
118 1
        self.napp.int_manager = AsyncMock()
119
120 1
        endpoint = f"{self.base_endpoint}/evc/disable"
121 1
        response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]})
122 1
        assert self.napp.int_manager.disable_int.call_count == 1
123 1
        assert response.status_code == 200
124 1
        assert response.json() == [evc_id]
125
126 1 View Code Duplication
    async def test_get_enabled_evcs(self, monkeypatch) -> None:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
127
        """Test get enabled evcs."""
128 1
        api_mock, flow = AsyncMock(), MagicMock()
129 1
        flow.cookie = 0xA800000000000001
130 1
        monkeypatch.setattr(
131
            "napps.kytos.telemetry_int.main.api",
132
            api_mock,
133
        )
134
135 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
136 1
        api_mock.get_evcs.return_value = {
137
            evc_id: {"metadata": {"telemetry": {"enabled": True}}},
138
        }
139
140 1
        endpoint = f"{self.base_endpoint}/evc"
141 1
        response = await self.api_client.get(endpoint)
142 1
        assert api_mock.get_evcs.call_args[1] == {"metadata.telemetry.enabled": "true"}
143 1
        assert response.status_code == 200
144 1
        data = response.json()
145 1
        assert len(data) == 1
146 1
        assert evc_id in data
147
148 1
    async def test_get_evc_compare(self, monkeypatch) -> None:
149
        """Test get evc compre ok case."""
150 1
        api_mock, flow = AsyncMock(), MagicMock()
151 1
        flow.cookie = 0xA800000000000001
152 1
        evc_id = "1"
153 1
        monkeypatch.setattr(
154
            "napps.kytos.telemetry_int.main.api",
155
            api_mock,
156
        )
157
158 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
159 1
        api_mock.get_evcs.return_value = {
160
            evc_id: {
161
                "id": evc_id,
162
                "name": "evc",
163
                "metadata": {"telemetry": {"enabled": True}},
164
            },
165
        }
166 1
        api_mock.get_stored_flows.side_effect = [
167
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
168
            {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]},
169
        ]
170
171 1
        endpoint = f"{self.base_endpoint}/evc/compare"
172 1
        response = await self.api_client.get(endpoint)
173 1
        assert response.status_code == 200
174 1
        data = response.json()
175 1
        assert len(data) == 0
176
177 1
    async def test_get_evc_compare_wrong_metadata(self, monkeypatch) -> None:
178
        """Test get evc compre wrong_metadata_has_int_flows case."""
179 1
        api_mock, flow = AsyncMock(), MagicMock()
180 1
        flow.cookie = 0xA800000000000001
181 1
        evc_id = "1"
182 1
        monkeypatch.setattr(
183
            "napps.kytos.telemetry_int.main.api",
184
            api_mock,
185
        )
186
187 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
188 1
        api_mock.get_evcs.return_value = {
189
            evc_id: {"id": evc_id, "name": "evc", "metadata": {}},
190
        }
191 1
        api_mock.get_stored_flows.side_effect = [
192
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
193
            {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]},
194
        ]
195
196 1
        endpoint = f"{self.base_endpoint}/evc/compare"
197 1
        response = await self.api_client.get(endpoint)
198 1
        assert response.status_code == 200
199 1
        data = response.json()
200 1
        assert len(data) == 1
201 1
        assert data[0]["id"] == evc_id
202 1
        assert data[0]["compare_reason"] == ["wrong_metadata_has_int_flows"]
203 1
        assert data[0]["name"] == "evc"
204
205 1
    async def test_get_evc_compare_missing_some_int_flows(self, monkeypatch) -> None:
206
        """Test get evc compre missing_some_int_flows case."""
207 1
        api_mock, flow = AsyncMock(), MagicMock()
208 1
        flow.cookie = 0xA800000000000001
209 1
        evc_id = "1"
210 1
        monkeypatch.setattr(
211
            "napps.kytos.telemetry_int.main.api",
212
            api_mock,
213
        )
214
215 1
        evc_id = utils.get_id_from_cookie(flow.cookie)
216 1
        api_mock.get_evcs.return_value = {
217
            evc_id: {
218
                "id": evc_id,
219
                "name": "evc",
220
                "metadata": {"telemetry": {"enabled": True}},
221
            },
222
        }
223 1
        api_mock.get_stored_flows.side_effect = [
224
            {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]},
225
            {
226
                flow.cookie: [
227
                    {"id": "some_2", "match": {"in_port": 1}},
228
                    {"id": "some_3", "match": {"in_port": 1}},
229
                ]
230
            },
231
        ]
232
233 1
        endpoint = f"{self.base_endpoint}/evc/compare"
234 1
        response = await self.api_client.get(endpoint)
235 1
        assert response.status_code == 200
236 1
        data = response.json()
237 1
        assert len(data) == 1
238 1
        assert data[0]["id"] == evc_id
239 1
        assert data[0]["compare_reason"] == ["missing_some_int_flows"]
240 1
        assert data[0]["name"] == "evc"
241
242 1
    async def test_on_table_enabled(self) -> None:
243
        """Test on_table_enabled."""
244 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3}
245 1
        await self.napp.on_table_enabled(
246
            KytosEvent(content={"telemetry_int": {"evpl": 22, "epl": 33}})
247
        )
248 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 22, "epl": 33}
249 1
        assert self.napp.controller.buffers.app.aput.call_count == 1
250
251 1
    async def test_on_table_enabled_no_group(self) -> None:
252
        """Test on_table_enabled no group."""
253 1
        await self.napp.on_table_enabled(
254
            KytosEvent(content={"mef_eline": {"evpl": 22, "epl": 33}})
255
        )
256 1
        assert not self.napp.controller.buffers.app.aput.call_count
257
258 1
    async def test_on_evc_deployed(self) -> None:
259
        """Test on_evc_deployed."""
260 1
        content = {"metadata": {"telemetry_request": {}}, "evc_id": "some_id"}
261 1
        self.napp.int_manager.redeploy_int = AsyncMock()
262 1
        self.napp.int_manager.enable_int = AsyncMock()
263 1
        await self.napp.on_evc_deployed(KytosEvent(content=content))
264 1
        assert self.napp.int_manager.enable_int.call_count == 1
265 1
        assert self.napp.int_manager.redeploy_int.call_count == 0
266
267 1
        content = {"metadata": {"telemetry": {"enabled": True}}, "evc_id": "some_id"}
268 1
        await self.napp.on_evc_deployed(KytosEvent(content=content))
269 1
        assert self.napp.int_manager.enable_int.call_count == 1
270 1
        assert self.napp.int_manager.redeploy_int.call_count == 1
271
272 1
    async def test_on_evc_deleted(self) -> None:
273
        """Test on_evc_deleted."""
274 1
        content = {"metadata": {"telemetry": {"enabled": True}}, "evc_id": "some_id"}
275 1
        self.napp.int_manager.disable_int = AsyncMock()
276 1
        await self.napp.on_evc_deleted(KytosEvent(content=content))
277 1
        assert self.napp.int_manager.disable_int.call_count == 1
278
279 1
    async def test_on_uni_active_updated(self, monkeypatch) -> None:
280
        """Test on UNI active updated."""
281 1
        api_mock = AsyncMock()
282 1
        monkeypatch.setattr(
283
            "napps.kytos.telemetry_int.main.api",
284
            api_mock,
285
        )
286 1
        content = {
287
            "metadata": {"telemetry": {"enabled": True}},
288
            "evc_id": "some_id",
289
            "active": True,
290
        }
291 1
        await self.napp.on_uni_active_updated(KytosEvent(content=content))
292 1
        assert api_mock.add_evcs_metadata.call_count == 1
293 1
        args = api_mock.add_evcs_metadata.call_args[0][1]
294 1
        assert args["telemetry"]["status"] == "UP"
295
296 1
        content["active"] = False
297 1
        await self.napp.on_uni_active_updated(KytosEvent(content=content))
298 1
        assert api_mock.add_evcs_metadata.call_count == 2
299 1
        args = api_mock.add_evcs_metadata.call_args[0][1]
300 1
        assert args["telemetry"]["status"] == "DOWN"
301
302 1
    async def test_on_evc_undeployed(self) -> None:
303
        """Test on_evc_undeployed."""
304 1
        content = {
305
            "enabled": False,
306
            "metadata": {"telemetry": {"enabled": False}},
307
            "evc_id": "some_id",
308
        }
309 1
        self.napp.int_manager.remove_int_flows = AsyncMock()
310 1
        await self.napp.on_evc_undeployed(KytosEvent(content=content))
311 1
        assert self.napp.int_manager.remove_int_flows.call_count == 0
312
313 1
        content["metadata"]["telemetry"]["enabled"] = True
314 1
        await self.napp.on_evc_undeployed(KytosEvent(content=content))
315 1
        assert self.napp.int_manager.remove_int_flows.call_count == 1
316
317 1
    async def test_on_evc_redeployed_link(self) -> None:
318
        """Test on redeployed_link_down|redeployed_link_up."""
319 1
        content = {
320
            "enabled": True,
321
            "metadata": {"telemetry": {"enabled": False}},
322
            "evc_id": "some_id",
323
        }
324 1
        self.napp.int_manager.redeploy_int = AsyncMock()
325 1
        await self.napp.on_evc_redeployed_link(KytosEvent(content=content))
326 1
        assert self.napp.int_manager.redeploy_int.call_count == 0
327
328 1
        content["metadata"]["telemetry"]["enabled"] = True
329 1
        await self.napp.on_evc_redeployed_link(KytosEvent(content=content))
330 1
        assert self.napp.int_manager.redeploy_int.call_count == 1
331
332 1
    async def test_on_evc_error_redeployed_link_down(self) -> None:
333
        """Test error_redeployed_link_down."""
334 1
        content = {
335
            "enabled": True,
336
            "metadata": {"telemetry": {"enabled": False}},
337
            "evc_id": "some_id",
338
        }
339 1
        self.napp.int_manager.remove_int_flows = AsyncMock()
340 1
        await self.napp.on_evc_error_redeployed_link_down(KytosEvent(content=content))
341 1
        assert self.napp.int_manager.remove_int_flows.call_count == 0
342
343 1
        content["metadata"]["telemetry"]["enabled"] = True
344 1
        await self.napp.on_evc_error_redeployed_link_down(KytosEvent(content=content))
345 1
        assert self.napp.int_manager.remove_int_flows.call_count == 1
346
347 1
    async def test_on_link_down(self) -> None:
348
        """Test on link_down."""
349 1
        self.napp.int_manager.handle_pp_link_down = AsyncMock()
350 1
        await self.napp.on_link_down(KytosEvent(content={"link": MagicMock()}))
351 1
        assert self.napp.int_manager.handle_pp_link_down.call_count == 1
352
353 1
    async def test_on_link_up(self) -> None:
354
        """Test on link_up."""
355 1
        self.napp.int_manager.handle_pp_link_up = AsyncMock()
356 1
        await self.napp.on_link_up(KytosEvent(content={"link": MagicMock()}))
357 1
        assert self.napp.int_manager.handle_pp_link_up.call_count == 1
358
359 1
    async def test_on_table_enabled_error(self, monkeypatch) -> None:
360
        """Test on_table_enabled error case."""
361 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3}
362 1
        log_mock = MagicMock()
363 1
        monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock)
364 1
        await self.napp.on_table_enabled(
365
            KytosEvent(content={"telemetry_int": {"invalid": 1}})
366
        )
367 1
        assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3}
368 1
        assert log_mock.error.call_count == 1
369 1
        assert not self.napp.controller.buffers.app.aput.call_count
370
371 1
    async def test_on_flow_mod_error(self, monkeypatch) -> None:
372
        """Test on_flow_mod_error."""
373 1
        api_mock_main, api_mock_int, flow = AsyncMock(), AsyncMock(), MagicMock()
374 1
        flow.cookie = 0xA800000000000001
375 1
        monkeypatch.setattr(
376
            "napps.kytos.telemetry_int.main.api",
377
            api_mock_main,
378
        )
379 1
        monkeypatch.setattr(
380
            "napps.kytos.telemetry_int.managers.int.api",
381
            api_mock_int,
382
        )
383 1
        cookie = utils.get_id_from_cookie(flow.cookie)
384 1
        api_mock_main.get_evc.return_value = {
385
            cookie: {"metadata": {"telemetry": {"enabled": True}}}
386
        }
387 1
        api_mock_int.get_stored_flows.return_value = {cookie: [MagicMock()]}
388 1
        self.napp.int_manager._remove_int_flows = AsyncMock()
389
390 1
        event = KytosEvent(content={"flow": flow, "error_command": "add"})
391 1
        await self.napp.on_flow_mod_error(event)
392
393 1
        assert api_mock_main.get_evc.call_count == 1
394 1
        assert api_mock_int.get_stored_flows.call_count == 1
395 1
        assert api_mock_int.add_evcs_metadata.call_count == 1
396 1
        assert self.napp.int_manager._remove_int_flows.call_count == 1
397
398 1
    async def test_on_mef_eline_evcs_loaded(self):
399
        """Test on_mef_eline_evcs_loaded."""
400 1
        evcs = {"1": {}, "2": {}}
401 1
        event = KytosEvent(content=evcs)
402 1
        self.napp.int_manager = MagicMock()
403 1
        await self.napp.on_mef_eline_evcs_loaded(event)
404 1
        self.napp.int_manager.load_uni_src_proxy_ports.assert_called_with(evcs)
405
406 1
    async def test_on_intf_metadata_remove(self):
407
        """Test on_intf_metadata_removed."""
408 1
        intf = MagicMock()
409 1
        event = KytosEvent(content={"interface": intf})
410 1
        self.napp.int_manager = MagicMock()
411 1
        await self.napp.on_intf_metadata_removed(event)
412 1
        self.napp.int_manager.handle_pp_metadata_removed.assert_called_with(intf)
413
414 1
    async def test_on_intf_metadata_added(self):
415
        """Test on_intf_metadata_added."""
416 1
        intf = MagicMock()
417 1
        event = KytosEvent(content={"interface": intf})
418 1
        self.napp.int_manager = MagicMock()
419 1
        await self.napp.on_intf_metadata_added(event)
420
        self.napp.int_manager.handle_pp_metadata_added.assert_called_with(intf)
421