@@ 27-58 (lines=32) @@ | ||
24 | self.api_client = get_test_client(controller, self.napp) |
|
25 | self.base_endpoint = "kytos/telemetry_int/v1" |
|
26 | ||
27 | async def test_enable_telemetry(self, monkeypatch) -> None: |
|
28 | """Test enable telemetry.""" |
|
29 | api_mock, flow = AsyncMock(), MagicMock() |
|
30 | flow.cookie = 0xA800000000000001 |
|
31 | monkeypatch.setattr( |
|
32 | "napps.kytos.telemetry_int.main.api", |
|
33 | api_mock, |
|
34 | ) |
|
35 | ||
36 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
37 | api_mock.get_evcs.return_value = { |
|
38 | evc_id: {"metadata": {"telemetry": {"enabled": False}}} |
|
39 | } |
|
40 | ||
41 | self.napp.int_manager = AsyncMock() |
|
42 | ||
43 | endpoint = f"{self.base_endpoint}/evc/enable" |
|
44 | response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]}) |
|
45 | assert self.napp.int_manager.enable_int.call_count == 1 |
|
46 | ||
47 | enable_int_args = self.napp.int_manager.enable_int.call_args |
|
48 | # evcs arg |
|
49 | assert evc_id in enable_int_args[0][0] |
|
50 | # assert the other args |
|
51 | assert enable_int_args[1] == { |
|
52 | "force": False, |
|
53 | "proxy_port_enabled": None, |
|
54 | "set_proxy_port_metadata": True, |
|
55 | } |
|
56 | ||
57 | assert response.status_code == 201 |
|
58 | assert response.json() == [evc_id] |
|
59 | ||
60 | async def test_enable_telemetry_wrong_types(self, monkeypatch) -> None: |
|
61 | """Test enable telemetry wrong types.""" |
|
@@ 60-90 (lines=31) @@ | ||
57 | assert response.status_code == 201 |
|
58 | assert response.json() == [evc_id] |
|
59 | ||
60 | async def test_enable_telemetry_wrong_types(self, monkeypatch) -> None: |
|
61 | """Test enable telemetry wrong types.""" |
|
62 | api_mock, flow = AsyncMock(), MagicMock() |
|
63 | flow.cookie = 0xA800000000000001 |
|
64 | monkeypatch.setattr( |
|
65 | "napps.kytos.telemetry_int.main.api", |
|
66 | api_mock, |
|
67 | ) |
|
68 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
69 | api_mock.get_evcs.return_value = { |
|
70 | evc_id: {"metadata": {"telemetry": {"enabled": False}}} |
|
71 | } |
|
72 | ||
73 | endpoint = f"{self.base_endpoint}/evc/enable" |
|
74 | response = await self.api_client.post( |
|
75 | endpoint, json={"evc_ids": [evc_id], "proxy_port_enabled": 1} |
|
76 | ) |
|
77 | assert response.status_code == 400 |
|
78 | assert ( |
|
79 | "1 is not of type 'boolean' for field proxy_port_enabled" |
|
80 | in response.json()["description"] |
|
81 | ) |
|
82 | ||
83 | endpoint = f"{self.base_endpoint}/evc/enable" |
|
84 | response = await self.api_client.post( |
|
85 | endpoint, json={"evc_ids": [evc_id], "force": 2} |
|
86 | ) |
|
87 | assert response.status_code == 400 |
|
88 | assert ( |
|
89 | "2 is not of type 'boolean' for field force" |
|
90 | in response.json()["description"] |
|
91 | ) |
|
92 | ||
93 | async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None: |