Code Duplication    Length = 39-39 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 1633-1671 (lines=39) @@
1630
        assert mock_notify_link_status_change.call_count == 2
1631
        mock_notify_topology_update.assert_called_once()
1632
1633
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interruption_end(
        self,
        mock_notify_link_status_change,
        mock_notify_topology_update
    ):
        """Tests processing of received interruption end events.

        Registers three links on the controller, then hands
        ``handle_interruption_end`` an event that lists only ``link_a`` and
        ``link_c``. The test passes when exactly those two links receive a
        status-change notification carrying the interruption type and the
        topology update is notified exactly once.
        """
        # Patch decorators are applied bottom-up, so the innermost patch
        # (notify_link_status_change) is the first mock argument.
        link_a = MagicMock()
        link_b = MagicMock()
        link_c = MagicMock()
        # link_b is registered but not listed in the event, so it must not
        # be notified (enforced by the call_count check below).
        self.napp.controller.links = {
            'link_a': link_a,
            'link_b': link_b,
            'link_c': link_c,
        }
        # The event name matches the handler under test; it previously read
        # "topology.interruption.start", copied from the start test.
        event = KytosEvent(
            "topology.interruption.end",
            {
                'type': 'test_interruption',
                'switches': [
                ],
                'interfaces': [
                ],
                'links': [
                    'link_a',
                    'link_c',
                ],
            }
        )
        self.napp.handle_interruption_end(event)
        mock_notify_link_status_change.assert_has_calls(
            [
                call(link_a, 'test_interruption'),
                call(link_c, 'test_interruption'),
            ]
        )
        # assert_has_calls tolerates extra calls; pin the exact count too.
        assert mock_notify_link_status_change.call_count == 2
        mock_notify_topology_update.assert_called_once()
1672
1673
    async def test_set_tag_range(self):
1674
        """Test set_tag_range"""
@@ 1593-1631 (lines=39) @@
1590
        self.napp.notify_switch_links_status(mock_switch, "link enabled")
1591
        assert self.napp.controller.buffers.app.put.call_count == 1
1592
1593
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')
    def test_interruption_start(
        self,
        mock_notify_link_status_change,
        mock_notify_topology_update
    ):
        """Verify handling of an interruption start event.

        Three mock links are registered on the controller, and the event
        passed to ``handle_interruption_start`` names two of them. The two
        named links must each trigger one status-change notification with
        the interruption type, and the topology update must fire once.
        """
        interruption_type = 'test_interruption'
        affected_names = ('link_a', 'link_c')

        # One mock per known link; 'link_b' stays out of the event.
        known_links = {
            name: MagicMock() for name in ('link_a', 'link_b', 'link_c')
        }
        self.napp.controller.links = known_links

        payload = {
            'type': interruption_type,
            'switches': [],
            'interfaces': [],
            'links': list(affected_names),
        }
        event = KytosEvent("topology.interruption.start", payload)

        self.napp.handle_interruption_start(event)

        expected_calls = [
            call(known_links[name], interruption_type)
            for name in affected_names
        ]
        mock_notify_link_status_change.assert_has_calls(expected_calls)
        # assert_has_calls allows extra calls, so check the total as well.
        assert mock_notify_link_status_change.call_count == 2
        mock_notify_topology_update.assert_called_once()
1632
1633
    @patch('napps.kytos.topology.main.Main.notify_topology_update')
1634
    @patch('napps.kytos.topology.main.Main.notify_link_status_change')