Code Duplication    Length = 39-39 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 1650-1688 (lines=39) @@
1647
        assert mock_notify_link_status_change.call_count == 2
1648
        mock_notify_topology_update.assert_called_once()
1649
1650
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
1651
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
1652
    def test_interruption_end(
1653
        self,
1654
        mock_notify_link_status_change,
1655
        mock_notify_topology_update
1656
    ):
1657
        """Tests processing of received interruption end events."""
1658
        link_a = MagicMock()
1659
        link_b = MagicMock()
1660
        link_c = MagicMock()
1661
        self.napp.controller.links = {
1662
            'link_a': link_a,
1663
            'link_b': link_b,
1664
            'link_c': link_c,
1665
        }
1666
        event = KytosEvent(
1667
            "topology.interruption.start",
1668
            {
1669
                'type': 'test_interruption',
1670
                'switches': [
1671
                ],
1672
                'interfaces': [
1673
                ],
1674
                'links': [
1675
                    'link_a',
1676
                    'link_c',
1677
                ],
1678
            }
1679
        )
1680
        self.napp.handle_interruption_end(event)
1681
        mock_notify_link_status_change.assert_has_calls(
1682
            [
1683
                call(link_a, 'test_interruption'),
1684
                call(link_c, 'test_interruption'),
1685
            ]
1686
        )
1687
        assert mock_notify_link_status_change.call_count == 2
1688
        mock_notify_topology_update.assert_called_once()
1689
1690
    async def test_set_tag_range(self):
1691
        """Test set_tag_range"""
@@ 1610-1648 (lines=39) @@
1607
        self.napp._notify_interface_link_status([mock_intf], "link enabled")
1608
        assert self.napp.controller.buffers.app.put.call_count == 1
1609
1610
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
1611
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
1612
    def test_interruption_start(
1613
        self,
1614
        mock_notify_link_status_change,
1615
        mock_notify_topology_update
1616
    ):
1617
        """Tests processing of received interruption start events."""
1618
        link_a = MagicMock()
1619
        link_b = MagicMock()
1620
        link_c = MagicMock()
1621
        self.napp.controller.links = {
1622
            'link_a': link_a,
1623
            'link_b': link_b,
1624
            'link_c': link_c,
1625
        }
1626
        event = KytosEvent(
1627
            "topology.interruption.start",
1628
            {
1629
                'type': 'test_interruption',
1630
                'switches': [
1631
                ],
1632
                'interfaces': [
1633
                ],
1634
                'links': [
1635
                    'link_a',
1636
                    'link_c',
1637
                ],
1638
            }
1639
        )
1640
        self.napp.handle_interruption_start(event)
1641
        mock_notify_link_status_change.assert_has_calls(
1642
            [
1643
                call(link_a, 'test_interruption'),
1644
                call(link_c, 'test_interruption'),
1645
            ]
1646
        )
1647
        assert mock_notify_link_status_change.call_count == 2
1648
        mock_notify_topology_update.assert_called_once()
1649
1650
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
1651
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')