|
@@ 1487-1558 (lines=72) @@
|
| 1484 |
|
assert 200 == response.status_code |
| 1485 |
|
evc_deploy.assert_called_once() |
| 1486 |
|
|
| 1487 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication")
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal")
@patch("napps.kytos.mef_eline.main.Main._use_uni_tags")
@patch("napps.kytos.mef_eline.models.evc.EVC.deploy")
@patch("napps.kytos.mef_eline.scheduler.Scheduler.add")
@patch("napps.kytos.mef_eline.main.Main._uni_from_dict")
@patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
@patch("napps.kytos.mef_eline.models.evc.EVC._validate")
@patch("napps.kytos.mef_eline.main.EVC.as_dict")
async def test_update_circuit_invalid_json(
    self,
    evc_as_dict_mock,
    validate_mock,
    mongo_controller_upsert_mock,
    uni_from_dict_mock,
    sched_add_mock,
    evc_deploy_mock,
    mock_use_uni_tags,
    mock_tags_equal,
    mock_check_duplicate
):
    """Test that a PATCH turning off dynamic_backup_path gets a 400.

    An EVC is first created with ``dynamic_backup_path`` enabled and no
    explicit paths; the follow-up PATCH only disables that flag and the
    response description is expected to mention the missing primary path.
    """
    self.napp.controller.loop = asyncio.get_running_loop()

    # All of these patched collaborators just report success.
    pass_through_mocks = (
        validate_mock,
        mongo_controller_upsert_mock,
        sched_add_mock,
        evc_deploy_mock,
        mock_use_uni_tags,
        mock_tags_equal,
        mock_check_duplicate,
    )
    for stub in pass_through_mocks:
        stub.return_value = True

    # One autospec'd UNI per switch; the pair is handed out twice by the
    # patched _uni_from_dict (side_effect holds four entries in total).
    fake_unis = []
    for dpid in ("00:00:00:00:00:00:00:01", "00:00:00:00:00:00:00:02"):
        fake_uni = create_autospec(UNI)
        fake_uni.interface = create_autospec(Interface)
        fake_uni.interface.switch = dpid
        fake_unis.append(fake_uni)
    uni_from_dict_mock.side_effect = fake_unis * 2

    creation_body = {
        "name": "my evc1",
        "uni_a": {
            "interface_id": "00:00:00:00:00:00:00:01:1",
            "tag": {"tag_type": "vlan", "value": 80},
        },
        "uni_z": {
            "interface_id": "00:00:00:00:00:00:00:02:2",
            "tag": {"tag_type": "vlan", "value": 1},
        },
        "dynamic_backup_path": True,
    }
    update_body = {"dynamic_backup_path": False}

    # Step 1: create the circuit.
    evc_as_dict_mock.return_value = creation_body
    created = await self.api_client.post(
        f"{self.base_endpoint}/v2/evc/",
        json=creation_body
    )
    assert created.status_code == 201

    # Step 2: patch it with the flag disabled and expect a rejection.
    evc_as_dict_mock.return_value = update_body
    circuit_id = created.json()["circuit_id"]
    patched = await self.api_client.patch(
        f"{self.base_endpoint}/v2/evc/{circuit_id}",
        json=update_body
    )
    error_body = patched.json()
    assert patched.status_code == 400
    assert "must have a primary path or" in error_body["description"]
| 1559 |
|
|
| 1560 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication") |
| 1561 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal") |
|
@@ 1756-1820 (lines=65) @@
|
| 1753 |
|
with pytest.raises(ValueError): |
| 1754 |
|
self.napp._uni_from_dict(uni_dict) |
| 1755 |
|
|
| 1756 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication")
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal")
@patch("napps.kytos.mef_eline.main.Main._use_uni_tags")
@patch("napps.kytos.mef_eline.models.evc.EVC.deploy")
@patch("napps.kytos.mef_eline.scheduler.Scheduler.add")
@patch("napps.kytos.mef_eline.main.Main._uni_from_dict")
@patch("napps.kytos.mef_eline.models.evc.EVC._validate")
@patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
async def test_update_evc_no_json_mime(
    self,
    mongo_controller_upsert_mock,
    validate_mock,
    uni_from_dict_mock,
    sched_add_mock,
    evc_deploy_mock,
    mock_use_uni_tags,
    mock_tags_equal,
    mock_check_duplicate
):
    """Test that a PATCH whose body is not sent as JSON gets a 415.

    The EVC is created normally with a JSON body; the update is then
    submitted through ``data=`` instead of ``json=`` and the response
    description is expected to mention ``application/json``.
    """
    self.napp.controller.loop = asyncio.get_running_loop()

    # All of these patched collaborators just report success.
    pass_through_mocks = (
        validate_mock,
        sched_add_mock,
        evc_deploy_mock,
        mock_use_uni_tags,
        mock_tags_equal,
        mock_check_duplicate,
        mongo_controller_upsert_mock,
    )
    for stub in pass_through_mocks:
        stub.return_value = True

    # One autospec'd UNI per switch; the pair is handed out twice by the
    # patched _uni_from_dict (side_effect holds four entries in total).
    fake_unis = []
    for dpid in ("00:00:00:00:00:00:00:01", "00:00:00:00:00:00:00:02"):
        fake_uni = create_autospec(UNI)
        fake_uni.interface = create_autospec(Interface)
        fake_uni.interface.switch = dpid
        fake_unis.append(fake_uni)
    uni_from_dict_mock.side_effect = fake_unis * 2

    creation_body = {
        "name": "my evc1",
        "uni_a": {
            "interface_id": "00:00:00:00:00:00:00:01:1",
            "tag": {"tag_type": "vlan", "value": 80},
        },
        "uni_z": {
            "interface_id": "00:00:00:00:00:00:00:02:2",
            "tag": {"tag_type": "vlan", "value": 1},
        },
        "dynamic_backup_path": True,
    }
    update_body = {"dynamic_backup_path": False}

    # Step 1: create the circuit with a regular JSON request.
    created = await self.api_client.post(
        f"{self.base_endpoint}/v2/evc/",
        json=creation_body,
    )
    assert created.status_code == 201

    # Step 2: send the update via ``data=`` so the body is not JSON.
    circuit_id = created.json()["circuit_id"]
    patched = await self.api_client.patch(
        f"{self.base_endpoint}/v2/evc/{circuit_id}", data=update_body
    )
    error_body = patched.json()
    assert patched.status_code == 415
    assert "application/json" in error_body["description"]
| 1821 |
|
|
| 1822 |
|
async def test_delete_no_evc(self): |
| 1823 |
|
"""Test delete when EVC does not exist.""" |