Code Duplication    Length = 58-64 lines in 2 locations

tests/unit/models/test_link_protection.py 2 locations

@@ 366-429 (lines=64) @@
363
        msg = f"Failed to re-deploy {evc} after link down."
364
        log_mocked.debug.assert_called_once_with(msg)
365
366
    @patch("napps.kytos.mef_eline.models.evc.log")
367
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
368
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
369
    @patch(DEPLOY_TO_PRIMARY_PATH)
370
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
371
    async def test_handle_link_down_case_4(
372
        self,
373
        deploy_to_mocked,
374
        _send_flow_mods_mocked,
375
        deploy_mocked,
376
        log_mocked,
377
    ):
378
        """Test if circuit with dynamic path is return success."""
379
        deploy_mocked.return_value = True
380
        deploy_to_mocked.return_value = False
381
        primary_path = [
382
            get_link_mocked(
383
                endpoint_a_port=9,
384
                endpoint_b_port=10,
385
                metadata={"s_vlan": 5},
386
                status=EntityStatus.DOWN,
387
            ),
388
            get_link_mocked(
389
                endpoint_a_port=11,
390
                endpoint_b_port=12,
391
                metadata={"s_vlan": 6},
392
                status=EntityStatus.UP,
393
            ),
394
        ]
395
        backup_path = [
396
            get_link_mocked(
397
                endpoint_a_port=9,
398
                endpoint_b_port=10,
399
                metadata={"s_vlan": 5},
400
                status=EntityStatus.DOWN,
401
            ),
402
            get_link_mocked(
403
                endpoint_a_port=13,
404
                endpoint_b_port=14,
405
                metadata={"s_vlan": 6},
406
                status=EntityStatus.UP,
407
            ),
408
        ]
409
        attributes = {
410
            "controller": get_controller_mock(),
411
            "name": "circuit_8",
412
            "uni_a": get_uni_mocked(is_valid=True),
413
            "uni_z": get_uni_mocked(is_valid=True),
414
            "primary_path": primary_path,
415
            "backup_path": backup_path,
416
            "enabled": True,
417
            "dynamic_backup_path": True,
418
        }
419
420
        evc = EVC(**attributes)
421
        evc.current_path = evc.backup_path
422
423
        deploy_to_mocked.reset_mock()
424
        current_handle_link_down = evc.handle_link_down()
425
        assert deploy_to_mocked.call_count == 1
426
427
        assert current_handle_link_down
428
        msg = f"{evc} deployed after link down."
429
        log_mocked.debug.assert_called_with(msg)
430
431
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
432
    @patch("napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to")
@@ 244-301 (lines=58) @@
241
        msg = f"{self.evc} deployed after link down."
242
        log_mocked.debug.assert_called_once_with(msg)
243
244
    @patch("napps.kytos.mef_eline.models.evc.log")
245
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
246
    @patch(DEPLOY_TO_PRIMARY_PATH)
247
    @patch("napps.kytos.mef_eline.models.path.Path.status")
248
    async def test_handle_link_down_case_2(
249
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
250
    ):
251
        """Test if deploy_to backup path is called."""
252
        deploy_mocked.return_value = True
253
        deploy_to_mocked.return_value = True
254
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]
255
        primary_path = [
256
            get_link_mocked(
257
                endpoint_a_port=7,
258
                endpoint_b_port=8,
259
                metadata={"s_vlan": 5},
260
                status=EntityStatus.UP,
261
            ),
262
            get_link_mocked(
263
                endpoint_a_port=11,
264
                endpoint_b_port=12,
265
                metadata={"s_vlan": 6},
266
                status=EntityStatus.UP,
267
            ),
268
        ]
269
        backup_path = [
270
            get_link_mocked(
271
                endpoint_a_port=7,
272
                endpoint_b_port=10,
273
                metadata={"s_vlan": 5},
274
                status=EntityStatus.DOWN,
275
            ),
276
            get_link_mocked(
277
                endpoint_a_port=15,
278
                endpoint_b_port=12,
279
                metadata={"s_vlan": 6},
280
                status=EntityStatus.UP,
281
            ),
282
        ]
283
        attributes = {
284
            "controller": get_controller_mock(),
285
            "name": "circuit_13",
286
            "uni_a": get_uni_mocked(is_valid=True),
287
            "uni_z": get_uni_mocked(is_valid=True),
288
            "primary_path": primary_path,
289
            "backup_path": backup_path,
290
            "enabled": True,
291
        }
292
293
        evc = EVC(**attributes)
294
        evc.current_path = evc.backup_path
295
        deploy_to_mocked.reset_mock()
296
        current_handle_link_down = evc.handle_link_down()
297
        assert deploy_mocked.call_count == 0
298
        deploy_to_mocked.assert_called_once()
299
        assert current_handle_link_down
300
        msg = f"{evc} deployed after link down."
301
        log_mocked.debug.assert_called_once_with(msg)
302
303
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
304
    @patch("napps.kytos.mef_eline.models.evc.log")