Code Duplication    Length = 40-44 lines in 5 locations

tests/models/test_link_protection.py 5 locations

@@ 252-295 (lines=44) @@
249
        msg = f'Failed to re-deploy {evc} after link down.'
250
        log_mocked.debug.assert_called_once_with(msg)
251
252
    @patch('napps.kytos.mef_eline.models.log')
253
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
254
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
255
    def test_handle_link_down_case_4(self, deploy_to_mocked, deploy_mocked,
256
                                     log_mocked):
257
        """Test if circuit without dynamic path is return failed."""
258
        deploy_mocked.return_value = True
259
        deploy_to_mocked.return_value = False
260
        primary_path = [
261
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
262
                                metadata={"s_vlan": 5},
263
                                status=EntityStatus.DOWN),
264
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
265
                                metadata={"s_vlan": 6},
266
                                status=EntityStatus.UP),
267
        ]
268
        backup_path = [
269
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
270
                                metadata={"s_vlan": 5},
271
                                status=EntityStatus.DOWN),
272
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
273
                                metadata={"s_vlan": 6},
274
                                status=EntityStatus.UP),
275
        ]
276
        attributes = {
277
            "name": "circuit_name",
278
            "uni_a": get_uni_mocked(is_valid=True),
279
            "uni_z": get_uni_mocked(is_valid=True),
280
            "primary_path": primary_path,
281
            "backup_path": backup_path,
282
            "enabled": True,
283
            "dynamic_backup_path": True
284
        }
285
286
        evc = EVC(**attributes)
287
        evc.current_path = evc.backup_path
288
        current_handle_link_down = evc.handle_link_down()
289
        self.assertEqual(deploy_mocked.call_count, 1)
290
        self.assertEqual(deploy_to_mocked.call_count, 1)
291
        deploy_to_mocked.assert_called_once_with('primary_path',
292
                                                 evc.primary_path)
293
        self.assertTrue(current_handle_link_down)
294
        msg = f"{evc} deployed after link down."
295
        log_mocked.debug.assert_called_once_with(msg)
296
297
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
298
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
@@ 208-250 (lines=43) @@
205
        msg = f"{evc} deployed after link down."
206
        log_mocked.debug.assert_called_once_with(msg)
207
208
    @patch('napps.kytos.mef_eline.models.log')
209
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
210
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
211
    def test_handle_link_down_case_3(self, deploy_to_mocked, deploy_mocked,
212
                                     log_mocked):
213
        """Test if circuit without dynamic path is return failed."""
214
        deploy_mocked.return_value = False
215
        deploy_to_mocked.return_value = False
216
        primary_path = [
217
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
218
                                metadata={"s_vlan": 5},
219
                                status=EntityStatus.DOWN),
220
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
221
                                metadata={"s_vlan": 6},
222
                                status=EntityStatus.UP),
223
        ]
224
        backup_path = [
225
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
226
                                metadata={"s_vlan": 5},
227
                                status=EntityStatus.DOWN),
228
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
229
                                metadata={"s_vlan": 6},
230
                                status=EntityStatus.UP),
231
        ]
232
        attributes = {
233
            "name": "circuit_name",
234
            "uni_a": get_uni_mocked(is_valid=True),
235
            "uni_z": get_uni_mocked(is_valid=True),
236
            "primary_path": primary_path,
237
            "backup_path": backup_path,
238
            "enabled": True
239
        }
240
241
        evc = EVC(**attributes)
242
        evc.current_path = evc.backup_path
243
        current_handle_link_down = evc.handle_link_down()
244
        self.assertEqual(deploy_mocked.call_count, 0)
245
        self.assertEqual(deploy_to_mocked.call_count, 1)
246
        deploy_to_mocked.assert_called_once_with('primary_path',
247
                                                 evc.primary_path)
248
        self.assertFalse(current_handle_link_down)
249
        msg = f'Failed to re-deploy {evc} after link down.'
250
        log_mocked.debug.assert_called_once_with(msg)
251
252
    @patch('napps.kytos.mef_eline.models.log')
253
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
@@ 418-457 (lines=40) @@
415
                                                 evc.backup_path)
416
        self.assertTrue(current_handle_link_up)
417
418
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
419
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
420
    def test_handle_link_up_case_4(self, deploy_to_mocked, deploy_mocked):
421
        """Test if it is deployed after the dynamic_backup_path deploy."""
422
        deploy_mocked.return_value = True
423
        deploy_to_mocked.return_value = False
424
        primary_path = [
425
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
426
                                metadata={"s_vlan": 5},
427
                                status=EntityStatus.DOWN),
428
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
429
                                metadata={"s_vlan": 6},
