|
@@ 1592-1663 (lines=72) @@
|
| 1589 |
|
assert 200 == response.status_code |
| 1590 |
|
evc_deploy.assert_called_once() |
| 1591 |
|
|
| 1592 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication") |
| 1593 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal") |
| 1594 |
|
@patch("napps.kytos.mef_eline.main.Main._use_uni_tags") |
| 1595 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC.deploy") |
| 1596 |
|
@patch("napps.kytos.mef_eline.scheduler.Scheduler.add") |
| 1597 |
|
@patch("napps.kytos.mef_eline.main.Main._uni_from_dict") |
| 1598 |
|
@patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc") |
| 1599 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._validate") |
| 1600 |
|
@patch("napps.kytos.mef_eline.main.EVC.as_dict") |
| 1601 |
|
async def test_update_circuit_invalid_json( |
| 1602 |
|
self, |
| 1603 |
|
evc_as_dict_mock, |
| 1604 |
|
validate_mock, |
| 1605 |
|
mongo_controller_upsert_mock, |
| 1606 |
|
uni_from_dict_mock, |
| 1607 |
|
sched_add_mock, |
| 1608 |
|
evc_deploy_mock, |
| 1609 |
|
mock_use_uni_tags, |
| 1610 |
|
mock_tags_equal, |
| 1611 |
|
mock_check_duplicate |
| 1612 |
|
): |
| 1613 |
|
"""Test update a circuit circuit.""" |
| 1614 |
|
self.napp.controller.loop = asyncio.get_running_loop() |
| 1615 |
|
validate_mock.return_value = True |
| 1616 |
|
mongo_controller_upsert_mock.return_value = True |
| 1617 |
|
sched_add_mock.return_value = True |
| 1618 |
|
evc_deploy_mock.return_value = True |
| 1619 |
|
mock_use_uni_tags.return_value = True |
| 1620 |
|
mock_tags_equal.return_value = True |
| 1621 |
|
mock_check_duplicate.return_value = True |
| 1622 |
|
uni1 = create_autospec(UNI) |
| 1623 |
|
uni2 = create_autospec(UNI) |
| 1624 |
|
uni1.interface = create_autospec(Interface) |
| 1625 |
|
uni2.interface = create_autospec(Interface) |
| 1626 |
|
uni1.interface.switch = "00:00:00:00:00:00:00:01" |
| 1627 |
|
uni2.interface.switch = "00:00:00:00:00:00:00:02" |
| 1628 |
|
uni_from_dict_mock.side_effect = [uni1, uni2, uni1, uni2] |
| 1629 |
|
|
| 1630 |
|
payload1 = { |
| 1631 |
|
"name": "my evc1", |
| 1632 |
|
"uni_a": { |
| 1633 |
|
"interface_id": "00:00:00:00:00:00:00:01:1", |
| 1634 |
|
"tag": {"tag_type": 'vlan', "value": 80}, |
| 1635 |
|
}, |
| 1636 |
|
"uni_z": { |
| 1637 |
|
"interface_id": "00:00:00:00:00:00:00:02:2", |
| 1638 |
|
"tag": {"tag_type": 'vlan', "value": 1}, |
| 1639 |
|
}, |
| 1640 |
|
"dynamic_backup_path": True, |
| 1641 |
|
} |
| 1642 |
|
|
| 1643 |
|
payload2 = { |
| 1644 |
|
"dynamic_backup_path": False, |
| 1645 |
|
} |
| 1646 |
|
|
| 1647 |
|
evc_as_dict_mock.return_value = payload1 |
| 1648 |
|
response = await self.api_client.post( |
| 1649 |
|
f"{self.base_endpoint}/v2/evc/", |
| 1650 |
|
json=payload1 |
| 1651 |
|
) |
| 1652 |
|
assert 201 == response.status_code |
| 1653 |
|
|
| 1654 |
|
evc_as_dict_mock.return_value = payload2 |
| 1655 |
|
current_data = response.json() |
| 1656 |
|
circuit_id = current_data["circuit_id"] |
| 1657 |
|
response = await self.api_client.patch( |
| 1658 |
|
f"{self.base_endpoint}/v2/evc/{circuit_id}", |
| 1659 |
|
json=payload2 |
| 1660 |
|
) |
| 1661 |
|
current_data = response.json() |
| 1662 |
|
assert 400 == response.status_code |
| 1663 |
|
assert "must have a primary path or" in current_data["description"] |
| 1664 |
|
|
| 1665 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication") |
| 1666 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal") |
|
@@ 1789-1853 (lines=65) @@
|
| 1786 |
|
with pytest.raises(ValueError): |
| 1787 |
|
self.napp._uni_from_dict(uni_dict) |
| 1788 |
|
|
| 1789 |
|
@patch("napps.kytos.mef_eline.main.Main._check_no_tag_duplication") |
| 1790 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._tag_lists_equal") |
| 1791 |
|
@patch("napps.kytos.mef_eline.main.Main._use_uni_tags") |
| 1792 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC.deploy") |
| 1793 |
|
@patch("napps.kytos.mef_eline.scheduler.Scheduler.add") |
| 1794 |
|
@patch("napps.kytos.mef_eline.main.Main._uni_from_dict") |
| 1795 |
|
@patch("napps.kytos.mef_eline.models.evc.EVC._validate") |
| 1796 |
|
@patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc") |
| 1797 |
|
async def test_update_evc_no_json_mime( |
| 1798 |
|
self, |
| 1799 |
|
mongo_controller_upsert_mock, |
| 1800 |
|
validate_mock, |
| 1801 |
|
uni_from_dict_mock, |
| 1802 |
|
sched_add_mock, |
| 1803 |
|
evc_deploy_mock, |
| 1804 |
|
mock_use_uni_tags, |
| 1805 |
|
mock_tags_equal, |
| 1806 |
|
mock_check_duplicate |
| 1807 |
|
): |
| 1808 |
|
"""Test update a circuit with wrong mimetype.""" |
| 1809 |
|
self.napp.controller.loop = asyncio.get_running_loop() |
| 1810 |
|
validate_mock.return_value = True |
| 1811 |
|
sched_add_mock.return_value = True |
| 1812 |
|
evc_deploy_mock.return_value = True |
| 1813 |
|
mock_use_uni_tags.return_value = True |
| 1814 |
|
mock_tags_equal.return_value = True |
| 1815 |
|
mock_check_duplicate.return_value = True |
| 1816 |
|
uni1 = create_autospec(UNI) |
| 1817 |
|
uni2 = create_autospec(UNI) |
| 1818 |
|
uni1.interface = create_autospec(Interface) |
| 1819 |
|
uni2.interface = create_autospec(Interface) |
| 1820 |
|
uni1.interface.switch = "00:00:00:00:00:00:00:01" |
| 1821 |
|
uni2.interface.switch = "00:00:00:00:00:00:00:02" |
| 1822 |
|
uni_from_dict_mock.side_effect = [uni1, uni2, uni1, uni2] |
| 1823 |
|
mongo_controller_upsert_mock.return_value = True |
| 1824 |
|
|
| 1825 |
|
payload1 = { |
| 1826 |
|
"name": "my evc1", |
| 1827 |
|
"uni_a": { |
| 1828 |
|
"interface_id": "00:00:00:00:00:00:00:01:1", |
| 1829 |
|
"tag": {"tag_type": 'vlan', "value": 80}, |
| 1830 |
|
}, |
| 1831 |
|
"uni_z": { |
| 1832 |
|
"interface_id": "00:00:00:00:00:00:00:02:2", |
| 1833 |
|
"tag": {"tag_type": 'vlan', "value": 1}, |
| 1834 |
|
}, |
| 1835 |
|
"dynamic_backup_path": True, |
| 1836 |
|
} |
| 1837 |
|
|
| 1838 |
|
payload2 = {"dynamic_backup_path": False} |
| 1839 |
|
|
| 1840 |
|
response = await self.api_client.post( |
| 1841 |
|
f"{self.base_endpoint}/v2/evc/", |
| 1842 |
|
json=payload1, |
| 1843 |
|
) |
| 1844 |
|
assert 201 == response.status_code |
| 1845 |
|
|
| 1846 |
|
current_data = response.json() |
| 1847 |
|
circuit_id = current_data["circuit_id"] |
| 1848 |
|
response = await self.api_client.patch( |
| 1849 |
|
f"{self.base_endpoint}/v2/evc/{circuit_id}", data=payload2 |
| 1850 |
|
) |
| 1851 |
|
current_data = response.json() |
| 1852 |
|
assert 415 == response.status_code |
| 1853 |
|
assert "application/json" in current_data["description"] |
| 1854 |
|
|
| 1855 |
|
async def test_delete_no_evc(self): |
| 1856 |
|
"""Test delete when EVC does not exist.""" |