Passed
Push — master ( e6735a...b235ee )
by Italo Valcy
02:28 queued 14s
created

build.tests.unit.models.test_link_protection   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 695
Duplicated Lines 17.55 %

Importance

Changes 0
Metric Value
wmc 15
eloc 541
dl 122
loc 695
rs 10
c 0
b 0
f 0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A TestLinkProtection.test_deploy_to_case_1() 0 25 1
A TestLinkProtection.test_handle_link_up_case_5() 0 15 1
A TestLinkProtection.setUp() 0 40 1
A TestLinkProtection.test_is_using_backup_path() 0 26 1
B TestLinkProtection.test_handle_link_up_case_2() 0 54 1
A TestLinkProtection.test_is_using_primary_path() 0 22 1
A TestLinkProtection.test_deploy_to_case_3() 0 21 1
B TestLinkProtection.test_handle_link_up_case_4() 0 71 1
B TestLinkProtection.test_handle_link_down_case_3() 0 62 1
A TestLinkProtection.test_handle_link_down_case_1() 0 27 1
A TestLinkProtection.test_deploy_to_case_2() 0 41 1
B TestLinkProtection.test_handle_link_down_case_2() 58 58 1
B TestLinkProtection.test_handle_link_up_case_3() 0 67 1
B TestLinkProtection.test_handle_link_down_case_4() 64 64 1
B TestLinkProtection.test_handle_link_up_case_1() 0 52 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
"""Module to test the LinkProtection class."""
2
import sys
3
4
from unittest import TestCase
5
from unittest.mock import MagicMock, patch
6
7
from napps.kytos.mef_eline.models import EVC, Path  # NOQA pycodestyle
8
from napps.kytos.mef_eline.tests.helpers import (
9
    get_link_mocked,
10
    get_uni_mocked,
11
    get_controller_mock,
12
    get_mocked_requests,
13
)  # NOQA pycodestyle
14
15
16
from kytos.core.common import EntityStatus
17
18
sys.path.insert(0, "/var/lib/kytos/napps/..")
19
20
21
DEPLOY_TO_PRIMARY_PATH = (
22
    "napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to_primary_path"
23
)
24
DEPLOY_TO_BACKUP_PATH = (
25
    "napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to_backup_path"
26
)
27
GET_BEST_PATH = (
28
    "napps.kytos.mef_eline.models.path.DynamicPathManager.get_best_path"
29
)
30
31
32
class TestLinkProtection(TestCase):  # pylint: disable=too-many-public-methods
33
    """Tests to validate LinkProtection class."""
34
35
    def setUp(self):
36
        primary_path = [
37
            get_link_mocked(
38
                endpoint_a_port=9,
39
                endpoint_b_port=10,
40
                metadata={"s_vlan": 5},
41
                status=EntityStatus.UP,
42
            ),
43
            get_link_mocked(
44
                endpoint_a_port=11,
45
                endpoint_b_port=12,
46
                metadata={"s_vlan": 6},
47
                status=EntityStatus.DOWN,
48
            ),
49
        ]
50
        backup_path = [
51
            get_link_mocked(
52
                endpoint_a_port=13,
53
                endpoint_b_port=14,
54
                metadata={"s_vlan": 5},
55
                status=EntityStatus.DOWN,
56
            ),
57
            get_link_mocked(
58
                endpoint_a_port=11,
59
                endpoint_b_port=12,
60
                metadata={"s_vlan": 6},
61
                status=EntityStatus.DOWN,
62
            ),
63
        ]
64
        attributes = {
65
            "controller": get_controller_mock(),
66
            "name": "circuit_1",
67
            "uni_a": get_uni_mocked(is_valid=True),
68
            "uni_z": get_uni_mocked(is_valid=True),
69
            "primary_path": primary_path,
70
            "backup_path": backup_path,
71
            "enabled": True,
72
            "dynamic_backup_path": True,
73
        }
