Code Duplication    Length = 25-26 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 173-198 (lines=26) @@
170
                                                  flow, 'add')
171
        mock_send_napp_event.assert_called_with(self.switch_01, flow, 'add')
172
173
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
174
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')
175
    @patch('napps.kytos.flow_manager.main.Main._add_flow_mod_sent')
176
    @patch('napps.kytos.flow_manager.main.Main._send_flow_mod')
177
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
178
    def test_install_flows_with_delete_strict(self, *args):
179
        """Test _install_flows method with strict delete command."""
180
        # Stacked @patch decorators hand over their mocks bottom-up, so the
        # first positional mock is FlowFactory.get_class and the last one is
        # _store_changed_flows (not inspected by this test, hence `_`).
        (mock_flow_factory, mock_send_flow_mod, mock_add_flow_mod_sent,
181
         mock_send_napp_event, _) = args
182
        serializer = MagicMock()
183
        flow = MagicMock()
184
        flow_mod = MagicMock()
185
186
        # Chain the mocks: get_class() returns the serializer, its from_dict()
        # returns the flow, and the flow's strict-delete conversion returns
        # the flow_mod that the assertions below look for.
        flow.as_of_strict_delete_flow_mod.return_value = flow_mod
187
        serializer.from_dict.return_value = flow
188
        mock_flow_factory.return_value = serializer
189
190
        # One placeholder flow entry and one switch (self.switch_01) as input.
        flows_dict = {'flows': [MagicMock()]}
191
        switches = [self.switch_01]
192
        # Call the method under test with the 'delete_strict' command.
        self.napp._install_flows('delete_strict', flows_dict, switches)
193
194
        # Each patched collaborator must have been called last with the mocked
        # flow / flow_mod and with the same 'delete_strict' command string.
        mock_send_flow_mod.assert_called_with(flow.switch, flow_mod)
195
        mock_add_flow_mod_sent.assert_called_with(flow_mod.header.xid,
196
                                                  flow, 'delete_strict')
197
        mock_send_napp_event.assert_called_with(self.switch_01, flow,
198
                                                'delete_strict')
199
200
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
201
    def test_event_add_flow(self, mock_install_flows):
@@ 147-171 (lines=25) @@
144
145
        self.assertEqual(switches, [self.switch_01])
146
147
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
148
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')
149
    @patch('napps.kytos.flow_manager.main.Main._add_flow_mod_sent')
150
    @patch('napps.kytos.flow_manager.main.Main._send_flow_mod')
151
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
152
    def test_install_flows(self, *args):
153
        """Test _install_flows method."""
154
        # Stacked @patch decorators hand over their mocks bottom-up, so the
        # first positional mock is FlowFactory.get_class and the last one is
        # _store_changed_flows (not inspected by this test, hence `_`).
        (mock_flow_factory, mock_send_flow_mod, mock_add_flow_mod_sent,
155
         mock_send_napp_event, _) = args
156
        serializer = MagicMock()
157
        flow = MagicMock()
158
        flow_mod = MagicMock()
159
160
        # Chain the mocks: get_class() returns the serializer, its from_dict()
        # returns the flow, and the flow's add conversion returns the flow_mod
        # that the assertions below look for.
        flow.as_of_add_flow_mod.return_value = flow_mod
161
        serializer.from_dict.return_value = flow
162
        mock_flow_factory.return_value = serializer
163
164
        # One placeholder flow entry and one switch (self.switch_01) as input.
        flows_dict = {'flows': [MagicMock()]}
165
        switches = [self.switch_01]
166
        # Call the method under test with the 'add' command.
        self.napp._install_flows('add', flows_dict, switches)
167
168
        # Each patched collaborator must have been called last with the mocked
        # flow / flow_mod and with the same 'add' command string.
        mock_send_flow_mod.assert_called_with(flow.switch, flow_mod)
169
        mock_add_flow_mod_sent.assert_called_with(flow_mod.header.xid,
170
                                                  flow, 'add')
171
        mock_send_napp_event.assert_called_with(self.switch_01, flow, 'add')
172
173
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
174
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')