Code Duplication    Length = 39-39 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 1645-1683 (lines=39) @@
1642
        assert mock_notify_link_status_change.call_count == 2
1643
        mock_notify_topology_update.assert_called_once()
1644
1645
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interruption_end(
        self,
        mock_notify_link_status_change,
        mock_notify_topology_update
    ):
        """Tests processing of received interruption end events.

        Registers three mocked links on the controller, then passes
        ``handle_interruption_end`` an event whose content lists only
        ``link_a`` and ``link_c``. The test expects
        ``notify_link_status_change`` to be called exactly twice, once per
        listed link with the interruption type, and
        ``notify_topology_update`` to be called exactly once.
        """
        # Decorators are applied bottom-up, so the mock for
        # notify_link_status_change is the first injected argument.
        link_a = MagicMock()
        link_b = MagicMock()  # registered, but not listed in the event
        link_c = MagicMock()
        self.napp.controller.links = {
            'link_a': link_a,
            'link_b': link_b,
            'link_c': link_c,
        }
        event = KytosEvent(
            # Named as an end event so the fixture matches the handler
            # under test (it previously reused the start event name).
            "topology.interruption.end",
            {
                'type': 'test_interruption',
                'switches': [
                ],
                'interfaces': [
                ],
                'links': [
                    'link_a',
                    'link_c',
                ],
            }
        )
        self.napp.handle_interruption_end(event)
        mock_notify_link_status_change.assert_has_calls(
            [
                call(link_a, 'test_interruption'),
                call(link_c, 'test_interruption'),
            ]
        )
        # Together with the check above, this rules out a notification
        # for link_b.
        assert mock_notify_link_status_change.call_count == 2
        mock_notify_topology_update.assert_called_once()
1684
1685
    async def test_set_tag_range(self):
1686
        """Test set_tag_range"""
@@ 1605-1643 (lines=39) @@
1602
        self.napp.notify_switch_links_status(mock_switch, "link enabled")
1603
        assert self.napp.controller.buffers.app.put.call_count == 1
1604
1605
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interruption_start(
        self,
        mock_notify_link_status_change,
        mock_notify_topology_update
    ):
        """Check handling of an interruption start event.

        Three mocked links are registered on the controller, but the
        event content names only ``link_a`` and ``link_c``. After
        ``handle_interruption_start`` runs, the link status notifier must
        have been called exactly twice (once per named link, with the
        interruption type) and the topology update notifier exactly once.
        """
        # Decorators are applied bottom-up, so the mock for
        # notify_link_status_change is the first injected argument.
        link_a, link_b, link_c = MagicMock(), MagicMock(), MagicMock()
        self.napp.controller.links = {
            'link_a': link_a,
            'link_b': link_b,  # present on the controller, absent below
            'link_c': link_c,
        }

        interruption_content = {
            'type': 'test_interruption',
            'switches': [],
            'interfaces': [],
            'links': ['link_a', 'link_c'],
        }
        event = KytosEvent("topology.interruption.start", interruption_content)

        self.napp.handle_interruption_start(event)

        expected_link_calls = [
            call(link_a, 'test_interruption'),
            call(link_c, 'test_interruption'),
        ]
        mock_notify_link_status_change.assert_has_calls(expected_link_calls)
        # The count check ensures no extra notification (e.g. for link_b).
        assert mock_notify_link_status_change.call_count == 2
        mock_notify_topology_update.assert_called_once()
1644
1645
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
1646
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')