Code Duplication    Length = 58-64 lines in 2 locations

tests/unit/models/test_link_protection.py 2 locations

@@ 365-428 (lines=64) @@
362
        msg = f"Failed to re-deploy {evc} after link down."
363
        log_mocked.debug.assert_called_once_with(msg)
364
365
    @patch("napps.kytos.mef_eline.models.evc.log")
366
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
367
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
368
    @patch(DEPLOY_TO_PRIMARY_PATH)
369
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
370
    async def test_handle_link_down_case_4(
371
        self,
372
        deploy_to_mocked,
373
        _send_flow_mods_mocked,
374
        deploy_mocked,
375
        log_mocked,
376
    ):
377
        """Test if circuit with dynamic path is return success."""
378
        deploy_mocked.return_value = True
379
        deploy_to_mocked.return_value = False
380
        primary_path = [
381
            get_link_mocked(
382
                endpoint_a_port=9,
383
                endpoint_b_port=10,
384
                metadata={"s_vlan": 5},
385
                status=EntityStatus.DOWN,
386
            ),
387
            get_link_mocked(
388
                endpoint_a_port=11,
389
                endpoint_b_port=12,
390
                metadata={"s_vlan": 6},
391
                status=EntityStatus.UP,
392
            ),
393
        ]
394
        backup_path = [
395
            get_link_mocked(
396
                endpoint_a_port=9,
397
                endpoint_b_port=10,
398
                metadata={"s_vlan": 5},
399
                status=EntityStatus.DOWN,
400
            ),
401
            get_link_mocked(
402
                endpoint_a_port=13,
403
                endpoint_b_port=14,
404
                metadata={"s_vlan": 6},
405
                status=EntityStatus.UP,
406
            ),
407
        ]
408
        attributes = {
409
            "controller": get_controller_mock(),
410
            "name": "circuit_8",
411
            "uni_a": get_uni_mocked(is_valid=True),
412
            "uni_z": get_uni_mocked(is_valid=True),
413
            "primary_path": primary_path,
414
            "backup_path": backup_path,
415
            "enabled": True,
416
            "dynamic_backup_path": True,
417
        }
418
419
        evc = EVC(**attributes)
420
        evc.current_path = evc.backup_path
421
422
        deploy_to_mocked.reset_mock()
423
        current_handle_link_down = evc.handle_link_down()
424
        assert deploy_to_mocked.call_count == 1
425
426
        assert current_handle_link_down
427
        msg = f"{evc} deployed after link down."
428
        log_mocked.debug.assert_called_with(msg)
429
430
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
431
    @patch("napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to")
@@ 243-300 (lines=58) @@
240
        msg = f"{self.evc} deployed after link down."
241
        log_mocked.debug.assert_called_once_with(msg)
242
243
    @patch("napps.kytos.mef_eline.models.evc.log")
244
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
245
    @patch(DEPLOY_TO_PRIMARY_PATH)
246
    @patch("napps.kytos.mef_eline.models.path.Path.status")
247
    async def test_handle_link_down_case_2(
248
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
249
    ):
250
        """Test if deploy_to backup path is called."""
251
        deploy_mocked.return_value = True
252
        deploy_to_mocked.return_value = True
253
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]
254
        primary_path = [
255
            get_link_mocked(
256
                endpoint_a_port=7,
257
                endpoint_b_port=8,
258
                metadata={"s_vlan": 5},
259
                status=EntityStatus.UP,
260
            ),
261
            get_link_mocked(
262
                endpoint_a_port=11,
263
                endpoint_b_port=12,
264
                metadata={"s_vlan": 6},
265
                status=EntityStatus.UP,
266
            ),
267
        ]
268
        backup_path = [
269
            get_link_mocked(
270
                endpoint_a_port=7,
271
                endpoint_b_port=10,
272
                metadata={"s_vlan": 5},
273
                status=EntityStatus.DOWN,
274
            ),
275
            get_link_mocked(
276
                endpoint_a_port=15,
277
                endpoint_b_port=12,
278
                metadata={"s_vlan": 6},
279
                status=EntityStatus.UP,
280
            ),
281
        ]
282
        attributes = {
283
            "controller": get_controller_mock(),
284
            "name": "circuit_13",
285
            "uni_a": get_uni_mocked(is_valid=True),
286
            "uni_z": get_uni_mocked(is_valid=True),
287
            "primary_path": primary_path,
288
            "backup_path": backup_path,
289
            "enabled": True,
290
        }
291
292
        evc = EVC(**attributes)
293
        evc.current_path = evc.backup_path
294
        deploy_to_mocked.reset_mock()
295
        current_handle_link_down = evc.handle_link_down()
296
        assert deploy_mocked.call_count == 0
297
        deploy_to_mocked.assert_called_once()
298
        assert current_handle_link_down
299
        msg = f"{evc} deployed after link down."
300
        log_mocked.debug.assert_called_once_with(msg)
301
302
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
303
    @patch("napps.kytos.mef_eline.models.evc.log")