74
        self.evc = EVC(**attributes)
75
76
    def test_is_using_backup_path(self):
77
        """Test test is using backup path."""
78
79
        attributes = {
80
            "controller": get_controller_mock(),
81
            "name": "circuit_1",
82
            "uni_a": get_uni_mocked(is_valid=True),
83
            "uni_z": get_uni_mocked(is_valid=True),
84
            "backup_path": [
85
                get_link_mocked(
86
                    endpoint_a_port=10,
87
                    endpoint_b_port=9,
88
                    metadata={"s_vlan": 5},
89
                ),
90
                get_link_mocked(
91
                    endpoint_a_port=12,
92
                    endpoint_b_port=11,
93
                    metadata={"s_vlan": 6},
94
                ),
95
            ],
96
        }
97
98
        evc = EVC(**attributes)
99
        self.assertFalse(evc.is_using_backup_path())
100
        evc.current_path = evc.backup_path
101
        self.assertTrue(evc.is_using_backup_path())
102
103
    def test_is_using_primary_path(self):
104
        """Test test is using primary path."""
105
        primary_path = [
106
            get_link_mocked(
107
                endpoint_a_port=10, endpoint_b_port=9, metadata={"s_vlan": 5}
108
            ),
109
            get_link_mocked(
110
                endpoint_a_port=12, endpoint_b_port=11, metadata={"s_vlan": 6}
111
            ),
112
        ]
113
114
        attributes = {
115
            "controller": get_controller_mock(),
116
            "name": "circuit_2",
117
            "uni_a": get_uni_mocked(is_valid=True),
118
            "uni_z": get_uni_mocked(is_valid=True),
119
            "primary_path": primary_path,
120
        }
121
        evc = EVC(**attributes)
122
        self.assertFalse(evc.is_using_primary_path())
123
        evc.current_path = evc.primary_path
124
        self.assertTrue(evc.is_using_primary_path())
125
126
    @patch("napps.kytos.mef_eline.models.evc.log")
127
    def test_deploy_to_case_1(self, log_mocked):
128
        """Test if the path is equal to current_path."""
129
        primary_path = [
130
            get_link_mocked(
131
                endpoint_a_port=10, endpoint_b_port=9, metadata={"s_vlan": 5}
132
            ),
133
            get_link_mocked(
134
                endpoint_a_port=12, endpoint_b_port=11, metadata={"s_vlan": 6}
135
            ),
136
        ]
137
        attributes = {
138
            "controller": get_controller_mock(),
139
            "name": "circuit_3",
140
            "uni_a": get_uni_mocked(is_valid=True),
141
            "uni_z": get_uni_mocked(is_valid=True),
142
            "primary_path": primary_path,
143
        }
144
        evc = EVC(**attributes)
145
        evc.current_path = evc.primary_path
146
147
        expected_deployed = evc.deploy_to("primary_path", evc.primary_path)
148
        expected_msg = "primary_path is equal to current_path."
149
        log_mocked.debug.assert_called_with(expected_msg)
150
        self.assertTrue(expected_deployed)
151
152
    # pylint: disable=too-many-arguments
153
    @patch("napps.kytos.mef_eline.models.evc.notify_link_available_tags")
154
    @patch("requests.post")
155
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
156
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
157
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
158
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
159
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
160
    def test_deploy_to_case_2(
161
        self,
162
        install_uni_flows_mocked,
163
        install_nni_flows_mocked,
164
        deploy_mocked,
165
        _,
166
        requests_mock,
167
        notify_mock
168
    ):
169
        """Test deploy with all links up."""
170
        deploy_mocked.return_value = True
171
        response = MagicMock()
172
        response.status_code = 201
173
        requests_mock.return_value = response
174
175
        primary_path = [
176
            get_link_mocked(status=EntityStatus.UP),
177
            get_link_mocked(status=EntityStatus.UP),
178
        ]
