|
@@ 1452-1523 (lines=72) @@
|
| 1449 |
|
assert 200 == response.status_code |
| 1450 |
|
evc_deploy.assert_called_once() |
| 1451 |
|
|
| 1452 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication")
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal")
@patch("napps.kytos.mef_eline.main.Main._use_uni_tags")
@patch("napps.kytos.mef_eline.models.evc.EVC.deploy")
@patch("napps.kytos.mef_eline.scheduler.Scheduler.add")
@patch("napps.kytos.mef_eline.main.Main._uni_from_dict")
@patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
@patch("napps.kytos.mef_eline.models.evc.EVC._validate")
@patch("napps.kytos.mef_eline.main.EVC.as_dict")
async def test_update_circuit_invalid_json(
    self,
    evc_as_dict_mock,
    validate_mock,
    mongo_controller_upsert_mock,
    uni_from_dict_mock,
    sched_add_mock,
    evc_deploy_mock,
    mock_use_uni_tags,
    mock_tags_equal,
    mock_check_duplicate
):
    """Test that an invalid circuit update is rejected with 400.

    An EVC is first created with only ``dynamic_backup_path`` enabled,
    then PATCHed with ``dynamic_backup_path`` set to False.  The PATCH
    is expected to answer 400 with a description containing
    "must have a primary path or".
    """
    self.napp.controller.loop = asyncio.get_running_loop()

    # Every patched collaborator simply reports success.
    always_true = (
        validate_mock,
        mongo_controller_upsert_mock,
        sched_add_mock,
        evc_deploy_mock,
        mock_use_uni_tags,
        mock_tags_equal,
        mock_check_duplicate,
    )
    for stub in always_true:
        stub.return_value = True

    # One autospec'd UNI per endpoint, each with its own switch id.
    endpoints = []
    for switch_id in ("00:00:00:00:00:00:00:01", "00:00:00:00:00:00:00:02"):
        endpoint = create_autospec(UNI)
        endpoint.interface = create_autospec(Interface)
        endpoint.interface.switch = switch_id
        endpoints.append(endpoint)
    # The same (uni_a, uni_z) pair is handed out twice.
    uni_from_dict_mock.side_effect = endpoints * 2

    creation_body = {
        "name": "my evc1",
        "uni_a": {
            "interface_id": "00:00:00:00:00:00:00:01:1",
            "tag": {"tag_type": "vlan", "value": 80},
        },
        "uni_z": {
            "interface_id": "00:00:00:00:00:00:00:02:2",
            "tag": {"tag_type": "vlan", "value": 1},
        },
        "dynamic_backup_path": True,
    }
    update_body = {"dynamic_backup_path": False}

    # Step 1: create the circuit.
    evc_as_dict_mock.return_value = creation_body
    creation_url = f"{self.base_endpoint}/v2/evc/"
    created = await self.api_client.post(creation_url, json=creation_body)
    assert created.status_code == 201

    # Step 2: try to disable the dynamic backup path on that circuit.
    evc_as_dict_mock.return_value = update_body
    circuit_id = created.json()["circuit_id"]
    update_url = f"{self.base_endpoint}/v2/evc/{circuit_id}"
    rejected = await self.api_client.patch(update_url, json=update_body)
    error_body = rejected.json()
    assert rejected.status_code == 400
    assert "must have a primary path or" in error_body["description"]
| 1524 |
|
|
| 1525 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication") |
| 1526 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal") |
|
@@ 1721-1785 (lines=65) @@
|
| 1718 |
|
with pytest.raises(ValueError): |
| 1719 |
|
self.napp._uni_from_dict(uni_dict) |
| 1720 |
|
|
| 1721 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication")
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal")
@patch("napps.kytos.mef_eline.main.Main._use_uni_tags")
@patch("napps.kytos.mef_eline.models.evc.EVC.deploy")
@patch("napps.kytos.mef_eline.scheduler.Scheduler.add")
@patch("napps.kytos.mef_eline.main.Main._uni_from_dict")
@patch("napps.kytos.mef_eline.models.evc.EVC._validate")
@patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
async def test_update_evc_no_json_mime(
    self,
    mongo_controller_upsert_mock,
    validate_mock,
    uni_from_dict_mock,
    sched_add_mock,
    evc_deploy_mock,
    mock_use_uni_tags,
    mock_tags_equal,
    mock_check_duplicate
):
    """Test that a circuit update with the wrong mimetype answers 415.

    An EVC is created with a JSON body, then PATCHed with the payload
    passed through ``data=`` instead of ``json=``.  The PATCH is
    expected to answer 415 with "application/json" in the description.
    """
    self.napp.controller.loop = asyncio.get_running_loop()

    # Every patched collaborator simply reports success.
    always_true = (
        validate_mock,
        sched_add_mock,
        evc_deploy_mock,
        mock_use_uni_tags,
        mock_tags_equal,
        mock_check_duplicate,
        mongo_controller_upsert_mock,
    )
    for stub in always_true:
        stub.return_value = True

    # One autospec'd UNI per endpoint, each with its own switch id.
    endpoints = []
    for switch_id in ("00:00:00:00:00:00:00:01", "00:00:00:00:00:00:00:02"):
        endpoint = create_autospec(UNI)
        endpoint.interface = create_autospec(Interface)
        endpoint.interface.switch = switch_id
        endpoints.append(endpoint)
    # The same (uni_a, uni_z) pair is handed out twice.
    uni_from_dict_mock.side_effect = endpoints * 2

    creation_body = {
        "name": "my evc1",
        "uni_a": {
            "interface_id": "00:00:00:00:00:00:00:01:1",
            "tag": {"tag_type": "vlan", "value": 80},
        },
        "uni_z": {
            "interface_id": "00:00:00:00:00:00:00:02:2",
            "tag": {"tag_type": "vlan", "value": 1},
        },
        "dynamic_backup_path": True,
    }
    update_body = {"dynamic_backup_path": False}

    # Step 1: create the circuit with a regular JSON request.
    creation_url = f"{self.base_endpoint}/v2/evc/"
    created = await self.api_client.post(creation_url, json=creation_body)
    assert created.status_code == 201

    # Step 2: update it, deliberately not sending the body as JSON.
    circuit_id = created.json()["circuit_id"]
    update_url = f"{self.base_endpoint}/v2/evc/{circuit_id}"
    rejected = await self.api_client.patch(update_url, data=update_body)
    error_body = rejected.json()
    assert rejected.status_code == 415
    assert "application/json" in error_body["description"]
| 1786 |
|
|
| 1787 |
|
async def test_delete_no_evc(self): |
| 1788 |
|
"""Test delete when EVC does not exist.""" |