Code Duplication    Length = 58-64 lines in 2 locations

tests/unit/models/test_link_protection.py 2 locations

@@ 278-341 (lines=64) @@
275
        msg = f"Failed to re-deploy {evc} after link down."
276
        log_mocked.debug.assert_called_once_with(msg)
277
278
    @patch("napps.kytos.mef_eline.models.evc.log")
279
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
280
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
281
    @patch(DEPLOY_TO_PRIMARY_PATH)
282
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
283
    async def test_handle_link_down_case_4(
284
        self,
285
        deploy_to_mocked,
286
        _send_flow_mods_mocked,
287
        deploy_mocked,
288
        log_mocked,
289
    ):
290
        """Test handle_link_down succeeds with dynamic_backup_path enabled."""
291
        deploy_mocked.return_value = True  # mock of EVCDeploy.deploy_to_path
292
        deploy_to_mocked.return_value = False  # mock of DEPLOY_TO_PRIMARY_PATH
293
        primary_path = [
294
            get_link_mocked(
295
                endpoint_a_port=9,
296
                endpoint_b_port=10,
297
                metadata={"s_vlan": 5},
298
                status=EntityStatus.DOWN,
299
            ),
300
            get_link_mocked(
301
                endpoint_a_port=11,
302
                endpoint_b_port=12,
303
                metadata={"s_vlan": 6},
304
                status=EntityStatus.UP,
305
            ),
306
        ]
307
        backup_path = [
308
            get_link_mocked(
309
                endpoint_a_port=9,
310
                endpoint_b_port=10,
311
                metadata={"s_vlan": 5},
312
                status=EntityStatus.DOWN,
313
            ),
314
            get_link_mocked(
315
                endpoint_a_port=13,
316
                endpoint_b_port=14,
317
                metadata={"s_vlan": 6},
318
                status=EntityStatus.UP,
319
            ),
320
        ]
321
        attributes = {
322
            "controller": get_controller_mock(),
323
            "name": "circuit_8",
324
            "uni_a": get_uni_mocked(is_valid=True),
325
            "uni_z": get_uni_mocked(is_valid=True),
326
            "primary_path": primary_path,
327
            "backup_path": backup_path,
328
            "enabled": True,
329
            "dynamic_backup_path": True,
330
        }
331
332
        evc = EVC(**attributes)
333
        evc.current_path = evc.backup_path  # backup path is the active one
334
335
        deploy_to_mocked.reset_mock()  # drop calls recorded during setup
336
        current_handle_link_down = evc.handle_link_down()
337
        assert deploy_to_mocked.call_count == 1
338
339
        assert current_handle_link_down
340
        msg = f"{evc} deployed after link down."
341
        log_mocked.debug.assert_called_with(msg)
342
343
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
344
    async def test_handle_link_up_case_1(
@@ 152-209 (lines=58) @@
149
        msg = f"{self.evc} deployed after link down."
150
        log_mocked.debug.assert_called_once_with(msg)
151
152
    @patch("napps.kytos.mef_eline.models.evc.log")
153
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
154
    @patch(DEPLOY_TO_PRIMARY_PATH)
155
    @patch("napps.kytos.mef_eline.models.path.Path.status")
156
    async def test_handle_link_down_case_2(
157
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
158
    ):
159
        """Test if deploy_to backup path is called."""
160
        deploy_mocked.return_value = True  # mock of EVCDeploy.deploy
161
        deploy_to_mocked.return_value = True  # mock of DEPLOY_TO_PRIMARY_PATH
162
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]
163
        primary_path = [
164
            get_link_mocked(
165
                endpoint_a_port=7,
166
                endpoint_b_port=8,
167
                metadata={"s_vlan": 5},
168
                status=EntityStatus.UP,
169
            ),
170
            get_link_mocked(
171
                endpoint_a_port=11,
172
                endpoint_b_port=12,
173
                metadata={"s_vlan": 6},
174
                status=EntityStatus.UP,
175
            ),
176
        ]
177
        backup_path = [
178
            get_link_mocked(
179
                endpoint_a_port=7,
180
                endpoint_b_port=10,
181
                metadata={"s_vlan": 5},
182
                status=EntityStatus.DOWN,
183
            ),
184
            get_link_mocked(
185
                endpoint_a_port=15,
186
                endpoint_b_port=12,
187
                metadata={"s_vlan": 6},
188
                status=EntityStatus.UP,
189
            ),
190
        ]
191
        attributes = {
192
            "controller": get_controller_mock(),
193
            "name": "circuit_13",
194
            "uni_a": get_uni_mocked(is_valid=True),
195
            "uni_z": get_uni_mocked(is_valid=True),
196
            "primary_path": primary_path,
197
            "backup_path": backup_path,
198
            "enabled": True,
199
        }
200
201
        evc = EVC(**attributes)
202
        evc.current_path = evc.backup_path  # backup path is the active one
203
        deploy_to_mocked.reset_mock()  # drop calls recorded during setup
204
        current_handle_link_down = evc.handle_link_down()
205
        assert deploy_mocked.call_count == 0  # EVCDeploy.deploy never called
206
        deploy_to_mocked.assert_called_once()
207
        assert current_handle_link_down
208
        msg = f"{evc} deployed after link down."
209
        log_mocked.debug.assert_called_once_with(msg)
210
211
    # pylint: disable=too-many-arguments
212
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.remove_current_flows")