179
        attributes = {
180
            "controller": get_controller_mock(),
181
            "name": "circuit_4",
182
            "uni_a": get_uni_mocked(is_valid=True),
183
            "uni_z": get_uni_mocked(is_valid=True),
184
            "primary_path": primary_path,
185
            "enabled": True,
186
        }
187
        evc = EVC(**attributes)
188
189
        deployed = evc.deploy_to("primary_path", evc.primary_path)
190
        install_uni_flows_mocked.assert_called_with(evc.primary_path)
191
        install_nni_flows_mocked.assert_called_with(evc.primary_path)
192
        self.assertTrue(deployed)
193
        notify_mock.assert_called()
194
195
    @patch("requests.get", side_effect=get_mocked_requests)
196
    def test_deploy_to_case_3(self, requests_mocked):
197
        # pylint: disable=unused-argument
198
        """Test deploy with one link down."""
199
        link1 = get_link_mocked()
200
        link2 = get_link_mocked()
201
        link1.id = "abc"
202
        link2.id = "def"
203
        primary_path = [link1, link2]
204
        attributes = {
205
            "controller": get_controller_mock(),
206
            "name": "circuit_5",
207
            "uni_a": get_uni_mocked(is_valid=True),
208
            "uni_z": get_uni_mocked(is_valid=True),
209
            "primary_path": primary_path,
210
            "enabled": True,
211
        }
212
        evc = EVC(**attributes)
213
214
        deployed = evc.deploy_to("primary_path", evc.primary_path)
215
        self.assertFalse(deployed)
216
217
    @patch("napps.kytos.mef_eline.models.evc.log")
218
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
219
    @patch(DEPLOY_TO_BACKUP_PATH)
220
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
221
    @patch("napps.kytos.mef_eline.models.path.Path.status")
222
    def test_handle_link_down_case_1(
223
        self,
224
        path_status_mocked,
225
        deploy_mocked,
226
        deploy_to_mocked,
227
        _send_flow_mods_mocked,
228
        log_mocked,
229
    ):
230
        """Test if deploy_to backup path is called."""
231
        deploy_mocked.return_value = True
232
        path_status_mocked.side_effect = [EntityStatus.DOWN, EntityStatus.UP]
233
234
        self.evc.current_path = self.evc.primary_path
235
        self.evc.activate()
236
        deploy_to_mocked.reset_mock()
237
        current_handle_link_down = self.evc.handle_link_down()
238
        self.assertEqual(deploy_mocked.call_count, 0)
239
        deploy_to_mocked.assert_called_once()
240
241
        self.assertTrue(current_handle_link_down)
242
        msg = f"{self.evc} deployed after link down."
243
        log_mocked.debug.assert_called_once_with(msg)
244
245 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
246
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
247
    @patch(DEPLOY_TO_PRIMARY_PATH)
248
    @patch("napps.kytos.mef_eline.models.path.Path.status")
249
    def test_handle_link_down_case_2(
250
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
251
    ):
252
        """Test if deploy_to backup path is called."""
253
        deploy_mocked.return_value = True
254
        deploy_to_mocked.return_value = True
255
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]
256
        primary_path = [
257
            get_link_mocked(
258
                endpoint_a_port=7,
259
                endpoint_b_port=8,
260
                metadata={"s_vlan": 5},
261
                status=EntityStatus.UP,
262
            ),
263
            get_link_mocked(
264
                endpoint_a_port=11,
265
                endpoint_b_port=12,
266
                metadata={"s_vlan": 6},
267
                status=EntityStatus.UP,
268
            ),
269
        ]
270
        backup_path = [
271
            get_link_mocked(
272
                endpoint_a_port=7,
273
                endpoint_b_port=10,
274
                metadata={"s_vlan": 5},
275
                status=EntityStatus.DOWN,
276
            ),
277
            get_link_mocked(
278
                endpoint_a_port=15,
279
                endpoint_b_port=12,
280
                metadata={"s_vlan": 6},
281
                status=EntityStatus.UP,
282
            ),
283
        ]
