Code Duplication    Length = 25-26 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 188-213 (lines=26) @@
185
                                                  flow, 'add')
186
        mock_send_napp_event.assert_called_with(self.switch_01, flow, 'add')
187
188
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
189
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')
190
    @patch('napps.kytos.flow_manager.main.Main._add_flow_mod_sent')
191
    @patch('napps.kytos.flow_manager.main.Main._send_flow_mod')
192
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
193
    def test_install_flows_with_delete_strict(self, *args):
194
        """Test _install_flows method with strict delete command.

        Calls ``_install_flows('delete_strict', ...)`` for one flow on
        ``self.switch_01`` and checks that the flow mod built by
        ``flow.as_of_strict_delete_flow_mod`` is passed to
        ``_send_flow_mod``, ``_add_flow_mod_sent`` and ``_send_napp_event``.
        """
        # NOTE(review): apart from the command string and the flow-mod
        # builder, this body mirrors test_install_flows; a shared helper
        # would remove the duplication.
        # Stacked @patch decorators inject mocks bottom-up, so the first
        # item is FlowFactory.get_class and the last (discarded as ``_``)
        # is _store_changed_flows.
195
        (mock_flow_factory, mock_send_flow_mod, mock_add_flow_mod_sent,
196
         mock_send_napp_event, _) = args
197
        serializer = MagicMock()
198
        flow = MagicMock()
199
        flow_mod = MagicMock()
200
201
        # Make the patched factory return a serializer whose from_dict
        # yields ``flow``, and make ``flow`` produce ``flow_mod`` for a
        # strict delete.
        flow.as_of_strict_delete_flow_mod.return_value = flow_mod
202
        serializer.from_dict.return_value = flow
203
        mock_flow_factory.return_value = serializer
204
205
        flows_dict = {'flows': [MagicMock()]}
206
        switches = [self.switch_01]
207
        self.napp._install_flows('delete_strict', flows_dict, switches)
208
209
        # assert_called_with only inspects the most recent call to each mock.
        mock_send_flow_mod.assert_called_with(flow.switch, flow_mod)
210
        mock_add_flow_mod_sent.assert_called_with(flow_mod.header.xid,
211
                                                  flow, 'delete_strict')
212
        mock_send_napp_event.assert_called_with(self.switch_01, flow,
213
                                                'delete_strict')
214
215
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
216
    def test_event_add_flow(self, mock_install_flows):
@@ 162-186 (lines=25) @@
159
160
        self.assertEqual(switches, [self.switch_01])
161
162
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
163
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')
164
    @patch('napps.kytos.flow_manager.main.Main._add_flow_mod_sent')
165
    @patch('napps.kytos.flow_manager.main.Main._send_flow_mod')
166
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
167
    def test_install_flows(self, *args):
168
        """Test _install_flows method.

        Calls ``_install_flows('add', ...)`` for one flow on
        ``self.switch_01`` and checks that the flow mod built by
        ``flow.as_of_add_flow_mod`` is passed to ``_send_flow_mod``,
        ``_add_flow_mod_sent`` and ``_send_napp_event``.
        """
        # Stacked @patch decorators inject mocks bottom-up, so the first
        # item is FlowFactory.get_class and the last (discarded as ``_``)
        # is _store_changed_flows.
169
        (mock_flow_factory, mock_send_flow_mod, mock_add_flow_mod_sent,
170
         mock_send_napp_event, _) = args
171
        serializer = MagicMock()
172
        flow = MagicMock()
173
        flow_mod = MagicMock()
174
175
        # Make the patched factory return a serializer whose from_dict
        # yields ``flow``, and make ``flow`` produce ``flow_mod`` for an add.
        flow.as_of_add_flow_mod.return_value = flow_mod
176
        serializer.from_dict.return_value = flow
177
        mock_flow_factory.return_value = serializer
178
179
        flows_dict = {'flows': [MagicMock()]}
180
        switches = [self.switch_01]
181
        self.napp._install_flows('add', flows_dict, switches)
182
183
        # assert_called_with only inspects the most recent call to each mock.
        mock_send_flow_mod.assert_called_with(flow.switch, flow_mod)
184
        mock_add_flow_mod_sent.assert_called_with(flow_mod.header.xid,
185
                                                  flow, 'add')
186
        mock_send_napp_event.assert_called_with(self.switch_01, flow, 'add')
187
188
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
189
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')