Code Duplication    Length = 39-39 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 1679-1717 (lines=39) @@
1676
        assert mock_notify_link_status_change.call_count == 2
1677
        mock_notify_topology_update.assert_called_once()
1678
1679
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interruption_end(
        self,
        mock_notify_link_status_change,
        mock_notify_topology_update
    ):
        """Tests processing of received interruption end events.

        Three mocked links are registered on the napp, and an interruption
        end event listing only ``link_a`` and ``link_c`` is handed to
        ``handle_interruption_end``. The test then checks that a link
        status change is notified for exactly those two links (with the
        interruption type as second argument) and that a single topology
        update notification is sent.

        The mock arguments are injected by the ``patch`` decorators,
        innermost decorator first.
        """
        link_a = MagicMock()
        link_b = MagicMock()
        link_c = MagicMock()
        self.napp.links = {
            'link_a': link_a,
            'link_b': link_b,
            'link_c': link_c,
        }
        # Use the "end" event name so the fixture matches the handler under
        # test (it was previously a copy of the "start" event name).
        event = KytosEvent(
            "topology.interruption.end",
            {
                'type': 'test_interruption',
                'switches': [
                ],
                'interfaces': [
                ],
                'links': [
                    'link_a',
                    'link_c',
                ],
            }
        )
        self.napp.handle_interruption_end(event)
        # link_b is not part of the event, so only two notifications are
        # expected; the call_count assert rules out any extra call.
        mock_notify_link_status_change.assert_has_calls(
            [
                call(link_a, 'test_interruption'),
                call(link_c, 'test_interruption'),
            ]
        )
        assert mock_notify_link_status_change.call_count == 2
        mock_notify_topology_update.assert_called_once()
1718
1719
    async def test_set_tag_range(self):
1720
        """Test set_tag_range"""
@@ 1639-1677 (lines=39) @@
1636
        assert mock_get_link_from_interface.call_count == 3
1637
        assert self.napp.controller.buffers.app.put.call_count == 1
1638
1639
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interruption_start(
        self,
        mock_notify_link_status_change,
        mock_notify_topology_update
    ):
        """Check handling of an interruption start event.

        Registers three mocked links on the napp, sends an interruption
        start event that names two of them, and verifies that a link status
        change is notified once per named link and that the topology update
        is notified exactly once.
        """
        known_links = {
            name: MagicMock() for name in ('link_a', 'link_b', 'link_c')
        }
        self.napp.links = known_links

        # Only these links are listed in the event; link_b stays untouched.
        affected = ('link_a', 'link_c')
        payload = {
            'type': 'test_interruption',
            'switches': [],
            'interfaces': [],
            'links': list(affected),
        }
        event = KytosEvent("topology.interruption.start", payload)

        self.napp.handle_interruption_start(event)

        expected_calls = [
            call(known_links[name], 'test_interruption')
            for name in affected
        ]
        mock_notify_link_status_change.assert_has_calls(expected_calls)
        assert mock_notify_link_status_change.call_count == 2
        mock_notify_topology_update.assert_called_once()
1678
1679
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
1680
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')