Code Duplication    Length = 58-64 lines in 2 locations

tests/unit/models/test_link_protection.py 2 locations

@@ 363-426 (lines=64) @@
360
        msg = f"Failed to re-deploy {evc} after link down."
361
        log_mocked.debug.assert_called_once_with(msg)
362
363
    @patch("napps.kytos.mef_eline.models.evc.log")
364
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
365
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
366
    @patch(DEPLOY_TO_PRIMARY_PATH)
367
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
368
    async def test_handle_link_down_case_4(
369
        self,
370
        deploy_to_mocked,
371
        _send_flow_mods_mocked,
372
        deploy_mocked,
373
        log_mocked,
374
    ):
375
        """Test if circuit with dynamic path is return success."""
376
        deploy_mocked.return_value = True
377
        deploy_to_mocked.return_value = False
378
        primary_path = [
379
            get_link_mocked(
380
                endpoint_a_port=9,
381
                endpoint_b_port=10,
382
                metadata={"s_vlan": 5},
383
                status=EntityStatus.DOWN,
384
            ),
385
            get_link_mocked(
386
                endpoint_a_port=11,
387
                endpoint_b_port=12,
388
                metadata={"s_vlan": 6},
389
                status=EntityStatus.UP,
390
            ),
391
        ]
392
        backup_path = [
393
            get_link_mocked(
394
                endpoint_a_port=9,
395
                endpoint_b_port=10,
396
                metadata={"s_vlan": 5},
397
                status=EntityStatus.DOWN,
398
            ),
399
            get_link_mocked(
400
                endpoint_a_port=13,
401
                endpoint_b_port=14,
402
                metadata={"s_vlan": 6},
403
                status=EntityStatus.UP,
404
            ),
405
        ]
406
        attributes = {
407
            "controller": get_controller_mock(),
408
            "name": "circuit_8",
409
            "uni_a": get_uni_mocked(is_valid=True),
410
            "uni_z": get_uni_mocked(is_valid=True),
411
            "primary_path": primary_path,
412
            "backup_path": backup_path,
413
            "enabled": True,
414
            "dynamic_backup_path": True,
415
        }
416
417
        evc = EVC(**attributes)
418
        evc.current_path = evc.backup_path
419
420
        deploy_to_mocked.reset_mock()
421
        current_handle_link_down = evc.handle_link_down()
422
        assert deploy_to_mocked.call_count == 1
423
424
        assert current_handle_link_down
425
        msg = f"{evc} deployed after link down."
426
        log_mocked.debug.assert_called_with(msg)
427
428
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
429
    @patch("napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to")
@@ 241-298 (lines=58) @@
238
        msg = f"{self.evc} deployed after link down."
239
        log_mocked.debug.assert_called_once_with(msg)
240
241
    @patch("napps.kytos.mef_eline.models.evc.log")
242
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
243
    @patch(DEPLOY_TO_PRIMARY_PATH)
244
    @patch("napps.kytos.mef_eline.models.path.Path.status")
245
    async def test_handle_link_down_case_2(
246
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
247
    ):
248
        """Test if deploy_to backup path is called."""
249
        deploy_mocked.return_value = True
250
        deploy_to_mocked.return_value = True
251
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]
252
        primary_path = [
253
            get_link_mocked(
254
                endpoint_a_port=7,
255
                endpoint_b_port=8,
256
                metadata={"s_vlan": 5},
257
                status=EntityStatus.UP,
258
            ),
259
            get_link_mocked(
260
                endpoint_a_port=11,
261
                endpoint_b_port=12,
262
                metadata={"s_vlan": 6},
263
                status=EntityStatus.UP,
264
            ),
265
        ]
266
        backup_path = [
267
            get_link_mocked(
268
                endpoint_a_port=7,
269
                endpoint_b_port=10,
270
                metadata={"s_vlan": 5},
271
                status=EntityStatus.DOWN,
272
            ),
273
            get_link_mocked(
274
                endpoint_a_port=15,
275
                endpoint_b_port=12,
276
                metadata={"s_vlan": 6},
277
                status=EntityStatus.UP,
278
            ),
279
        ]
280
        attributes = {
281
            "controller": get_controller_mock(),
282
            "name": "circuit_13",
283
            "uni_a": get_uni_mocked(is_valid=True),
284
            "uni_z": get_uni_mocked(is_valid=True),
285
            "primary_path": primary_path,
286
            "backup_path": backup_path,
287
            "enabled": True,
288
        }
289
290
        evc = EVC(**attributes)
291
        evc.current_path = evc.backup_path
292
        deploy_to_mocked.reset_mock()
293
        current_handle_link_down = evc.handle_link_down()
294
        assert deploy_mocked.call_count == 0
295
        deploy_to_mocked.assert_called_once()
296
        assert current_handle_link_down
297
        msg = f"{evc} deployed after link down."
298
        log_mocked.debug.assert_called_once_with(msg)
299
300
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
301
    @patch("napps.kytos.mef_eline.models.evc.log")