@@ 27-48 (lines=22) @@ | ||
24 | self.api_client = get_test_client(controller, self.napp) |
|
25 | self.base_endpoint = "kytos/telemetry_int/v1" |
|
26 | ||
async def test_enable_telemetry(self, monkeypatch) -> None:
    """Check the enable endpoint for an EVC with telemetry disabled.

    The module-level ``api`` object of the NApp is swapped for an
    ``AsyncMock`` whose ``get_evcs`` returns a single EVC with
    ``telemetry.enabled`` set to ``False``. The test then POSTs the
    EVC id and expects ``enable_int`` and
    ``_remove_int_flows_by_cookies`` to each be called once on the
    mocked INT manager, a 201 status, and the EVC id echoed back.
    """
    # Replace the API helper used by the NApp with an async mock.
    mocked_api = AsyncMock()
    monkeypatch.setattr("napps.kytos.telemetry_int.main.api", mocked_api)

    # The EVC id is derived from the cookie of a stand-in flow.
    mocked_flow = MagicMock()
    mocked_flow.cookie = 0xA800000000000001
    evc_id = utils.get_id_from_cookie(mocked_flow.cookie)

    telemetry_metadata = {"telemetry": {"enabled": False}}
    mocked_api.get_evcs.return_value = {evc_id: {"metadata": telemetry_metadata}}

    self.napp.int_manager = AsyncMock()

    url = f"{self.base_endpoint}/evc/enable"
    payload = {"evc_ids": [evc_id]}
    response = await self.api_client.post(url, json=payload)

    int_manager = self.napp.int_manager
    assert int_manager.enable_int.call_count == 1
    assert int_manager._remove_int_flows_by_cookies.call_count == 1
    assert response.status_code == 201
    assert response.json() == [evc_id]
|
49 | ||
50 | async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None: |
|
51 | """Test redeploy telemetry enabled.""" |
|
@@ 107-127 (lines=21) @@ | ||
104 | assert response.status_code == 400 |
|
105 | assert "evc_ids" in response.json()["description"] |
|
106 | ||
async def test_disable_telemetry(self, monkeypatch) -> None:
    """Check the disable endpoint for an EVC with telemetry enabled.

    The module-level ``api`` object of the NApp is swapped for an
    ``AsyncMock`` whose ``get_evcs`` returns a single EVC with
    ``telemetry.enabled`` set to ``True``. The test then POSTs the
    EVC id and expects ``disable_int`` to be called once on the mocked
    INT manager, a 200 status, and the EVC id echoed back.
    """
    # Replace the API helper used by the NApp with an async mock.
    mocked_api = AsyncMock()
    monkeypatch.setattr("napps.kytos.telemetry_int.main.api", mocked_api)

    # The EVC id is derived from the cookie of a stand-in flow.
    mocked_flow = MagicMock()
    mocked_flow.cookie = 0xA800000000000001
    evc_id = utils.get_id_from_cookie(mocked_flow.cookie)

    telemetry_metadata = {"telemetry": {"enabled": True}}
    mocked_api.get_evcs.return_value = {evc_id: {"metadata": telemetry_metadata}}

    self.napp.int_manager = AsyncMock()

    url = f"{self.base_endpoint}/evc/disable"
    payload = {"evc_ids": [evc_id]}
    response = await self.api_client.post(url, json=payload)

    int_manager = self.napp.int_manager
    assert int_manager.disable_int.call_count == 1
    assert response.status_code == 200
    assert response.json() == [evc_id]
|
128 | ||
129 | async def test_get_enabled_evcs(self, monkeypatch) -> None: |
|
130 | """Test get enabled evcs.""" |
|
@@ 50-70 (lines=21) @@ | ||
47 | assert response.status_code == 201 |
|
48 | assert response.json() == [evc_id] |
|
49 | ||
async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None:
    """Check the redeploy endpoint for an EVC with telemetry enabled.

    The module-level ``api`` object of the NApp is swapped for an
    ``AsyncMock`` whose ``get_evc`` returns a mapping with one EVC that
    has ``telemetry.enabled`` set to ``True``. The test then PATCHes
    the EVC id and expects ``redeploy_int`` to be called once on the
    mocked INT manager, a 201 status, and the EVC id echoed back.
    """
    # Replace the API helper used by the NApp with an async mock.
    mocked_api = AsyncMock()
    monkeypatch.setattr("napps.kytos.telemetry_int.main.api", mocked_api)

    # The EVC id is derived from the cookie of a stand-in flow.
    mocked_flow = MagicMock()
    mocked_flow.cookie = 0xA800000000000001
    evc_id = utils.get_id_from_cookie(mocked_flow.cookie)

    # NOTE(review): this test stubs ``get_evc`` (singular), unlike the
    # enable/disable tests, which stub ``get_evcs`` - confirm against
    # the redeploy handler.
    telemetry_metadata = {"telemetry": {"enabled": True}}
    mocked_api.get_evc.return_value = {evc_id: {"metadata": telemetry_metadata}}

    self.napp.int_manager = AsyncMock()

    url = f"{self.base_endpoint}/evc/redeploy"
    payload = {"evc_ids": [evc_id]}
    response = await self.api_client.patch(url, json=payload)

    int_manager = self.napp.int_manager
    assert int_manager.redeploy_int.call_count == 1
    assert response.status_code == 201
    assert response.json() == [evc_id]
|
71 | ||
72 | async def test_redeploy_telemetry_not_enabled(self, monkeypatch) -> None: |
|
73 | """Test redeploy telemetry not enabled.""" |