284
        attributes = {
285
            "controller": get_controller_mock(),
286
            "name": "circuit_13",
287
            "uni_a": get_uni_mocked(is_valid=True),
288
            "uni_z": get_uni_mocked(is_valid=True),
289
            "primary_path": primary_path,
290
            "backup_path": backup_path,
291
            "enabled": True,
292
        }
293
294
        evc = EVC(**attributes)
295
        evc.current_path = evc.backup_path
296
        deploy_to_mocked.reset_mock()
297
        current_handle_link_down = evc.handle_link_down()
298
        self.assertEqual(deploy_mocked.call_count, 0)
299
        deploy_to_mocked.assert_called_once()
300
        self.assertTrue(current_handle_link_down)
301
        msg = f"{evc} deployed after link down."
302
        log_mocked.debug.assert_called_once_with(msg)
303
304
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
305
    @patch("napps.kytos.mef_eline.models.evc.log")
306
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
307
    @patch(DEPLOY_TO_PRIMARY_PATH)
308
    @patch("napps.kytos.mef_eline.models.path.DynamicPathManager.get_paths")
309
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
310
    def test_handle_link_down_case_3(
311
        self, get_paths_mocked, deploy_to_mocked, deploy_mocked, log_mocked, _
312
    ):
313
        """Test if circuit without dynamic path is return failed."""
314
        deploy_mocked.return_value = False
315
        deploy_to_mocked.return_value = False
316
        primary_path = [
317
            get_link_mocked(
318
                endpoint_a_port=9,
319
                endpoint_b_port=10,
320
                metadata={"s_vlan": 5},
321
                status=EntityStatus.DOWN,
322
            ),
323
            get_link_mocked(
324
                endpoint_a_port=11,
325
                endpoint_b_port=12,
326
                metadata={"s_vlan": 6},
327
                status=EntityStatus.UP,
328
            ),
329
        ]
330
        backup_path = [
331
            get_link_mocked(
332
                endpoint_a_port=9,
333
                endpoint_b_port=10,
334
                metadata={"s_vlan": 5},
335
                status=EntityStatus.DOWN,
336
            ),
337
            get_link_mocked(
338
                endpoint_a_port=13,
339
                endpoint_b_port=14,
340
                metadata={"s_vlan": 6},
341
                status=EntityStatus.UP,
342
            ),
343
        ]
344
        attributes = {
345
            "controller": get_controller_mock(),
346
            "name": "circuit_7",
347
            "uni_a": get_uni_mocked(is_valid=True),
348
            "uni_z": get_uni_mocked(is_valid=True),
349
            "primary_path": primary_path,
350
            "backup_path": backup_path,
351
            "enabled": True,
352
        }
353
354
        evc = EVC(**attributes)
355
        evc.current_path = evc.backup_path
356
        deploy_to_mocked.reset_mock()
357
        current_handle_link_down = evc.handle_link_down()
358
359
        self.assertEqual(get_paths_mocked.call_count, 0)
360
        self.assertEqual(deploy_mocked.call_count, 0)
361
        self.assertEqual(deploy_to_mocked.call_count, 1)
362
363
        self.assertFalse(current_handle_link_down)
364
        msg = f"Failed to re-deploy {evc} after link down."
365
        log_mocked.debug.assert_called_once_with(msg)
366
367 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
368
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
369
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
370
    @patch(DEPLOY_TO_PRIMARY_PATH)
371
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
372
    def test_handle_link_down_case_4(
373
        self,
374
        deploy_to_mocked,
375
        _send_flow_mods_mocked,
376
        deploy_mocked,
377
        log_mocked,
378
    ):
379
        """Test if circuit with dynamic path is return success."""
380
        deploy_mocked.return_value = True
