Code Duplication    Length = 39-39 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 1678-1716 (lines=39) @@
1675
        assert mock_notify_link_status_change.call_count == 2
1676
        mock_notify_topology_update.assert_called_once()
1677
1678
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interruption_end(
        self,
        mock_notify_link_status_change,
        mock_notify_topology_update
    ):
        """Tests processing of received interruption end events.

        Three links are registered on the NApp, but only two of them are
        listed in the event. The test asserts that
        ``notify_link_status_change`` is called for exactly those two links
        (with the interruption type as second argument) and that
        ``notify_topology_update`` is called once.
        """
        link_a = MagicMock()
        link_b = MagicMock()
        link_c = MagicMock()
        # link_b is registered but deliberately left out of the event, so
        # the call_count assertion below catches a notification for it.
        self.napp.links = {
            'link_a': link_a,
            'link_b': link_b,
            'link_c': link_c,
        }
        event = KytosEvent(
            # End event, matching the handler exercised below.
            "topology.interruption.end",
            {
                'type': 'test_interruption',
                'switches': [],
                'interfaces': [],
                'links': [
                    'link_a',
                    'link_c',
                ],
            }
        )
        self.napp.handle_interruption_end(event)
        mock_notify_link_status_change.assert_has_calls(
            [
                call(link_a, 'test_interruption'),
                call(link_c, 'test_interruption'),
            ]
        )
        # assert_has_calls tolerates extra calls; pin the exact number too.
        assert mock_notify_link_status_change.call_count == 2
        mock_notify_topology_update.assert_called_once()
1717
1718
    async def test_set_tag_range(self, event_loop):
1719
        """Test set_tag_range"""
@@ 1638-1676 (lines=39) @@
1635
        assert mock_get_link_from_interface.call_count == 3
1636
        assert self.napp.controller.buffers.app.put.call_count == 1
1637
1638
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interruption_start(
        self,
        mock_notify_link_status_change,
        mock_notify_topology_update
    ):
        """Check the handling of an interruption start event.

        The NApp knows three links while the event names two of them. The
        handler is expected to call ``notify_link_status_change`` once per
        named link, passing the interruption type, and to call
        ``notify_topology_update`` a single time.
        """
        interruption_type = 'test_interruption'
        affected_ids = ('link_a', 'link_c')

        # One mock per registered link; link_b is never listed in the event.
        known_links = {
            link_id: MagicMock()
            for link_id in ('link_a', 'link_b', 'link_c')
        }
        self.napp.links = known_links

        content = {
            'type': interruption_type,
            'switches': [],
            'interfaces': [],
            'links': list(affected_ids),
        }
        event = KytosEvent("topology.interruption.start", content)

        self.napp.handle_interruption_start(event)

        expected_calls = [
            call(known_links[link_id], interruption_type)
            for link_id in affected_ids
        ]
        mock_notify_link_status_change.assert_has_calls(expected_calls)
        # Exactly two notifications: nothing was sent for link_b.
        assert mock_notify_link_status_change.call_count == 2
        mock_notify_topology_update.assert_called_once()
1677
1678
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
1679
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')