Total Complexity | 40 |
Total Lines | 740 |
Duplicated Lines | 14.19 % |
Coverage | 100% |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like test_main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """Test Main methods.""" |
||
2 | |||
3 | 1 | from unittest.mock import AsyncMock, MagicMock, patch |
|
4 | |||
5 | 1 | import pytest |
|
6 | 1 | from napps.kytos.telemetry_int import utils |
|
7 | 1 | from napps.kytos.telemetry_int.exceptions import EVCError, ProxyPortShared |
|
8 | 1 | from napps.kytos.telemetry_int.main import Main |
|
9 | |||
10 | 1 | from kytos.core.common import EntityStatus |
|
11 | 1 | from kytos.core.events import KytosEvent |
|
12 | 1 | from kytos.lib.helpers import get_controller_mock, get_test_client |
|
13 | |||
14 | |||
15 | 1 | class TestMain: |
|
16 | """Tests for the Main class.""" |
||
17 | |||
18 | 1 | def setup_method(self): |
|
19 | """Setup.""" |
||
20 | 1 | patch("kytos.core.helpers.run_on_thread", lambda x: x).start() |
|
21 | # pylint: disable=import-outside-toplevel |
||
22 | 1 | controller = get_controller_mock() |
|
23 | 1 | self.napp = Main(controller) |
|
24 | 1 | self.api_client = get_test_client(controller, self.napp) |
|
25 | 1 | self.base_endpoint = "kytos/telemetry_int/v1" |
|
26 | |||
27 | 1 | View Code Duplication | async def test_enable_telemetry(self, monkeypatch) -> None: |
|
|||
28 | """Test enable telemetry.""" |
||
29 | 1 | api_mock, flow = AsyncMock(), MagicMock() |
|
30 | 1 | flow.cookie = 0xA800000000000001 |
|
31 | 1 | monkeypatch.setattr( |
|
32 | "napps.kytos.telemetry_int.main.api", |
||
33 | api_mock, |
||
34 | ) |
||
35 | |||
36 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
37 | 1 | api_mock.get_evcs.return_value = { |
|
38 | evc_id: {"metadata": {"telemetry": {"enabled": False}}} |
||
39 | } |
||
40 | |||
41 | 1 | self.napp.int_manager = AsyncMock() |
|
42 | |||
43 | 1 | endpoint = f"{self.base_endpoint}/evc/enable" |
|
44 | 1 | response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]}) |
|
45 | 1 | assert self.napp.int_manager.enable_int.call_count == 1 |
|
46 | |||
47 | 1 | enable_int_args = self.napp.int_manager.enable_int.call_args |
|
48 | # evcs arg |
||
49 | 1 | assert evc_id in enable_int_args[0][0] |
|
50 | # assert the other args |
||
51 | 1 | assert enable_int_args[1] == { |
|
52 | "force": False, |
||
53 | "proxy_port_enabled": None, |
||
54 | "set_proxy_port_metadata": True, |
||
55 | } |
||
56 | |||
57 | 1 | assert response.status_code == 201 |
|
58 | 1 | assert response.json() == [evc_id] |
|
59 | |||
60 | 1 | View Code Duplication | async def test_enable_telemetry_wrong_types(self, monkeypatch) -> None: |
61 | """Test enable telemetry wrong types.""" |
||
62 | 1 | api_mock, flow = AsyncMock(), MagicMock() |
|
63 | 1 | flow.cookie = 0xA800000000000001 |
|
64 | 1 | monkeypatch.setattr( |
|
65 | "napps.kytos.telemetry_int.main.api", |
||
66 | api_mock, |
||
67 | ) |
||
68 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
69 | 1 | api_mock.get_evcs.return_value = { |
|
70 | evc_id: {"metadata": {"telemetry": {"enabled": False}}} |
||
71 | } |
||
72 | |||
73 | 1 | endpoint = f"{self.base_endpoint}/evc/enable" |
|
74 | 1 | response = await self.api_client.post( |
|
75 | endpoint, json={"evc_ids": [evc_id], "proxy_port_enabled": 1} |
||
76 | ) |
||
77 | 1 | assert response.status_code == 400 |
|
78 | 1 | assert ( |
|
79 | "1 is not of type 'boolean' for field proxy_port_enabled" |
||
80 | in response.json()["description"] |
||
81 | ) |
||
82 | |||
83 | 1 | endpoint = f"{self.base_endpoint}/evc/enable" |
|
84 | 1 | response = await self.api_client.post( |
|
85 | endpoint, json={"evc_ids": [evc_id], "force": 2} |
||
86 | ) |
||
87 | 1 | assert response.status_code == 400 |
|
88 | 1 | assert ( |
|
89 | "2 is not of type 'boolean' for field force" |
||
90 | in response.json()["description"] |
||
91 | ) |
||
92 | |||
93 | 1 | View Code Duplication | async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None: |
94 | """Test redeploy telemetry enabled.""" |
||
95 | 1 | api_mock, flow = AsyncMock(), MagicMock() |
|
96 | 1 | flow.cookie = 0xA800000000000001 |
|
97 | 1 | monkeypatch.setattr( |
|
98 | "napps.kytos.telemetry_int.main.api", |
||
99 | api_mock, |
||
100 | ) |
||
101 | |||
102 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
103 | 1 | api_mock.get_evc.return_value = { |
|
104 | evc_id: {"metadata": {"telemetry": {"enabled": True}}} |
||
105 | } |
||
106 | |||
107 | 1 | self.napp.int_manager = AsyncMock() |
|
108 | |||
109 | 1 | endpoint = f"{self.base_endpoint}/evc/redeploy" |
|
110 | 1 | response = await self.api_client.patch(endpoint, json={"evc_ids": [evc_id]}) |
|
111 | 1 | assert self.napp.int_manager.redeploy_int.call_count == 1 |
|
112 | 1 | assert response.status_code == 201 |
|
113 | 1 | assert response.json() == [evc_id] |
|
114 | |||
115 | 1 | async def test_redeploy_telemetry_not_enabled(self, monkeypatch) -> None: |
|
116 | """Test redeploy telemetry not enabled.""" |
||
117 | 1 | api_mock, flow, api_mngr_mock = AsyncMock(), MagicMock(), AsyncMock() |
|
118 | 1 | flow.cookie = 0xA800000000000001 |
|
119 | 1 | monkeypatch.setattr( |
|
120 | "napps.kytos.telemetry_int.main.api", |
||
121 | api_mock, |
||
122 | ) |
||
123 | 1 | monkeypatch.setattr( |
|
124 | "napps.kytos.telemetry_int.managers.int.api", |
||
125 | api_mngr_mock, |
||
126 | ) |
||
127 | |||
128 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
129 | 1 | api_mock.get_evc.return_value = { |
|
130 | evc_id: {"metadata": {"telemetry": {"enabled": False}}} |
||
131 | } |
||
132 | |||
133 | 1 | self.napp.int_manager._validate_map_enable_evcs = MagicMock() |
|
134 | 1 | self.napp.int_manager._remove_int_flows_by_cookies = AsyncMock() |
|
135 | 1 | self.napp.int_manager.install_int_flows = AsyncMock() |
|
136 | 1 | endpoint = f"{self.base_endpoint}/evc/redeploy" |
|
137 | 1 | response = await self.api_client.patch(endpoint, json={"evc_ids": [evc_id]}) |
|
138 | 1 | assert response.status_code == 409 |
|
139 | 1 | assert "isn't enabled" in response.json()["description"] |
|
140 | |||
141 | 1 | @pytest.mark.parametrize("route", ["/evc/enable", "/evc/disable"]) |
|
142 | 1 | async def test_en_dis_openapi_validation(self, route: str) -> None: |
|
143 | """Test OpenAPI enable/disable basic validation.""" |
||
144 | 1 | endpoint = f"{self.base_endpoint}{route}" |
|
145 | # wrong evc_ids payload data type |
||
146 | 1 | response = await self.api_client.post(endpoint, json={"evc_ids": 1}) |
|
147 | 1 | assert response.status_code == 400 |
|
148 | 1 | assert "evc_ids" in response.json()["description"] |
|
149 | |||
150 | 1 | View Code Duplication | async def test_disable_telemetry(self, monkeypatch) -> None: |
151 | """Test disable telemetry.""" |
||
152 | 1 | api_mock, flow = AsyncMock(), MagicMock() |
|
153 | 1 | flow.cookie = 0xA800000000000001 |
|
154 | 1 | monkeypatch.setattr( |
|
155 | "napps.kytos.telemetry_int.main.api", |
||
156 | api_mock, |
||
157 | ) |
||
158 | |||
159 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
160 | 1 | api_mock.get_evcs.return_value = { |
|
161 | evc_id: {"metadata": {"telemetry": {"enabled": True}}} |
||
162 | } |
||
163 | |||
164 | 1 | self.napp.int_manager = AsyncMock() |
|
165 | |||
166 | 1 | endpoint = f"{self.base_endpoint}/evc/disable" |
|
167 | 1 | response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]}) |
|
168 | 1 | assert self.napp.int_manager.disable_int.call_count == 1 |
|
169 | 1 | assert response.status_code == 200 |
|
170 | 1 | assert response.json() == [evc_id] |
|
171 | |||
172 | 1 | async def test_get_enabled_evcs(self, monkeypatch) -> None: |
|
173 | """Test get enabled evcs.""" |
||
174 | 1 | api_mock, flow = AsyncMock(), MagicMock() |
|
175 | 1 | flow.cookie = 0xA800000000000001 |
|
176 | 1 | monkeypatch.setattr( |
|
177 | "napps.kytos.telemetry_int.main.api", |
||
178 | api_mock, |
||
179 | ) |
||
180 | |||
181 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
182 | 1 | api_mock.get_evcs.return_value = { |
|
183 | evc_id: {"metadata": {"telemetry": {"enabled": True}}}, |
||
184 | } |
||
185 | |||
186 | 1 | endpoint = f"{self.base_endpoint}/evc" |
|
187 | 1 | response = await self.api_client.get(endpoint) |
|
188 | 1 | assert api_mock.get_evcs.call_args[1] == {"metadata.telemetry.enabled": "true"} |
|
189 | 1 | assert response.status_code == 200 |
|
190 | 1 | data = response.json() |
|
191 | 1 | assert len(data) == 1 |
|
192 | 1 | assert evc_id in data |
|
193 | |||
194 | 1 | async def test_get_evc_compare(self, monkeypatch) -> None: |
|
195 | """Test get evc compre ok case.""" |
||
196 | 1 | api_mock, flow = AsyncMock(), MagicMock() |
|
197 | 1 | flow.cookie = 0xA800000000000001 |
|
198 | 1 | evc_id = "1" |
|
199 | 1 | monkeypatch.setattr( |
|
200 | "napps.kytos.telemetry_int.main.api", |
||
201 | api_mock, |
||
202 | ) |
||
203 | |||
204 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
205 | 1 | api_mock.get_evcs.return_value = { |
|
206 | evc_id: { |
||
207 | "id": evc_id, |
||
208 | "name": "evc", |
||
209 | "metadata": {"telemetry": {"enabled": True}}, |
||
210 | }, |
||
211 | } |
||
212 | 1 | api_mock.get_stored_flows.side_effect = [ |
|
213 | {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]}, |
||
214 | {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]}, |
||
215 | ] |
||
216 | |||
217 | 1 | endpoint = f"{self.base_endpoint}/evc/compare" |
|
218 | 1 | response = await self.api_client.get(endpoint) |
|
219 | 1 | assert response.status_code == 200 |
|
220 | 1 | data = response.json() |
|
221 | 1 | assert len(data) == 0 |
|
222 | |||
223 | 1 | async def test_get_evc_compare_wrong_metadata(self, monkeypatch) -> None: |
|
224 | """Test get evc compre wrong_metadata_has_int_flows case.""" |
||
225 | 1 | api_mock, flow = AsyncMock(), MagicMock() |
|
226 | 1 | flow.cookie = 0xA800000000000001 |
|
227 | 1 | evc_id = "1" |
|
228 | 1 | monkeypatch.setattr( |
|
229 | "napps.kytos.telemetry_int.main.api", |
||
230 | api_mock, |
||
231 | ) |
||
232 | |||
233 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
234 | 1 | api_mock.get_evcs.return_value = { |
|
235 | evc_id: {"id": evc_id, "name": "evc", "metadata": {}}, |
||
236 | } |
||
237 | 1 | api_mock.get_stored_flows.side_effect = [ |
|
238 | {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]}, |
||
239 | {flow.cookie: [{"id": "some_2", "match": {"in_port": 1}}]}, |
||
240 | ] |
||
241 | |||
242 | 1 | endpoint = f"{self.base_endpoint}/evc/compare" |
|
243 | 1 | response = await self.api_client.get(endpoint) |
|
244 | 1 | assert response.status_code == 200 |
|
245 | 1 | data = response.json() |
|
246 | 1 | assert len(data) == 1 |
|
247 | 1 | assert data[0]["id"] == evc_id |
|
248 | 1 | assert data[0]["compare_reason"] == ["wrong_metadata_has_int_flows"] |
|
249 | 1 | assert data[0]["name"] == "evc" |
|
250 | |||
251 | 1 | async def test_get_evc_compare_missing_some_int_flows(self, monkeypatch) -> None: |
|
252 | """Test get evc compre missing_some_int_flows case.""" |
||
253 | 1 | api_mock, flow = AsyncMock(), MagicMock() |
|
254 | 1 | flow.cookie = 0xA800000000000001 |
|
255 | 1 | evc_id = "1" |
|
256 | 1 | monkeypatch.setattr( |
|
257 | "napps.kytos.telemetry_int.main.api", |
||
258 | api_mock, |
||
259 | ) |
||
260 | |||
261 | 1 | evc_id = utils.get_id_from_cookie(flow.cookie) |
|
262 | 1 | api_mock.get_evcs.return_value = { |
|
263 | evc_id: { |
||
264 | "id": evc_id, |
||
265 | "name": "evc", |
||
266 | "metadata": {"telemetry": {"enabled": True}}, |
||
267 | }, |
||
268 | } |
||
269 | 1 | api_mock.get_stored_flows.side_effect = [ |
|
270 | {flow.cookie: [{"id": "some_1", "match": {"in_port": 1}}]}, |
||
271 | { |
||
272 | flow.cookie: [ |
||
273 | {"id": "some_2", "match": {"in_port": 1}}, |
||
274 | {"id": "some_3", "match": {"in_port": 1}}, |
||
275 | ] |
||
276 | }, |
||
277 | ] |
||
278 | |||
279 | 1 | endpoint = f"{self.base_endpoint}/evc/compare" |
|
280 | 1 | response = await self.api_client.get(endpoint) |
|
281 | 1 | assert response.status_code == 200 |
|
282 | 1 | data = response.json() |
|
283 | 1 | assert len(data) == 1 |
|
284 | 1 | assert data[0]["id"] == evc_id |
|
285 | 1 | assert data[0]["compare_reason"] == ["missing_some_int_flows"] |
|
286 | 1 | assert data[0]["name"] == "evc" |
|
287 | |||
288 | 1 | async def test_delete_proxy_port_metadata(self, monkeypatch) -> None: |
|
289 | """Test delete proxy_port metadata.""" |
||
290 | 1 | api_mock = AsyncMock() |
|
291 | 1 | monkeypatch.setattr( |
|
292 | "napps.kytos.telemetry_int.main.api", |
||
293 | api_mock, |
||
294 | ) |
||
295 | 1 | intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7 |
|
296 | 1 | endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port" |
|
297 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
298 | 1 | pp = MagicMock() |
|
299 | 1 | pp.evc_ids = set() |
|
300 | 1 | self.napp.int_manager.get_proxy_port_or_raise = MagicMock() |
|
301 | 1 | self.napp.int_manager.get_proxy_port_or_raise.return_value = pp |
|
302 | 1 | intf_mock = MagicMock() |
|
303 | 1 | intf_mock.metadata = {"proxy_port": port_number} |
|
304 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
305 | 1 | self.napp.controller.get_interface_by_id.return_value = intf_mock |
|
306 | 1 | response = await self.api_client.delete(endpoint) |
|
307 | 1 | assert response.status_code == 200 |
|
308 | 1 | data = response.json() |
|
309 | 1 | assert data == "Operation successful" |
|
310 | 1 | assert api_mock.delete_proxy_port_metadata.call_count == 1 |
|
311 | |||
312 | 1 | async def test_delete_proxy_port_metadata_force(self, monkeypatch) -> None: |
|
313 | """Test delete proxy_port metadata force.""" |
||
314 | 1 | api_mock = AsyncMock() |
|
315 | 1 | monkeypatch.setattr( |
|
316 | "napps.kytos.telemetry_int.main.api", |
||
317 | api_mock, |
||
318 | ) |
||
319 | 1 | intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7 |
|
320 | 1 | src_id = "00:00:00:00:00:00:00:01:7" |
|
321 | 1 | endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port" |
|
322 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
323 | 1 | pp = MagicMock() |
|
324 | 1 | pp.evc_ids = set(["some_id"]) |
|
325 | 1 | self.napp.int_manager.unis_src[intf_id] = src_id |
|
326 | 1 | self.napp.int_manager.srcs_pp[src_id] = pp |
|
327 | 1 | intf_mock = MagicMock() |
|
328 | 1 | intf_mock.metadata = {"proxy_port": port_number} |
|
329 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
330 | 1 | self.napp.controller.get_interface_by_id.return_value = intf_mock |
|
331 | 1 | response = await self.api_client.delete(endpoint) |
|
332 | 1 | assert response.status_code == 409 |
|
333 | 1 | data = response.json()["description"] |
|
334 | 1 | assert "is in use on 1" in data |
|
335 | 1 | assert not api_mock.delete_proxy_port_metadata.call_count |
|
336 | |||
337 | 1 | endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port?force=true" |
|
338 | 1 | response = await self.api_client.delete(endpoint) |
|
339 | 1 | assert response.status_code == 200 |
|
340 | 1 | data = response.json() |
|
341 | 1 | assert "Operation successful" in data |
|
342 | 1 | assert api_mock.delete_proxy_port_metadata.call_count == 1 |
|
343 | |||
344 | 1 | async def test_delete_proxy_port_metadata_early_ret(self, monkeypatch) -> None: |
|
345 | """Test delete proxy_port metadata early ret.""" |
||
346 | 1 | api_mock = AsyncMock() |
|
347 | 1 | monkeypatch.setattr( |
|
348 | "napps.kytos.telemetry_int.main.api", |
||
349 | api_mock, |
||
350 | ) |
||
351 | 1 | intf_id = "00:00:00:00:00:00:00:01:1" |
|
352 | 1 | endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port" |
|
353 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
354 | 1 | pp = MagicMock() |
|
355 | 1 | pp.evc_ids = set(["some_id"]) |
|
356 | 1 | self.napp.int_manager.get_proxy_port_or_raise = MagicMock() |
|
357 | 1 | self.napp.int_manager.get_proxy_port_or_raise.return_value = pp |
|
358 | 1 | intf_mock = MagicMock() |
|
359 | 1 | intf_mock.metadata = {} |
|
360 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
361 | 1 | self.napp.controller.get_interface_by_id.return_value = intf_mock |
|
362 | 1 | response = await self.api_client.delete(endpoint) |
|
363 | 1 | assert response.status_code == 200 |
|
364 | 1 | data = response.json() |
|
365 | 1 | assert "Operation successful" in data |
|
366 | 1 | assert not api_mock.delete_proxy_port_metadata.call_count |
|
367 | |||
368 | 1 | async def test_add_proxy_port_metadata(self, monkeypatch) -> None: |
|
369 | """Test add proxy_port metadata.""" |
||
370 | 1 | api_mock = AsyncMock() |
|
371 | 1 | api_mock.get_evcs.return_value = {} |
|
372 | 1 | monkeypatch.setattr( |
|
373 | "napps.kytos.telemetry_int.main.api", |
||
374 | api_mock, |
||
375 | ) |
||
376 | 1 | intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7 |
|
377 | 1 | endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}" |
|
378 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
379 | 1 | pp = MagicMock() |
|
380 | 1 | pp.status = EntityStatus.UP |
|
381 | 1 | self.napp.int_manager.get_proxy_port_or_raise = MagicMock() |
|
382 | 1 | self.napp.int_manager.get_proxy_port_or_raise.return_value = pp |
|
383 | 1 | response = await self.api_client.post(endpoint) |
|
384 | 1 | assert response.status_code == 200 |
|
385 | 1 | data = response.json() |
|
386 | 1 | assert data == "Operation successful" |
|
387 | 1 | assert api_mock.add_proxy_port_metadata.call_count == 1 |
|
388 | |||
389 | 1 | async def test_add_proxy_port_metadata_early_ret(self, monkeypatch) -> None: |
|
390 | """Test add proxy_port metadata early ret.""" |
||
391 | 1 | api_mock = AsyncMock() |
|
392 | 1 | monkeypatch.setattr( |
|
393 | "napps.kytos.telemetry_int.main.api", |
||
394 | api_mock, |
||
395 | ) |
||
396 | 1 | intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7 |
|
397 | 1 | endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}" |
|
398 | 1 | intf_mock = MagicMock() |
|
399 | 1 | intf_mock.metadata = {"proxy_port": port_number} |
|
400 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
401 | 1 | self.napp.controller.get_interface_by_id.return_value = intf_mock |
|
402 | 1 | response = await self.api_client.post(endpoint) |
|
403 | 1 | assert response.status_code == 200 |
|
404 | 1 | data = response.json() |
|
405 | 1 | assert data == "Operation successful" |
|
406 | 1 | assert not api_mock.add_proxy_port_metadata.call_count |
|
407 | |||
408 | 1 | async def test_add_proxy_port_metadata_conflict(self, monkeypatch) -> None: |
|
409 | """Test add proxy_port metadata conflict.""" |
||
410 | 1 | api_mock = AsyncMock() |
|
411 | 1 | api_mock.get_evcs.return_value = {} |
|
412 | 1 | monkeypatch.setattr( |
|
413 | "napps.kytos.telemetry_int.main.api", |
||
414 | api_mock, |
||
415 | ) |
||
416 | 1 | intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7 |
|
417 | 1 | endpoint = f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}" |
|
418 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
419 | 1 | pp = MagicMock() |
|
420 | 1 | pp.status = EntityStatus.UP |
|
421 | 1 | self.napp.int_manager.get_proxy_port_or_raise = MagicMock() |
|
422 | 1 | self.napp.int_manager.get_proxy_port_or_raise.side_effect = ProxyPortShared( |
|
423 | "no_evc_id", "boom" |
||
424 | ) |
||
425 | 1 | response = await self.api_client.post(endpoint) |
|
426 | 1 | assert response.status_code == 409 |
|
427 | |||
428 | 1 | async def test_add_proxy_port_metadata_force(self, monkeypatch) -> None: |
|
429 | """Test add proxy_port metadata force.""" |
||
430 | 1 | api_mock = AsyncMock() |
|
431 | 1 | api_mock.get_evcs.return_value = {} |
|
432 | 1 | monkeypatch.setattr( |
|
433 | "napps.kytos.telemetry_int.main.api", |
||
434 | api_mock, |
||
435 | ) |
||
436 | 1 | intf_id, port_number = "00:00:00:00:00:00:00:01:1", 7 |
|
437 | 1 | force = "true" |
|
438 | 1 | endpoint = ( |
|
439 | f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}?force={force}" |
||
440 | ) |
||
441 | 1 | self.napp.controller.get_interface_by_id = MagicMock() |
|
442 | 1 | pp = MagicMock() |
|
443 | # despite proxy port down, with force true the request shoudl succeed |
||
444 | 1 | pp.status = EntityStatus.DOWN |
|
445 | 1 | self.napp.int_manager.get_proxy_port_or_raise = MagicMock() |
|
446 | 1 | self.napp.int_manager.get_proxy_port_or_raise.return_value = pp |
|
447 | 1 | response = await self.api_client.post(endpoint) |
|
448 | 1 | assert response.status_code == 200 |
|
449 | 1 | data = response.json() |
|
450 | 1 | assert data == "Operation successful" |
|
451 | 1 | assert api_mock.add_proxy_port_metadata.call_count == 1 |
|
452 | |||
453 | 1 | force = "false" |
|
454 | 1 | endpoint = ( |
|
455 | f"{self.base_endpoint}/uni/{intf_id}/proxy_port/{port_number}?force={force}" |
||
456 | ) |
||
457 | 1 | response = await self.api_client.post(endpoint) |
|
458 | 1 | assert response.status_code == 409 |
|
459 | 1 | assert "isn't UP" in response.json()["description"] |
|
460 | |||
461 | 1 | async def test_list_proxy_port(self) -> None: |
|
462 | """Test list proxy port.""" |
||
463 | 1 | endpoint = f"{self.base_endpoint}/uni/proxy_port" |
|
464 | 1 | response = await self.api_client.get(endpoint) |
|
465 | 1 | assert response.status_code == 200 |
|
466 | 1 | data = response.json() |
|
467 | 1 | assert not data |
|
468 | |||
469 | 1 | sw1, intf_mock = MagicMock(), MagicMock() |
|
470 | 1 | intf_mock.metadata = {"proxy_port": 1} |
|
471 | 1 | intf_mock.status.value = "UP" |
|
472 | 1 | intf_mock.id = "1" |
|
473 | 1 | sw1.interfaces = {"intf1": intf_mock} |
|
474 | 1 | self.napp.controller.switches = {"sw1": sw1} |
|
475 | 1 | response = await self.api_client.get(endpoint) |
|
476 | 1 | assert response.status_code == 200 |
|
477 | 1 | data = response.json() |
|
478 | 1 | expected = [ |
|
479 | { |
||
480 | "proxy_port": { |
||
481 | "port_number": 1, |
||
482 | "status": "DOWN", |
||
483 | "status_reason": ["UNI interface 1 not found"], |
||
484 | }, |
||
485 | "uni": {"id": "1", "status": "UP", "status_reason": []}, |
||
486 | } |
||
487 | ] |
||
488 | 1 | assert data == expected |
|
489 | |||
490 | 1 | pp = MagicMock() |
|
491 | 1 | self.napp.int_manager.get_proxy_port_or_raise = MagicMock() |
|
492 | 1 | self.napp.int_manager.get_proxy_port_or_raise.return_value = pp |
|
493 | 1 | pp.status.value = "UP" |
|
494 | |||
495 | 1 | response = await self.api_client.get(endpoint) |
|
496 | 1 | assert response.status_code == 200 |
|
497 | 1 | data = response.json() |
|
498 | 1 | expected = [ |
|
499 | { |
||
500 | "proxy_port": { |
||
501 | "port_number": 1, |
||
502 | "status": "UP", |
||
503 | "status_reason": [], |
||
504 | }, |
||
505 | "uni": {"id": "1", "status": "UP", "status_reason": []}, |
||
506 | } |
||
507 | ] |
||
508 | 1 | assert data == expected |
|
509 | |||
510 | 1 | async def test_on_table_enabled(self) -> None: |
|
511 | """Test on_table_enabled.""" |
||
512 | 1 | assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3} |
|
513 | 1 | await self.napp.on_table_enabled( |
|
514 | KytosEvent(content={"telemetry_int": {"evpl": 22, "epl": 33}}) |
||
515 | ) |
||
516 | 1 | assert self.napp.int_manager.flow_builder.table_group == {"evpl": 22, "epl": 33} |
|
517 | 1 | assert self.napp.controller.buffers.app.aput.call_count == 1 |
|
518 | |||
519 | 1 | async def test_on_table_enabled_no_group(self) -> None: |
|
520 | """Test on_table_enabled no group.""" |
||
521 | 1 | await self.napp.on_table_enabled( |
|
522 | KytosEvent(content={"mef_eline": {"evpl": 22, "epl": 33}}) |
||
523 | ) |
||
524 | 1 | assert not self.napp.controller.buffers.app.aput.call_count |
|
525 | |||
526 | 1 | async def test_on_evc_deployed(self) -> None: |
|
527 | """Test on_evc_deployed.""" |
||
528 | 1 | content = {"metadata": {"telemetry_request": {}}, "id": "some_id"} |
|
529 | 1 | self.napp.int_manager.redeploy_int = AsyncMock() |
|
530 | 1 | self.napp.int_manager.enable_int = AsyncMock() |
|
531 | 1 | await self.napp.on_evc_deployed(KytosEvent(content=content)) |
|
532 | 1 | assert self.napp.int_manager.enable_int.call_count == 1 |
|
533 | 1 | assert self.napp.int_manager.redeploy_int.call_count == 0 |
|
534 | |||
535 | 1 | content = {"metadata": {"telemetry": {"enabled": True}}, "id": "some_id"} |
|
536 | 1 | await self.napp.on_evc_deployed(KytosEvent(content=content)) |
|
537 | 1 | assert self.napp.int_manager.enable_int.call_count == 1 |
|
538 | 1 | assert self.napp.int_manager.redeploy_int.call_count == 1 |
|
539 | |||
540 | 1 | async def test_on_evc_deployed_error(self, monkeypatch) -> None: |
|
541 | """Test on_evc_deployed error.""" |
||
542 | 1 | content = {"metadata": {"telemetry_request": {}}, "id": "some_id"} |
|
543 | 1 | self.napp.int_manager.enable_int = AsyncMock() |
|
544 | 1 | self.napp.int_manager.enable_int.side_effect = EVCError("no_id", "boom") |
|
545 | 1 | log_mock = MagicMock() |
|
546 | 1 | monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock) |
|
547 | 1 | api_mock = AsyncMock() |
|
548 | 1 | monkeypatch.setattr( |
|
549 | "napps.kytos.telemetry_int.main.api", |
||
550 | api_mock, |
||
551 | ) |
||
552 | 1 | await self.napp.on_evc_deployed(KytosEvent(content=content)) |
|
553 | 1 | assert log_mock.error.call_count == 1 |
|
554 | 1 | assert api_mock.add_evcs_metadata.call_count == 1 |
|
555 | |||
556 | 1 | async def test_on_evc_deleted(self) -> None: |
|
557 | """Test on_evc_deleted.""" |
||
558 | 1 | content = {"metadata": {"telemetry": {"enabled": True}}, "id": "some_id"} |
|
559 | 1 | self.napp.int_manager.disable_int = AsyncMock() |
|
560 | 1 | await self.napp.on_evc_deleted(KytosEvent(content=content)) |
|
561 | 1 | assert self.napp.int_manager.disable_int.call_count == 1 |
|
562 | |||
563 | 1 | async def test_on_uni_active_updated(self, monkeypatch) -> None: |
|
564 | """Test on UNI active updated.""" |
||
565 | 1 | api_mock = AsyncMock() |
|
566 | 1 | monkeypatch.setattr( |
|
567 | "napps.kytos.telemetry_int.main.api", |
||
568 | api_mock, |
||
569 | ) |
||
570 | 1 | content = { |
|
571 | "metadata": {"telemetry": {"enabled": True}}, |
||
572 | "id": "some_id", |
||
573 | "active": True, |
||
574 | } |
||
575 | 1 | await self.napp.on_uni_active_updated(KytosEvent(content=content)) |
|
576 | 1 | assert api_mock.add_evcs_metadata.call_count == 1 |
|
577 | 1 | args = api_mock.add_evcs_metadata.call_args[0][1] |
|
578 | 1 | assert args["telemetry"]["status"] == "UP" |
|
579 | |||
580 | 1 | content["active"] = False |
|
581 | 1 | await self.napp.on_uni_active_updated(KytosEvent(content=content)) |
|
582 | 1 | assert api_mock.add_evcs_metadata.call_count == 2 |
|
583 | 1 | args = api_mock.add_evcs_metadata.call_args[0][1] |
|
584 | 1 | assert args["telemetry"]["status"] == "DOWN" |
|
585 | |||
586 | 1 | async def test_on_evc_undeployed(self) -> None: |
|
587 | """Test on_evc_undeployed.""" |
||
588 | 1 | content = { |
|
589 | "enabled": False, |
||
590 | "metadata": {"telemetry": {"enabled": False}}, |
||
591 | "id": "some_id", |
||
592 | } |
||
593 | 1 | self.napp.int_manager.remove_int_flows = AsyncMock() |
|
594 | 1 | await self.napp.on_evc_undeployed(KytosEvent(content=content)) |
|
595 | 1 | assert self.napp.int_manager.remove_int_flows.call_count == 0 |
|
596 | |||
597 | 1 | content["metadata"]["telemetry"]["enabled"] = True |
|
598 | 1 | await self.napp.on_evc_undeployed(KytosEvent(content=content)) |
|
599 | 1 | assert self.napp.int_manager.remove_int_flows.call_count == 1 |
|
600 | |||
601 | 1 | async def test_on_evc_redeployed_link(self) -> None: |
|
602 | """Test on redeployed_link_down|redeployed_link_up.""" |
||
603 | 1 | content = { |
|
604 | "enabled": True, |
||
605 | "metadata": {"telemetry": {"enabled": False}}, |
||
606 | "id": "some_id", |
||
607 | } |
||
608 | 1 | self.napp.int_manager.redeploy_int = AsyncMock() |
|
609 | 1 | await self.napp.on_evc_redeployed_link(KytosEvent(content=content)) |
|
610 | 1 | assert self.napp.int_manager.redeploy_int.call_count == 0 |
|
611 | |||
612 | 1 | content["metadata"]["telemetry"]["enabled"] = True |
|
613 | 1 | await self.napp.on_evc_redeployed_link(KytosEvent(content=content)) |
|
614 | 1 | assert self.napp.int_manager.redeploy_int.call_count == 1 |
|
615 | |||
616 | 1 | async def test_on_evc_redeployed_link_error(self, monkeypatch) -> None: |
|
617 | """Test on redeployed_link_down|redeployed_link_up error.""" |
||
618 | 1 | content = { |
|
619 | "enabled": True, |
||
620 | "metadata": {"telemetry": {"enabled": True}}, |
||
621 | "id": "some_id", |
||
622 | } |
||
623 | 1 | log_mock = MagicMock() |
|
624 | 1 | monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock) |
|
625 | 1 | self.napp.int_manager.redeploy_int = AsyncMock() |
|
626 | 1 | self.napp.int_manager.redeploy_int.side_effect = EVCError("no_id", "boom") |
|
627 | 1 | await self.napp.on_evc_redeployed_link(KytosEvent(content=content)) |
|
628 | 1 | assert log_mock.error.call_count == 1 |
|
629 | |||
630 | 1 | async def test_on_evc_error_redeployed_link_down(self) -> None: |
|
631 | """Test error_redeployed_link_down.""" |
||
632 | 1 | content = { |
|
633 | "enabled": True, |
||
634 | "metadata": {"telemetry": {"enabled": False}}, |
||
635 | "id": "some_id", |
||
636 | } |
||
637 | 1 | self.napp.int_manager.remove_int_flows = AsyncMock() |
|
638 | 1 | await self.napp.on_evc_error_redeployed_link_down(KytosEvent(content=content)) |
|
639 | 1 | assert self.napp.int_manager.remove_int_flows.call_count == 0 |
|
640 | |||
641 | 1 | content["metadata"]["telemetry"]["enabled"] = True |
|
642 | 1 | await self.napp.on_evc_error_redeployed_link_down(KytosEvent(content=content)) |
|
643 | 1 | assert self.napp.int_manager.remove_int_flows.call_count == 1 |
|
644 | |||
645 | 1 | async def test_on_link_down(self) -> None: |
|
646 | """Test on link_down.""" |
||
647 | 1 | self.napp.int_manager.handle_pp_link_down = AsyncMock() |
|
648 | 1 | await self.napp.on_link_down(KytosEvent(content={"link": MagicMock()})) |
|
649 | 1 | assert self.napp.int_manager.handle_pp_link_down.call_count == 1 |
|
650 | |||
651 | 1 | async def test_on_link_up(self) -> None: |
|
652 | """Test on link_up.""" |
||
653 | 1 | self.napp.int_manager.handle_pp_link_up = AsyncMock() |
|
654 | 1 | await self.napp.on_link_up(KytosEvent(content={"link": MagicMock()})) |
|
655 | 1 | assert self.napp.int_manager.handle_pp_link_up.call_count == 1 |
|
656 | |||
657 | 1 | async def test_on_table_enabled_error(self, monkeypatch) -> None: |
|
658 | """Test on_table_enabled error case.""" |
||
659 | 1 | assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3} |
|
660 | 1 | log_mock = MagicMock() |
|
661 | 1 | monkeypatch.setattr("napps.kytos.telemetry_int.main.log", log_mock) |
|
662 | 1 | await self.napp.on_table_enabled( |
|
663 | KytosEvent(content={"telemetry_int": {"invalid": 1}}) |
||
664 | ) |
||
665 | 1 | assert self.napp.int_manager.flow_builder.table_group == {"evpl": 2, "epl": 3} |
|
666 | 1 | assert log_mock.error.call_count == 1 |
|
667 | 1 | assert not self.napp.controller.buffers.app.aput.call_count |
|
668 | |||
669 | 1 | async def test_on_flow_mod_error(self, monkeypatch) -> None: |
|
670 | """Test on_flow_mod_error.""" |
||
671 | 1 | api_mock_main, api_mock_int, flow = AsyncMock(), AsyncMock(), MagicMock() |
|
672 | 1 | flow.cookie = 0xA800000000000001 |
|
673 | 1 | monkeypatch.setattr( |
|
674 | "napps.kytos.telemetry_int.main.api", |
||
675 | api_mock_main, |
||
676 | ) |
||
677 | 1 | monkeypatch.setattr( |
|
678 | "napps.kytos.telemetry_int.managers.int.api", |
||
679 | api_mock_int, |
||
680 | ) |
||
681 | 1 | cookie = utils.get_id_from_cookie(flow.cookie) |
|
682 | 1 | api_mock_main.get_evc.return_value = { |
|
683 | cookie: {"metadata": {"telemetry": {"enabled": True}}} |
||
684 | } |
||
685 | 1 | api_mock_int.get_stored_flows.return_value = {cookie: [MagicMock()]} |
|
686 | 1 | self.napp.int_manager._remove_int_flows_by_cookies = AsyncMock() |
|
687 | |||
688 | 1 | event = KytosEvent(content={"flow": flow, "error_command": "add"}) |
|
689 | 1 | await self.napp.on_flow_mod_error(event) |
|
690 | |||
691 | 1 | assert api_mock_main.get_evc.call_count == 1 |
|
692 | 1 | assert api_mock_int.get_stored_flows.call_count == 1 |
|
693 | 1 | assert api_mock_int.add_evcs_metadata.call_count == 1 |
|
694 | 1 | assert self.napp.int_manager._remove_int_flows_by_cookies.call_count == 1 |
|
695 | |||
696 | 1 | async def test_on_mef_eline_evcs_loaded(self): |
|
697 | """Test on_mef_eline_evcs_loaded.""" |
||
698 | 1 | evcs = {"1": {}, "2": {}} |
|
699 | 1 | event = KytosEvent(content=evcs) |
|
700 | 1 | self.napp.int_manager = MagicMock() |
|
701 | 1 | await self.napp.on_mef_eline_evcs_loaded(event) |
|
702 | 1 | self.napp.int_manager.load_uni_src_proxy_ports.assert_called_with(evcs) |
|
703 | |||
704 | 1 | async def test_on_intf_metadata_remove(self): |
|
705 | """Test on_intf_metadata_removed.""" |
||
706 | 1 | intf = MagicMock() |
|
707 | 1 | event = KytosEvent(content={"interface": intf}) |
|
708 | 1 | self.napp.int_manager = MagicMock() |
|
709 | 1 | await self.napp.on_intf_metadata_removed(event) |
|
710 | 1 | self.napp.int_manager.handle_pp_metadata_removed.assert_called_with(intf) |
|
711 | |||
712 | 1 | async def test_on_intf_metadata_added(self): |
|
713 | """Test on_intf_metadata_added.""" |
||
714 | 1 | intf = MagicMock() |
|
715 | 1 | event = KytosEvent(content={"interface": intf}) |
|
716 | 1 | self.napp.int_manager = MagicMock() |
|
717 | 1 | await self.napp.on_intf_metadata_added(event) |
|
718 | 1 | self.napp.int_manager.handle_pp_metadata_added.assert_called_with(intf) |
|
719 | |||
720 | 1 | async def test_on_failover_deployed(self): |
|
721 | """Test on_failover_deployed.""" |
||
722 | 1 | event = KytosEvent(content={}) |
|
723 | 1 | self.napp.int_manager = MagicMock() |
|
724 | 1 | await self.napp.on_failover_deployed(event) |
|
725 | 1 | self.napp.int_manager.handle_failover_flows.assert_called() |
|
726 | |||
727 | 1 | async def test_on_failover_link_down(self): |
|
728 | """Test on_failover_link_down.""" |
||
729 | 1 | event = KytosEvent(content={}) |
|
730 | 1 | self.napp.int_manager = MagicMock() |
|
731 | 1 | await self.napp.on_failover_link_down(event) |
|
732 | 1 | self.napp.int_manager.handle_failover_flows.assert_called() |
|
733 | |||
734 | 1 | async def test_on_failover_old_path(self): |
|
735 | """Test on_failover_old_path.""" |
||
736 | 1 | event = KytosEvent(content={}) |
|
737 | 1 | self.napp.int_manager = MagicMock() |
|
738 | 1 | await self.napp.on_failover_old_path(event) |
|
739 | self.napp.int_manager.handle_failover_flows.assert_called() |
||
740 |