|
@@ 1349-1420 (lines=72) @@
|
| 1346 |
|
assert 200 == response.status_code |
| 1347 |
|
evc_deploy.assert_called_once() |
| 1348 |
|
|
| 1349 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication") |
| 1350 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal") |
| 1351 |
|
@patch("napps.kytos.mef_eline.main.Main._use_uni_tags") |
| 1352 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC.deploy") |
| 1353 |
|
@patch("napps.kytos.mef_eline.scheduler.Scheduler.add") |
| 1354 |
|
@patch("napps.kytos.mef_eline.main.Main._uni_from_dict") |
| 1355 |
|
@patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc") |
| 1356 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._validate") |
| 1357 |
|
@patch("napps.kytos.mef_eline.main.EVC.as_dict") |
| 1358 |
|
async def test_update_circuit_invalid_json( |
| 1359 |
|
self, |
| 1360 |
|
evc_as_dict_mock, |
| 1361 |
|
validate_mock, |
| 1362 |
|
mongo_controller_upsert_mock, |
| 1363 |
|
uni_from_dict_mock, |
| 1364 |
|
sched_add_mock, |
| 1365 |
|
evc_deploy_mock, |
| 1366 |
|
mock_use_uni_tags, |
| 1367 |
|
mock_tags_equal, |
| 1368 |
|
mock_check_duplicate |
| 1369 |
|
): |
| 1370 |
|
"""Test update a circuit circuit.""" |
| 1371 |
|
self.napp.controller.loop = asyncio.get_running_loop() |
| 1372 |
|
validate_mock.return_value = True |
| 1373 |
|
mongo_controller_upsert_mock.return_value = True |
| 1374 |
|
sched_add_mock.return_value = True |
| 1375 |
|
evc_deploy_mock.return_value = True |
| 1376 |
|
mock_use_uni_tags.return_value = True |
| 1377 |
|
mock_tags_equal.return_value = True |
| 1378 |
|
mock_check_duplicate.return_value = True |
| 1379 |
|
uni1 = create_autospec(UNI) |
| 1380 |
|
uni2 = create_autospec(UNI) |
| 1381 |
|
uni1.interface = create_autospec(Interface) |
| 1382 |
|
uni2.interface = create_autospec(Interface) |
| 1383 |
|
uni1.interface.switch = "00:00:00:00:00:00:00:01" |
| 1384 |
|
uni2.interface.switch = "00:00:00:00:00:00:00:02" |
| 1385 |
|
uni_from_dict_mock.side_effect = [uni1, uni2, uni1, uni2] |
| 1386 |
|
|
| 1387 |
|
payload1 = { |
| 1388 |
|
"name": "my evc1", |
| 1389 |
|
"uni_a": { |
| 1390 |
|
"interface_id": "00:00:00:00:00:00:00:01:1", |
| 1391 |
|
"tag": {"tag_type": 'vlan', "value": 80}, |
| 1392 |
|
}, |
| 1393 |
|
"uni_z": { |
| 1394 |
|
"interface_id": "00:00:00:00:00:00:00:02:2", |
| 1395 |
|
"tag": {"tag_type": 'vlan', "value": 1}, |
| 1396 |
|
}, |
| 1397 |
|
"dynamic_backup_path": True, |
| 1398 |
|
} |
| 1399 |
|
|
| 1400 |
|
payload2 = { |
| 1401 |
|
"dynamic_backup_path": False, |
| 1402 |
|
} |
| 1403 |
|
|
| 1404 |
|
evc_as_dict_mock.return_value = payload1 |
| 1405 |
|
response = await self.api_client.post( |
| 1406 |
|
f"{self.base_endpoint}/v2/evc/", |
| 1407 |
|
json=payload1 |
| 1408 |
|
) |
| 1409 |
|
assert 201 == response.status_code |
| 1410 |
|
|
| 1411 |
|
evc_as_dict_mock.return_value = payload2 |
| 1412 |
|
current_data = response.json() |
| 1413 |
|
circuit_id = current_data["circuit_id"] |
| 1414 |
|
response = await self.api_client.patch( |
| 1415 |
|
f"{self.base_endpoint}/v2/evc/{circuit_id}", |
| 1416 |
|
json=payload2 |
| 1417 |
|
) |
| 1418 |
|
current_data = response.json() |
| 1419 |
|
assert 400 == response.status_code |
| 1420 |
|
assert "must have a primary path or" in current_data["description"] |
| 1421 |
|
|
| 1422 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication") |
| 1423 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal") |
|
@@ 1546-1610 (lines=65) @@
|
| 1543 |
|
with pytest.raises(ValueError): |
| 1544 |
|
self.napp._uni_from_dict(uni_dict) |
| 1545 |
|
|
| 1546 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication") |
| 1547 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal") |
| 1548 |
|
@patch("napps.kytos.mef_eline.main.Main._use_uni_tags") |
| 1549 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC.deploy") |
| 1550 |
|
@patch("napps.kytos.mef_eline.scheduler.Scheduler.add") |
| 1551 |
|
@patch("napps.kytos.mef_eline.main.Main._uni_from_dict") |
| 1552 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._validate") |
| 1553 |
|
@patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc") |
| 1554 |
|
async def test_update_evc_no_json_mime( |
| 1555 |
|
self, |
| 1556 |
|
mongo_controller_upsert_mock, |
| 1557 |
|
validate_mock, |
| 1558 |
|
uni_from_dict_mock, |
| 1559 |
|
sched_add_mock, |
| 1560 |
|
evc_deploy_mock, |
| 1561 |
|
mock_use_uni_tags, |
| 1562 |
|
mock_tags_equal, |
| 1563 |
|
mock_check_duplicate |
| 1564 |
|
): |
| 1565 |
|
"""Test update a circuit with wrong mimetype.""" |
| 1566 |
|
self.napp.controller.loop = asyncio.get_running_loop() |
| 1567 |
|
validate_mock.return_value = True |
| 1568 |
|
sched_add_mock.return_value = True |
| 1569 |
|
evc_deploy_mock.return_value = True |
| 1570 |
|
mock_use_uni_tags.return_value = True |
| 1571 |
|
mock_tags_equal.return_value = True |
| 1572 |
|
mock_check_duplicate.return_value = True |
| 1573 |
|
uni1 = create_autospec(UNI) |
| 1574 |
|
uni2 = create_autospec(UNI) |
| 1575 |
|
uni1.interface = create_autospec(Interface) |
| 1576 |
|
uni2.interface = create_autospec(Interface) |
| 1577 |
|
uni1.interface.switch = "00:00:00:00:00:00:00:01" |
| 1578 |
|
uni2.interface.switch = "00:00:00:00:00:00:00:02" |
| 1579 |
|
uni_from_dict_mock.side_effect = [uni1, uni2, uni1, uni2] |
| 1580 |
|
mongo_controller_upsert_mock.return_value = True |
| 1581 |
|
|
| 1582 |
|
payload1 = { |
| 1583 |
|
"name": "my evc1", |
| 1584 |
|
"uni_a": { |
| 1585 |
|
"interface_id": "00:00:00:00:00:00:00:01:1", |
| 1586 |
|
"tag": {"tag_type": 'vlan', "value": 80}, |
| 1587 |
|
}, |
| 1588 |
|
"uni_z": { |
| 1589 |
|
"interface_id": "00:00:00:00:00:00:00:02:2", |
| 1590 |
|
"tag": {"tag_type": 'vlan', "value": 1}, |
| 1591 |
|
}, |
| 1592 |
|
"dynamic_backup_path": True, |
| 1593 |
|
} |
| 1594 |
|
|
| 1595 |
|
payload2 = {"dynamic_backup_path": False} |
| 1596 |
|
|
| 1597 |
|
response = await self.api_client.post( |
| 1598 |
|
f"{self.base_endpoint}/v2/evc/", |
| 1599 |
|
json=payload1, |
| 1600 |
|
) |
| 1601 |
|
assert 201 == response.status_code |
| 1602 |
|
|
| 1603 |
|
current_data = response.json() |
| 1604 |
|
circuit_id = current_data["circuit_id"] |
| 1605 |
|
response = await self.api_client.patch( |
| 1606 |
|
f"{self.base_endpoint}/v2/evc/{circuit_id}", data=payload2 |
| 1607 |
|
) |
| 1608 |
|
current_data = response.json() |
| 1609 |
|
assert 415 == response.status_code |
| 1610 |
|
assert "application/json" in current_data["description"] |
| 1611 |
|
|
| 1612 |
|
async def test_delete_no_evc(self): |
| 1613 |
|
"""Test delete when EVC does not exist.""" |