430
                                status=EntityStatus.UP),
431
        ]
432
        backup_path = [
433
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
434
                                metadata={"s_vlan": 5},
435
                                status=EntityStatus.DOWN),
436
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
437
                                metadata={"s_vlan": 6},
438
                                status=EntityStatus.UP),
439
        ]
440
        attributes = {
441
            "name": "circuit_name",
442
            "uni_a": get_uni_mocked(is_valid=True),
443
            "uni_z": get_uni_mocked(is_valid=True),
444
            "primary_path": primary_path,
445
            "backup_path": backup_path,
446
            "enabled": True,
447
            "dynamic_backup_path": True
448
        }
449
450
        evc = EVC(**attributes)
451
        evc.current_path = Path([])
452
        current_handle_link_up = evc.handle_link_up(backup_path[0])
453
        self.assertEqual(deploy_mocked.call_count, 1)
454
        self.assertEqual(deploy_to_mocked.call_count, 1)
455
        deploy_to_mocked.assert_called_once_with('backup_path',
456
                                                 evc.backup_path)
457
        self.assertTrue(current_handle_link_up)
458
@@ 377-416 (lines=40) @@
374
                                                 evc.primary_path)
375
        self.assertTrue(current_handle_link_up)
376
377
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
378
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
379
    def test_handle_link_up_case_3(self, deploy_to_mocked, deploy_mocked):
380
        """Test if it is deployed after the backup is up."""
381
        deploy_mocked.return_value = True
382
        deploy_to_mocked.return_value = True
383
        primary_path = [
384
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
385
                                metadata={"s_vlan": 5},
386
                                status=EntityStatus.DOWN),
387
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
388
                                metadata={"s_vlan": 6},
389
                                status=EntityStatus.UP),
390
        ]
391
        backup_path = [
392
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
393
                                metadata={"s_vlan": 5},
394
                                status=EntityStatus.DOWN),
395
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
396
                                metadata={"s_vlan": 6},
397
                                status=EntityStatus.UP),
398
        ]
399
        attributes = {
400
            "name": "circuit_name",
401
            "uni_a": get_uni_mocked(is_valid=True),
402
            "uni_z": get_uni_mocked(is_valid=True),
403
            "primary_path": primary_path,
404
            "backup_path": backup_path,
405
            "enabled": True,
406
            "dynamic_backup_path": True
407
        }
408
409
        evc = EVC(**attributes)
410
        evc.current_path = Path([])
411
        current_handle_link_up = evc.handle_link_up(backup_path[0])
412
        self.assertEqual(deploy_mocked.call_count, 0)
413
        self.assertEqual(deploy_to_mocked.call_count, 1)
414
        deploy_to_mocked.assert_called_once_with('backup_path',
415
                                                 evc.backup_path)
416
        self.assertTrue(current_handle_link_up)
417
418
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
419
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
@@ 336-375 (lines=40) @@
333
        self.assertEqual(deploy_to_mocked.call_count, 0)
334
        self.assertTrue(current_handle_link_up)
335
336
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
337
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
338
    def test_handle_link_up_case_2(self, deploy_to_mocked, deploy_mocked):
339
        """Test if it is changing from backup_path to primary_path."""
340
        deploy_mocked.return_value = True
341
        deploy_to_mocked.return_value = True
342
        primary_path = [
343
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
344
                                metadata={"s_vlan": 5},
345
                                status=EntityStatus.UP),
346
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
347
                                metadata={"s_vlan": 6},
348
                                status=EntityStatus.UP),
349
        ]
350
        backup_path = [
351
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
352
                                metadata={"s_vlan": 5},
353
                                status=EntityStatus.UP),
354
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
355
                                metadata={"s_vlan": 6},
356
                                status=EntityStatus.UP),
357
        ]
358
        attributes = {
359
            "name": "circuit_name",
360
            "uni_a": get_uni_mocked(is_valid=True),
361
            "uni_z": get_uni_mocked(is_valid=True),
362
            "primary_path": primary_path,
363
            "backup_path": backup_path,
364
            "enabled": True,
365
            "dynamic_backup_path": True
366
        }
367
368
        evc = EVC(**attributes)
369
        evc.current_path = evc.backup_path
370
        current_handle_link_up = evc.handle_link_up(primary_path[0])
371
        self.assertEqual(deploy_mocked.call_count, 0)
372
        self.assertEqual(deploy_to_mocked.call_count, 1)
373
        deploy_to_mocked.assert_called_once_with('primary_path',
374
                                                 evc.primary_path)
375
        self.assertTrue(current_handle_link_up)
376
377
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
378
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')