381
        deploy_to_mocked.return_value = False
382
        primary_path = [
383
            get_link_mocked(
384
                endpoint_a_port=9,
385
                endpoint_b_port=10,
386
                metadata={"s_vlan": 5},
387
                status=EntityStatus.DOWN,
388
            ),
389
            get_link_mocked(
390
                endpoint_a_port=11,
391
                endpoint_b_port=12,
392
                metadata={"s_vlan": 6},
393
                status=EntityStatus.UP,
394
            ),
395
        ]
396
        backup_path = [
397
            get_link_mocked(
398
                endpoint_a_port=9,
399
                endpoint_b_port=10,
400
                metadata={"s_vlan": 5},
401
                status=EntityStatus.DOWN,
402
            ),
403
            get_link_mocked(
404
                endpoint_a_port=13,
405
                endpoint_b_port=14,
406
                metadata={"s_vlan": 6},
407
                status=EntityStatus.UP,
408
            ),
409
        ]
410
        attributes = {
411
            "controller": get_controller_mock(),
412
            "name": "circuit_8",
413
            "uni_a": get_uni_mocked(is_valid=True),
414
            "uni_z": get_uni_mocked(is_valid=True),
415
            "primary_path": primary_path,
416
            "backup_path": backup_path,
417
            "enabled": True,
418
            "dynamic_backup_path": True,
419
        }
420
421
        evc = EVC(**attributes)
422
        evc.current_path = evc.backup_path
423
424
        deploy_to_mocked.reset_mock()
425
        current_handle_link_down = evc.handle_link_down()
426
        self.assertEqual(deploy_to_mocked.call_count, 1)
427
428
        self.assertTrue(current_handle_link_down)
429
        msg = f"{evc} deployed after link down."
430
        log_mocked.debug.assert_called_with(msg)
431
432
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
433
    @patch("napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to")
434
    def test_handle_link_up_case_1(self, deploy_to_mocked, deploy_mocked):
435
        """Test if handle link up do nothing when is using primary path."""
436
        deploy_mocked.return_value = True
437
        deploy_to_mocked.return_value = True
438
        primary_path = [
439
            get_link_mocked(
440
                endpoint_a_port=9,
441
                endpoint_b_port=10,
442
                metadata={"s_vlan": 5},
443
                status=EntityStatus.UP,
444
            ),
445
            get_link_mocked(
446
                endpoint_a_port=11,
447
                endpoint_b_port=12,
448
                metadata={"s_vlan": 6},
449
                status=EntityStatus.UP,
450
            ),
451
        ]
452
        backup_path = [
453
            get_link_mocked(
454
                endpoint_a_port=9,
455
                endpoint_b_port=14,
456
                metadata={"s_vlan": 5},
457
                status=EntityStatus.UP,
458
            ),
459
            get_link_mocked(
460
                endpoint_a_port=15,
461
                endpoint_b_port=12,
462
                metadata={"s_vlan": 6},
463
                status=EntityStatus.UP,
464
            ),
465
        ]
466
        attributes = {
467
            "controller": get_controller_mock(),
468
            "name": "circuit_9",
469
            "uni_a": get_uni_mocked(is_valid=True),
470
            "uni_z": get_uni_mocked(is_valid=True),
471
            "primary_path": primary_path,
472
            "backup_path": backup_path,
473
            "enabled": True,
474
            "dynamic_backup_path": True,
475
        }
476
477
        evc = EVC(**attributes)
478
        evc.current_path = evc.primary_path
479
        deploy_to_mocked.reset_mock()
480
        current_handle_link_up = evc.handle_link_up(backup_path[0])
481
        self.assertEqual(deploy_mocked.call_count, 0)
482
        self.assertEqual(deploy_to_mocked.call_count, 0)
483
        self.assertTrue(current_handle_link_up)
484
485
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
486
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
487
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
488
    def test_handle_link_up_case_2(self, deploy_to_path_mocked, deploy_mocked):
