Code Duplication    Length = 25-26 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 144-169 (lines=26) @@
141
                                                  flow, 'add')
142
        mock_send_napp_event.assert_called_with(self.switch_01, flow, 'add')
143
144
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')
    @patch('napps.kytos.flow_manager.main.Main._add_flow_mod_sent')
    @patch('napps.kytos.flow_manager.main.Main._send_flow_mod')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    def test_install_flows_with_delete_strict(self, *args):
        """Check _install_flows when invoked with the 'delete_strict' command.

        The serializer returned by the patched ``FlowFactory.get_class`` is
        stubbed so that ``from_dict`` yields a known flow, whose
        ``as_of_strict_delete_flow_mod`` yields a known flow mod. The test
        then asserts that this flow mod is sent, recorded and announced
        with the 'delete_strict' command.
        """
        # Mocks arrive in bottom-up decorator order; the last one
        # (_store_changed_flows) is patched but never asserted on.
        (factory_mock, send_mod_mock, mod_sent_mock,
         napp_event_mock, _unused) = args

        expected_mod = MagicMock()
        stub_flow = MagicMock()
        stub_flow.as_of_strict_delete_flow_mod.return_value = expected_mod

        stub_serializer = MagicMock()
        stub_serializer.from_dict.return_value = stub_flow
        factory_mock.return_value = stub_serializer

        self.napp._install_flows(
            'delete_strict',
            {'flows': [MagicMock()]},
            [self.switch_01],
        )

        send_mod_mock.assert_called_with(stub_flow.switch, expected_mod)
        mod_sent_mock.assert_called_with(
            expected_mod.header.xid, stub_flow, 'delete_strict'
        )
        napp_event_mock.assert_called_with(
            self.switch_01, stub_flow, 'delete_strict'
        )
170
171
    def test_add_flow_mod_sent(self):
172
        """Test _add_flow_mod_sent method."""
@@ 118-142 (lines=25) @@
115
116
        self.assertEqual(switches, [self.switch_01])
117
118
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')
    @patch('napps.kytos.flow_manager.main.Main._add_flow_mod_sent')
    @patch('napps.kytos.flow_manager.main.Main._send_flow_mod')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    def test_install_flows(self, *args):
        """Check _install_flows when invoked with the 'add' command.

        The serializer returned by the patched ``FlowFactory.get_class`` is
        stubbed so that ``from_dict`` yields a known flow, whose
        ``as_of_add_flow_mod`` yields a known flow mod. The test then
        asserts that this flow mod is sent, recorded and announced with
        the 'add' command.
        """
        # Mocks arrive in bottom-up decorator order; the last one
        # (_store_changed_flows) is patched but never asserted on.
        (factory_mock, send_mod_mock, mod_sent_mock,
         napp_event_mock, _unused) = args

        expected_mod = MagicMock()
        stub_flow = MagicMock()
        stub_flow.as_of_add_flow_mod.return_value = expected_mod

        stub_serializer = MagicMock()
        stub_serializer.from_dict.return_value = stub_flow
        factory_mock.return_value = stub_serializer

        self.napp._install_flows(
            'add',
            {'flows': [MagicMock()]},
            [self.switch_01],
        )

        send_mod_mock.assert_called_with(stub_flow.switch, expected_mod)
        mod_sent_mock.assert_called_with(
            expected_mod.header.xid, stub_flow, 'add'
        )
        napp_event_mock.assert_called_with(self.switch_01, stub_flow, 'add')
143
144
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
145
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')