@@ 1633-1671 (lines=39) @@ | ||
1630 | assert mock_notify_link_status_change.call_count == 2 |
|
1631 | mock_notify_topology_update.assert_called_once() |
|
1632 | ||
@patch('napps.kytos.topology.main.Main.notify_topology_update')
@patch('napps.kytos.topology.main.Main.notify_link_status_change')
def test_interruption_end(
    self,
    mock_notify_link_status_change,
    mock_notify_topology_update
):
    """Tests processing of received interruption end events.

    Registers three links on the controller, then hands
    ``handle_interruption_end`` an event that lists only ``link_a`` and
    ``link_c``. The test expects one ``notify_link_status_change`` call
    per listed link, each carrying the interruption type, and exactly
    one ``notify_topology_update`` call.
    """
    link_a = MagicMock()
    # link_b is registered but deliberately left out of the event, so the
    # call-count assertion below also proves it is not notified.
    link_b = MagicMock()
    link_c = MagicMock()
    self.napp.controller.links = {
        'link_a': link_a,
        'link_b': link_b,
        'link_c': link_c,
    }
    event = KytosEvent(
        # Use the "end" event name so the fixture matches the handler
        # being exercised (previously copy-pasted as "...start").
        "topology.interruption.end",
        {
            'type': 'test_interruption',
            'switches': [
            ],
            'interfaces': [
            ],
            'links': [
                'link_a',
                'link_c',
            ],
        }
    )
    self.napp.handle_interruption_end(event)
    mock_notify_link_status_change.assert_has_calls(
        [
            call(link_a, 'test_interruption'),
            call(link_c, 'test_interruption'),
        ]
    )
    # assert_has_calls tolerates extra calls; pin the exact count as well.
    assert mock_notify_link_status_change.call_count == 2
    mock_notify_topology_update.assert_called_once()
|
1672 | ||
1673 | async def test_set_tag_range(self): |
|
1674 | """Test set_tag_range""" |
|
@@ 1593-1631 (lines=39) @@ | ||
1590 | self.napp.notify_switch_links_status(mock_switch, "link enabled") |
|
1591 | assert self.napp.controller.buffers.app.put.call_count == 1 |
|
1592 | ||
@patch('napps.kytos.topology.main.Main.notify_topology_update')
@patch('napps.kytos.topology.main.Main.notify_link_status_change')
def test_interruption_start(
    self,
    mock_notify_link_status_change,
    mock_notify_topology_update
):
    """Check how an interruption start event is processed.

    Three mock links are registered on the controller, but the event
    names only ``link_a`` and ``link_c``. The handler is expected to
    call ``notify_link_status_change`` once per named link with the
    interruption type, and ``notify_topology_update`` exactly once.
    """
    interruption_type = 'test_interruption'
    affected_ids = ['link_a', 'link_c']

    # One mock per known link; link_b stays outside the interruption.
    known_links = {
        link_id: MagicMock()
        for link_id in ('link_a', 'link_b', 'link_c')
    }
    self.napp.controller.links = known_links

    payload = {
        'type': interruption_type,
        'switches': [],
        'interfaces': [],
        'links': affected_ids,
    }
    event = KytosEvent("topology.interruption.start", payload)

    self.napp.handle_interruption_start(event)

    expected_calls = [
        call(known_links['link_a'], interruption_type),
        call(known_links['link_c'], interruption_type),
    ]
    mock_notify_link_status_change.assert_has_calls(expected_calls)
    # Exact count guards against notifications for unlisted links.
    assert mock_notify_link_status_change.call_count == 2
    mock_notify_topology_update.assert_called_once()
|
1632 | ||
1633 | @patch('napps.kytos.topology.main.Main.notify_topology_update') |
|
1634 | @patch('napps.kytos.topology.main.Main.notify_link_status_change') |