Code Duplication    Length = 58-68 lines in 3 locations

tests/unit/models/test_link_protection.py 3 locations

@@ 369-436 (lines=68) @@
366
        msg = f"Failed to re-deploy {evc} after link down."
367
        log_mocked.debug.assert_called_once_with(msg)
368
369
    @patch("napps.kytos.mef_eline.models.evc.log")
370
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
371
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
372
    @patch(DEPLOY_TO_PRIMARY_PATH)
373
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
374
    def test_handle_link_down_case_4(
375
        self,
376
        deploy_to_mocked,
377
        _send_flow_mods_mocked,
378
        deploy_mocked,
379
        log_mocked,
380
    ):
381
        """Test if circuit with dynamic path is return success."""
382
        deploy_mocked.return_value = True
383
        deploy_to_mocked.return_value = False
384
        primary_path = [
385
            get_link_mocked(
386
                endpoint_a_port=9,
387
                endpoint_b_port=10,
388
                metadata={"s_vlan": 5},
389
                status=EntityStatus.DOWN,
390
            ),
391
            get_link_mocked(
392
                endpoint_a_port=11,
393
                endpoint_b_port=12,
394
                metadata={"s_vlan": 6},
395
                status=EntityStatus.UP,
396
            ),
397
        ]
398
        backup_path = [
399
            get_link_mocked(
400
                endpoint_a_port=9,
401
                endpoint_b_port=10,
402
                metadata={"s_vlan": 5},
403
                status=EntityStatus.DOWN,
404
            ),
405
            get_link_mocked(
406
                endpoint_a_port=13,
407
                endpoint_b_port=14,
408
                metadata={"s_vlan": 6},
409
                status=EntityStatus.UP,
410
            ),
411
        ]
412
        attributes = {
413
            "controller": get_controller_mock(),
414
            "name": "circuit_8",
415
            "uni_a": get_uni_mocked(is_valid=True),
416
            "uni_z": get_uni_mocked(is_valid=True),
417
            "primary_path": primary_path,
418
            "backup_path": backup_path,
419
            "enabled": True,
420
            "dynamic_backup_path": True,
421
        }
422
423
        evc = EVC(**attributes)
424
        evc.current_path = evc.backup_path
425
426
        # storehouse mock
427
        evc._storehouse.box = Mock()  # pylint: disable=protected-access
428
        evc._storehouse.box.data = {}  # pylint: disable=protected-access
429
430
        deploy_to_mocked.reset_mock()
431
        current_handle_link_down = evc.handle_link_down()
432
        self.assertEqual(deploy_to_mocked.call_count, 1)
433
434
        self.assertTrue(current_handle_link_down)
435
        msg = f"{evc} deployed after link down."
436
        log_mocked.debug.assert_called_with(msg)
437
438
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
439
    @patch("napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to")
@@ 181-246 (lines=66) @@
178
        deployed = evc.deploy_to("primary_path", evc.primary_path)
179
        self.assertFalse(deployed)
180
181
    @patch("napps.kytos.mef_eline.models.evc.log")
182
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
183
    @patch(DEPLOY_TO_BACKUP_PATH)
184
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
185
    @patch("napps.kytos.mef_eline.models.path.Path.status")
186
    def test_handle_link_down_case_1(
187
        self,
188
        path_status_mocked,
189
        deploy_mocked,
190
        deploy_to_mocked,
191
        _send_flow_mods_mocked,
192
        log_mocked,
193
    ):
194
        """Test if deploy_to backup path is called."""
195
        deploy_mocked.return_value = True
196
        path_status_mocked.side_effect = [EntityStatus.DOWN, EntityStatus.UP]
197
198
        primary_path = [
199
            get_link_mocked(
200
                endpoint_a_port=9,
201
                endpoint_b_port=10,
202
                metadata={"s_vlan": 5},
203
                status=EntityStatus.DOWN,
204
            ),
205
            get_link_mocked(
206
                endpoint_a_port=11,
207
                endpoint_b_port=12,
208
                metadata={"s_vlan": 6},
209
                status=EntityStatus.UP,
210
            ),
211
        ]
