Code Duplication    Length = 25-26 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 163-188 (lines=26) @@
160
                                                  flow, 'add')
161
        mock_send_napp_event.assert_called_with(self.switch_01, flow, 'add')
162
163
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
164
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')
165
    @patch('napps.kytos.flow_manager.main.Main._add_flow_mod_sent')
166
    @patch('napps.kytos.flow_manager.main.Main._send_flow_mod')
167
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
168
    def test_install_flows_with_delete_strict(self, *args):
169
        """Test _install_flows method with strict delete command."""
170
        (mock_flow_factory, mock_send_flow_mod, mock_add_flow_mod_sent,
171
         mock_send_napp_event, _) = args
172
        serializer = MagicMock()
173
        flow = MagicMock()
174
        flow_mod = MagicMock()
175
176
        flow.as_of_strict_delete_flow_mod.return_value = flow_mod
177
        serializer.from_dict.return_value = flow
178
        mock_flow_factory.return_value = serializer
179
180
        flows_dict = {'flows': [MagicMock()]}
181
        switches = [self.switch_01]
182
        self.napp._install_flows('delete_strict', flows_dict, switches)
183
184
        mock_send_flow_mod.assert_called_with(flow.switch, flow_mod)
185
        mock_add_flow_mod_sent.assert_called_with(flow_mod.header.xid,
186
                                                  flow, 'delete_strict')
187
        mock_send_napp_event.assert_called_with(self.switch_01, flow,
188
                                                'delete_strict')
189
190
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
191
    def test_event_add_flow(self, mock_install_flows):
@@ 137-161 (lines=25) @@
134
135
        self.assertEqual(switches, [self.switch_01])
136
137
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
138
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')
139
    @patch('napps.kytos.flow_manager.main.Main._add_flow_mod_sent')
140
    @patch('napps.kytos.flow_manager.main.Main._send_flow_mod')
141
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
142
    def test_install_flows(self, *args):
143
        """Test _install_flows method."""
144
        (mock_flow_factory, mock_send_flow_mod, mock_add_flow_mod_sent,
145
         mock_send_napp_event, _) = args
146
        serializer = MagicMock()
147
        flow = MagicMock()
148
        flow_mod = MagicMock()
149
150
        flow.as_of_add_flow_mod.return_value = flow_mod
151
        serializer.from_dict.return_value = flow
152
        mock_flow_factory.return_value = serializer
153
154
        flows_dict = {'flows': [MagicMock()]}
155
        switches = [self.switch_01]
156
        self.napp._install_flows('add', flows_dict, switches)
157
158
        mock_send_flow_mod.assert_called_with(flow.switch, flow_mod)
159
        mock_add_flow_mod_sent.assert_called_with(flow_mod.header.xid,
160
                                                  flow, 'add')
161
        mock_send_napp_event.assert_called_with(self.switch_01, flow, 'add')
162
163
    @patch('napps.kytos.flow_manager.main.Main._store_changed_flows')
164
    @patch('napps.kytos.flow_manager.main.Main._send_napp_event')