489
        """Test if it is changing from backup_path to primary_path."""
490
        deploy_mocked.return_value = True
491
        deploy_to_path_mocked.return_value = True
492
        primary_path = [
493
            get_link_mocked(
494
                endpoint_a_port=9,
495
                endpoint_b_port=10,
496
                metadata={"s_vlan": 5},
497
                status=EntityStatus.UP,
498
            ),
499
            get_link_mocked(
500
                endpoint_a_port=11,
501
                endpoint_b_port=12,
502
                metadata={"s_vlan": 6},
503
                status=EntityStatus.UP,
504
            ),
505
        ]
506
        backup_path = [
507
            get_link_mocked(
508
                endpoint_a_port=9,
509
                endpoint_b_port=14,
510
                metadata={"s_vlan": 5},
511
                status=EntityStatus.UP,
512
            ),
513
            get_link_mocked(
514
                endpoint_a_port=15,
515
                endpoint_b_port=12,
516
                metadata={"s_vlan": 6},
517
                status=EntityStatus.UP,
518
            ),
519
        ]
520
        attributes = {
521
            "controller": get_controller_mock(),
522
            "name": "circuit_10",
523
            "uni_a": get_uni_mocked(is_valid=True),
524
            "uni_z": get_uni_mocked(is_valid=True),
525
            "primary_path": primary_path,
526
            "backup_path": backup_path,
527
            "enabled": True,
528
            "dynamic_backup_path": True,
529
        }
530
531
        evc = EVC(**attributes)
532
        evc.current_path = evc.backup_path
533
        deploy_to_path_mocked.reset_mock()
534
        current_handle_link_up = evc.handle_link_up(primary_path[0])
535
        self.assertEqual(deploy_mocked.call_count, 0)
536
        self.assertEqual(deploy_to_path_mocked.call_count, 1)
537
        deploy_to_path_mocked.assert_called_once_with(evc.primary_path)
538
        self.assertTrue(current_handle_link_up)
539
540
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
541
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
542
    @patch(GET_BEST_PATH)
543
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
544
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
545
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
546
    def test_handle_link_up_case_3(
547
        self,
548
        _install_uni_flows_mocked,
549
        _install_nni_flows_mocked,
550
        get_best_path_mocked,
551
        deploy_to_path_mocked,
552
        deploy_mocked,
553
    ):
554
        """Test if it is deployed after the backup is up."""
555
        deploy_mocked.return_value = True
556
        deploy_to_path_mocked.return_value = True
557
        primary_path = [
558
            get_link_mocked(
559
                endpoint_a_port=9,
560
                endpoint_b_port=10,
561
                metadata={"s_vlan": 5},
562
                status=EntityStatus.DOWN,
563
            ),
564
            get_link_mocked(
565
                endpoint_a_port=11,
566
                endpoint_b_port=12,
567
                metadata={"s_vlan": 6},
568
                status=EntityStatus.UP,
569
            ),
570
        ]
571
        backup_path = [
572
            get_link_mocked(
573
                endpoint_a_port=9,
574
                endpoint_b_port=14,
575
                metadata={"s_vlan": 5},
576
                status=EntityStatus.DOWN,
577
            ),
578
            get_link_mocked(
579
                endpoint_a_port=15,
580
                endpoint_b_port=12,
581
                metadata={"s_vlan": 6},
582
                status=EntityStatus.UP,
583
            ),
584
        ]
585
        attributes = {
586
            "controller": get_controller_mock(),
587
            "name": "circuit_11",
588
            "uni_a": get_uni_mocked(is_valid=True),
589
            "uni_z": get_uni_mocked(is_valid=True),
590
            "primary_path": primary_path,
591
            "backup_path": backup_path,
592
            "enabled": True,
593
            "dynamic_backup_path": True,
594
        }
595
596
        evc = EVC(**attributes)
