@@ 24-45 (lines=22) @@ | ||
21 | self.api_client = get_test_client(controller, self.napp) |
|
22 | self.base_endpoint = "kytos/telemetry_int/v1" |
|
23 | ||
async def test_enable_telemetry(self, monkeypatch) -> None:
    """Exercise POST ``evc/enable`` for a single EVC.

    The ``api`` object in ``napps.kytos.telemetry_int.main`` is swapped
    for an ``AsyncMock`` whose ``get_evcs`` reports one EVC with telemetry
    disabled, and the NApp's ``int_manager`` is replaced by an
    ``AsyncMock``. The test then checks that ``enable_int`` and
    ``_remove_int_flows`` were each called once, that the response status
    is 201, and that the body is the list of requested EVC ids.
    """
    # A stand-in flow object only serves as the carrier of the cookie.
    flow = MagicMock()
    flow.cookie = 0xA800000000000001

    api_mock = AsyncMock()
    monkeypatch.setattr("napps.kytos.telemetry_int.main.api", api_mock)

    evc_id = utils.get_id_from_cookie(flow.cookie)
    telemetry_disabled = {"metadata": {"telemetry": {"enabled": False}}}
    api_mock.get_evcs.return_value = {evc_id: telemetry_disabled}

    self.napp.int_manager = AsyncMock()

    payload = {"evc_ids": [evc_id]}
    response = await self.api_client.post(
        f"{self.base_endpoint}/evc/enable", json=payload
    )

    manager = self.napp.int_manager
    assert manager.enable_int.call_count == 1
    assert manager._remove_int_flows.call_count == 1
    assert response.status_code == 201
    assert response.json() == [evc_id]
|
46 | ||
47 | async def test_redeploy_telemetry(self, monkeypatch) -> None: |
|
48 | """Test redeploy telemetry.""" |
|
@@ 78-98 (lines=21) @@ | ||
75 | assert response.status_code == 400 |
|
76 | assert "evc_ids" in response.json()["description"] |
|
77 | ||
async def test_disable_telemetry(self, monkeypatch) -> None:
    """Exercise POST ``evc/disable`` for a single EVC.

    The ``api`` object in ``napps.kytos.telemetry_int.main`` is swapped
    for an ``AsyncMock`` whose ``get_evcs`` reports one EVC with telemetry
    enabled, and the NApp's ``int_manager`` is replaced by an
    ``AsyncMock``. The test then checks that ``disable_int`` was called
    once, that the response status is 200, and that the body is the list
    of requested EVC ids.
    """
    # A stand-in flow object only serves as the carrier of the cookie.
    flow = MagicMock()
    flow.cookie = 0xA800000000000001

    api_mock = AsyncMock()
    monkeypatch.setattr("napps.kytos.telemetry_int.main.api", api_mock)

    evc_id = utils.get_id_from_cookie(flow.cookie)
    telemetry_enabled = {"metadata": {"telemetry": {"enabled": True}}}
    api_mock.get_evcs.return_value = {evc_id: telemetry_enabled}

    self.napp.int_manager = AsyncMock()

    payload = {"evc_ids": [evc_id]}
    response = await self.api_client.post(
        f"{self.base_endpoint}/evc/disable", json=payload
    )

    manager = self.napp.int_manager
    assert manager.disable_int.call_count == 1
    assert response.status_code == 200
    assert response.json() == [evc_id]
|
99 | ||
100 | async def test_get_enabled_evcs(self, monkeypatch) -> None: |
|
101 | """Test get enabled evcs.""" |
|
@@ 47-67 (lines=21) @@ | ||
44 | assert response.status_code == 201 |
|
45 | assert response.json() == [evc_id] |
|
46 | ||
async def test_redeploy_telemetry(self, monkeypatch) -> None:
    """Exercise PATCH ``evc/redeploy`` for a single EVC.

    The ``api`` object in ``napps.kytos.telemetry_int.main`` is swapped
    for an ``AsyncMock`` whose ``get_evcs`` reports one EVC with telemetry
    disabled, and the NApp's ``int_manager`` is replaced by an
    ``AsyncMock``. The test then checks that ``redeploy_int`` was called
    once, that the response status is 201, and that the body is the list
    of requested EVC ids.
    """
    # A stand-in flow object only serves as the carrier of the cookie.
    flow = MagicMock()
    flow.cookie = 0xA800000000000001

    api_mock = AsyncMock()
    monkeypatch.setattr("napps.kytos.telemetry_int.main.api", api_mock)

    evc_id = utils.get_id_from_cookie(flow.cookie)
    telemetry_disabled = {"metadata": {"telemetry": {"enabled": False}}}
    api_mock.get_evcs.return_value = {evc_id: telemetry_disabled}

    self.napp.int_manager = AsyncMock()

    payload = {"evc_ids": [evc_id]}
    response = await self.api_client.patch(
        f"{self.base_endpoint}/evc/redeploy", json=payload
    )

    manager = self.napp.int_manager
    assert manager.redeploy_int.call_count == 1
    assert response.status_code == 201
    assert response.json() == [evc_id]
|
68 | ||
69 | @pytest.mark.parametrize("route", ["/evc/enable", "/evc/disable"]) |
|
70 | async def test_en_dis_openapi_validation(self, route: str) -> None: |