212
        backup_path = [
213
            get_link_mocked(
214
                endpoint_a_port=9,
215
                endpoint_b_port=10,
216
                metadata={"s_vlan": 5},
217
                status=EntityStatus.UP,
218
            ),
219
            get_link_mocked(
220
                endpoint_a_port=11,
221
                endpoint_b_port=12,
222
                metadata={"s_vlan": 6},
223
                status=EntityStatus.UP,
224
            ),
225
        ]
226
        attributes = {
227
            "controller": get_controller_mock(),
228
            "name": "circuit_6",
229
            "uni_a": get_uni_mocked(is_valid=True),
230
            "uni_z": get_uni_mocked(is_valid=True),
231
            "primary_path": primary_path,
232
            "backup_path": backup_path,
233
            "enabled": True,
234
        }
235
        evc = EVC(**attributes)
236
237
        evc.current_path = evc.primary_path
238
        evc.activate()
239
        deploy_to_mocked.reset_mock()
240
        current_handle_link_down = evc.handle_link_down()
241
        self.assertEqual(deploy_mocked.call_count, 0)
242
        deploy_to_mocked.assert_called_once()
243
244
        self.assertTrue(current_handle_link_down)
245
        msg = f"{evc} deployed after link down."
246
        log_mocked.debug.assert_called_once_with(msg)
247
248
    @patch("napps.kytos.mef_eline.models.evc.log")
249
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
@@ 248-305 (lines=58) @@
245
        msg = f"{evc} deployed after link down."
246
        log_mocked.debug.assert_called_once_with(msg)
247
248
    @patch("napps.kytos.mef_eline.models.evc.log")
249
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
250
    @patch(DEPLOY_TO_PRIMARY_PATH)
251
    @patch("napps.kytos.mef_eline.models.path.Path.status")
252
    def test_handle_link_down_case_2(
253
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
254
    ):
255
        """Test if deploy_to backup path is called."""
256
        deploy_mocked.return_value = True
257
        deploy_to_mocked.return_value = True
258
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]
259
        primary_path = [
260
            get_link_mocked(
261
                endpoint_a_port=7,
262
                endpoint_b_port=8,
263
                metadata={"s_vlan": 5},
264
                status=EntityStatus.UP,
265
            ),
266
            get_link_mocked(
267
                endpoint_a_port=11,
268
                endpoint_b_port=12,
269
                metadata={"s_vlan": 6},
270
                status=EntityStatus.UP,
271
            ),
272
        ]
273
        backup_path = [
274
            get_link_mocked(
275
                endpoint_a_port=7,
276
                endpoint_b_port=10,
277
                metadata={"s_vlan": 5},
278
                status=EntityStatus.DOWN,
279
            ),
280
            get_link_mocked(
281
                endpoint_a_port=15,
282
                endpoint_b_port=12,
283
                metadata={"s_vlan": 6},
284
                status=EntityStatus.UP,
285
            ),
286
        ]
287
        attributes = {
288
            "controller": get_controller_mock(),
289
            "name": "circuit_13",
290
            "uni_a": get_uni_mocked(is_valid=True),
291
            "uni_z": get_uni_mocked(is_valid=True),
292
            "primary_path": primary_path,
293
            "backup_path": backup_path,
294
            "enabled": True,
295
        }
296
297
        evc = EVC(**attributes)
298
        evc.current_path = evc.backup_path
299
        deploy_to_mocked.reset_mock()
300
        current_handle_link_down = evc.handle_link_down()
301
        self.assertEqual(deploy_mocked.call_count, 0)
302
        deploy_to_mocked.assert_called_once()
303
        self.assertTrue(current_handle_link_down)
304
        msg = f"{evc} deployed after link down."
305
        log_mocked.debug.assert_called_once_with(msg)
306
307
    @patch("napps.kytos.mef_eline.models.evc.log")
308
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")