597
598
        evc.current_path = Path([])
599
        deploy_to_path_mocked.reset_mock()
600
        current_handle_link_up = evc.handle_link_up(backup_path[0])
601
602
        self.assertEqual(get_best_path_mocked.call_count, 0)
603
        self.assertEqual(deploy_mocked.call_count, 0)
604
        self.assertEqual(deploy_to_path_mocked.call_count, 1)
605
        deploy_to_path_mocked.assert_called_once_with(evc.backup_path)
606
        self.assertTrue(current_handle_link_up)
607
608
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
609
    @patch(GET_BEST_PATH)
610
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
611
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
612
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
613
    def test_handle_link_up_case_4(self, *args):
614
        """Test if not path is found a dynamic path is used."""
615
        (
616
            _install_uni_flows_mocked,
617
            _install_nni_flows_mocked,
618
            get_best_path_mocked,
619
            deploy_to_path_mocked,
620
        ) = args
621
622
        deploy_to_path_mocked.return_value = True
623
624
        primary_path = [
625
            get_link_mocked(
626
                endpoint_a_port=9,
627
                endpoint_b_port=10,
628
                metadata={"s_vlan": 5},
629
                status=EntityStatus.UP,
630
            ),
631
            get_link_mocked(
632
                endpoint_a_port=11,
633
                endpoint_b_port=12,
634
                metadata={"s_vlan": 6},
635
                status=EntityStatus.DOWN,
636
            ),
637
        ]
638
        backup_path = [
639
            get_link_mocked(
640
                endpoint_a_port=13,
641
                endpoint_b_port=14,
642
                metadata={"s_vlan": 5},
643
                status=EntityStatus.DOWN,
644
            ),
645
            get_link_mocked(
646
                endpoint_a_port=11,
647
                endpoint_b_port=12,
648
                metadata={"s_vlan": 6},
649
                status=EntityStatus.DOWN,
650
            ),
651
        ]
652
653
        # Setup best_path mock
654
        best_path = Path()
655
        best_path.append(primary_path[0])
656
        get_best_path_mocked.return_value = best_path
657
658
        attributes = {
659
            "controller": get_controller_mock(),
660
            "name": "circuit_12",
661
            "uni_a": get_uni_mocked(is_valid=True),
662
            "uni_z": get_uni_mocked(is_valid=True),
663
            "primary_path": primary_path,
664
            "backup_path": backup_path,
665
            "enabled": True,
666
            "dynamic_backup_path": True,
667
        }
668
669
        evc = EVC(**attributes)
670
        evc.current_path = Path([])
671
672
        deploy_to_path_mocked.reset_mock()
673
        current_handle_link_up = evc.handle_link_up(backup_path[0])
674
675
        self.assertEqual(get_best_path_mocked.call_count, 0)
676
        self.assertEqual(deploy_to_path_mocked.call_count, 1)
677
        deploy_to_path_mocked.assert_called_once_with()
678
        self.assertTrue(current_handle_link_up)
679
680
    def test_handle_link_up_case_5(self):
681
        """Test handle_link_up method."""
682
        return_false_mock = MagicMock(return_value=False)
683
        self.evc.is_using_primary_path = return_false_mock
684
        self.evc.primary_path.is_affected_by_link = return_false_mock
685
        self.evc.is_using_backup_path = MagicMock(return_value=True)
686
        self.assertTrue(self.evc.handle_link_up(MagicMock()))
687
688
        # not possible to deploy this evc (it will not benefit from link up)
689
        self.evc.is_using_backup_path = return_false_mock
690
        self.evc.is_using_dynamic_path = return_false_mock
691
        self.evc.backup_path.is_affected_by_link = return_false_mock
692
        self.evc.dynamic_backup_path = True
693
        self.evc.deploy_to_path = return_false_mock
694
        self.assertTrue(self.evc.handle_link_up(MagicMock()))
695