Passed
Pull Request — master (#359)
by
unknown
03:36
created

TestLinkProtection.setup_method()   A

Complexity

Conditions 1

Size

Total Lines 41
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 33
nop 1
dl 0
loc 41
ccs 5
cts 5
cp 1
crap 1
rs 9.0879
c 0
b 0
f 0
1
"""Module to test the LinkProtection class."""
2 1
import sys
3 1
from unittest.mock import MagicMock, patch
4
5 1
from kytos.core.common import EntityStatus
6 1
from kytos.lib.helpers import get_controller_mock
7 1
from napps.kytos.mef_eline.models import EVC, Path  # NOQA pycodestyle
8 1
from napps.kytos.mef_eline.tests.helpers import (
9
    get_link_mocked,
10
    get_uni_mocked,
11
    get_mocked_requests,
12
)  # NOQA pycodestyle
13
14
15 1
# Prepend the napps parent directory to the import search path.
sys.path.insert(0, "/var/lib/kytos/napps/..")


# Dotted targets handed to ``patch`` by the tests in this module.
DEPLOY_TO_PRIMARY_PATH = (
    "napps.kytos.mef_eline.models.evc."
    "LinkProtection.deploy_to_primary_path"
)
DEPLOY_TO_BACKUP_PATH = (
    "napps.kytos.mef_eline.models.evc."
    "LinkProtection.deploy_to_backup_path"
)
GET_BEST_PATH = (
    "napps.kytos.mef_eline.models.path."
    "DynamicPathManager.get_best_path"
)
27
28
29 1
class TestLinkProtection():  # pylint: disable=too-many-public-methods
30
    """Tests to validate LinkProtection class."""
31
32 1
    def setup_method(self):
        """Create the shared ``self.evc`` fixture before each test.

        The EVC is named ``circuit_1``, is enabled, has
        ``dynamic_backup_path`` set, and carries a two-link primary path
        and a two-link backup path built from mocked links.
        """
        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.DOWN),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (13, 14, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.DOWN),
            )
        ]
        self.evc = EVC(
            controller=get_controller_mock(),
            name="circuit_1",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
73
74 1
    async def test_is_using_backup_path(self):
        """Check is_using_backup_path before and after switching paths.

        The EVC must report False initially and a truthy value once
        ``current_path`` is set to its ``backup_path``.
        """
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_1",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            backup_path=[
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                )
                for port_a, port_b, s_vlan in ((10, 9, 5), (12, 11, 6))
            ],
        )

        assert evc.is_using_backup_path() is False
        evc.current_path = evc.backup_path
        assert evc.is_using_backup_path()
100
101 1
    async def test_is_using_primary_path(self):
        """Check is_using_primary_path before and after switching paths.

        The EVC must report False initially and a truthy value once
        ``current_path`` is set to its ``primary_path``.
        """
        links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
            )
            for port_a, port_b, s_vlan in ((10, 9, 5), (12, 11, 6))
        ]

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_2",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=links,
        )

        assert evc.is_using_primary_path() is False
        evc.current_path = evc.primary_path
        assert evc.is_using_primary_path()
123
124 1
    @patch("napps.kytos.mef_eline.models.evc.log")
    async def test_deploy_to_case_1(self, log_mocked):
        """Check deploy_to when the requested path is already current.

        The call must return a truthy value and the patched logger's last
        debug call must carry the "equal to current_path" message.
        """
        links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
            )
            for port_a, port_b, s_vlan in ((10, 9, 5), (12, 11, 6))
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_3",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=links,
        )
        evc.current_path = evc.primary_path

        result = evc.deploy_to("primary_path", evc.primary_path)

        log_mocked.debug.assert_called_with(
            "primary_path is equal to current_path."
        )
        assert result
149
150
    # pylint: disable=too-many-arguments
151 1
    @patch("napps.kytos.mef_eline.models.evc.notify_link_available_tags")
    @patch("requests.post")
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
    async def test_deploy_to_case_2(
        self,
        install_uni_flows_mocked,
        install_nni_flows_mocked,
        deploy_mocked,
        _,
        requests_mock,
        notify_mock
    ):
        """Check deploy_to on a primary path whose links are all UP.

        ``Path.status`` is patched to UP. The UNI and NNI flow installers
        must be called with the primary path, the result must be truthy
        and the available-tags notifier must have been called.
        """
        deploy_mocked.return_value = True
        # Stand-in HTTP response returned by the patched requests.post.
        requests_mock.return_value = MagicMock(status_code=201)

        up_links = [get_link_mocked(status=EntityStatus.UP) for _ in range(2)]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_4",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=up_links,
            enabled=True,
        )

        result = evc.deploy_to("primary_path", evc.primary_path)

        install_uni_flows_mocked.assert_called_with(evc.primary_path)
        install_nni_flows_mocked.assert_called_with(evc.primary_path)
        assert result
        notify_mock.assert_called()
192
193 1
    @patch("requests.get", side_effect=get_mocked_requests)
    async def test_deploy_to_case_3(self, requests_mocked):
        # pylint: disable=unused-argument
        """Check deploy_to returns False for this two-link primary path.

        The links come from ``get_link_mocked()`` with default arguments
        and are given the ids "abc" and "def".
        """
        first_link, second_link = get_link_mocked(), get_link_mocked()
        first_link.id, second_link.id = "abc", "def"

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_5",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=[first_link, second_link],
            enabled=True,
        )

        result = evc.deploy_to("primary_path", evc.primary_path)
        assert result is False
214
215 1
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
    @patch(DEPLOY_TO_BACKUP_PATH)
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.path.Path.status")
    async def test_handle_link_down_case_1(
        self,
        path_status_mocked,
        deploy_mocked,
        deploy_to_mocked,
        _send_flow_mods_mocked,
        log_mocked,
    ):
        """Check handle_link_down falls back to deploy_to_backup_path.

        Uses the shared ``self.evc`` while it runs on its primary path.
        The patched ``deploy_to_backup_path`` must be called exactly once,
        ``EVCDeploy.deploy`` must not be called, and the success message
        must be the only debug log call.
        """
        deploy_mocked.return_value = True
        path_status_mocked.side_effect = [EntityStatus.DOWN, EntityStatus.UP]

        self.evc.current_path = self.evc.primary_path
        self.evc.activate()
        # Discard anything recorded on the mock before the call under test.
        deploy_to_mocked.reset_mock()

        handled = self.evc.handle_link_down()

        assert deploy_mocked.call_count == 0
        deploy_to_mocked.assert_called_once()
        assert handled
        log_mocked.debug.assert_called_once_with(
            f"{self.evc} deployed after link down."
        )
242
243 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch(DEPLOY_TO_PRIMARY_PATH)
    @patch("napps.kytos.mef_eline.models.path.Path.status")
    async def test_handle_link_down_case_2(
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
    ):
        """Check handle_link_down calls deploy_to_primary_path.

        The EVC runs on its backup path. The patched
        ``deploy_to_primary_path`` must be called exactly once,
        ``EVCDeploy.deploy`` must not be called, and the success message
        must be the only debug log call.
        """
        deploy_mocked.return_value = True
        deploy_to_mocked.return_value = True
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (7, 8, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (7, 10, 5, EntityStatus.DOWN),
                (15, 12, 6, EntityStatus.UP),
            )
        ]

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_13",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_down()

        assert deploy_mocked.call_count == 0
        deploy_to_mocked.assert_called_once()
        assert handled
        log_mocked.debug.assert_called_once_with(
            f"{evc} deployed after link down."
        )
301
302 1
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch(DEPLOY_TO_PRIMARY_PATH)
    @patch("napps.kytos.mef_eline.models.path.DynamicPathManager.get_paths")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
    async def test_handle_link_down_case_3(
        self, get_paths_mocked, deploy_to_mocked, deploy_mocked, log_mocked, _
    ):
        """Check handle_link_down fails for an EVC without a dynamic path.

        ``Path.status`` is patched to DOWN and the patched
        ``deploy_to_primary_path`` returns False. The call must return
        False, must not query ``get_paths`` or ``EVCDeploy.deploy``, and
        must log the failure message as its only debug call.
        """
        deploy_mocked.return_value = False
        deploy_to_mocked.return_value = False

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (13, 14, 6, EntityStatus.UP),
            )
        ]

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_7",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_down()

        assert get_paths_mocked.call_count == 0
        assert deploy_mocked.call_count == 0
        assert deploy_to_mocked.call_count == 1

        assert handled is False
        log_mocked.debug.assert_called_once_with(
            f"Failed to re-deploy {evc} after link down."
        )
364
365 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
    @patch(DEPLOY_TO_PRIMARY_PATH)
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
    async def test_handle_link_down_case_4(
        self,
        deploy_to_mocked,
        _send_flow_mods_mocked,
        deploy_mocked,
        log_mocked,
    ):
        """Check handle_link_down succeeds for an EVC with a dynamic path.

        The patched ``deploy_to_primary_path`` returns False while the
        patched ``deploy_to_path`` returns True. The call must return a
        truthy value, call ``deploy_to_primary_path`` once, and log the
        success message as its last debug call.
        """
        # ``deploy_mocked`` is the patched EVCDeploy.deploy_to_path.
        deploy_mocked.return_value = True
        deploy_to_mocked.return_value = False

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (13, 14, 6, EntityStatus.UP),
            )
        ]

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_8",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = evc.backup_path

        deploy_to_mocked.reset_mock()
        handled = evc.handle_link_down()
        assert deploy_to_mocked.call_count == 1

        assert handled
        log_mocked.debug.assert_called_with(
            f"{evc} deployed after link down."
        )
429
430 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to")
    async def test_handle_link_up_case_1(
        self,
        deploy_to_mocked,
        deploy_mocked
    ):
        """Check handle_link_up deploys nothing while on the primary path.

        Neither the patched ``deploy`` nor the patched ``deploy_to`` may
        be called, and the call must return a truthy value.
        """
        deploy_mocked.return_value = True
        deploy_to_mocked.return_value = True

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 14, 5, EntityStatus.UP),
                (15, 12, 6, EntityStatus.UP),
            )
        ]

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_9",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = evc.primary_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_up(backup_links[0])

        assert deploy_mocked.call_count == 0
        assert deploy_to_mocked.call_count == 0
        assert handled
486
487 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
    async def test_handle_link_up_case_2(
        self,
        deploy_to_path_mocked,
        deploy_mocked
    ):
        """Check handle_link_up moves the EVC from backup to primary path.

        The EVC starts on its backup path. The patched ``deploy_to_path``
        must be called exactly once, with the primary path, and
        ``EVCDeploy.deploy`` must not be called.
        """
        deploy_mocked.return_value = True
        deploy_to_path_mocked.return_value = True

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 14, 5, EntityStatus.UP),
                (15, 12, 6, EntityStatus.UP),
            )
        ]

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_10",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_path_mocked.reset_mock()

        handled = evc.handle_link_up(primary_links[0])

        assert deploy_mocked.call_count == 0
        assert deploy_to_path_mocked.call_count == 1
        deploy_to_path_mocked.assert_called_once_with(evc.primary_path)
        assert handled
545
546 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch(GET_BEST_PATH)
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
    async def test_handle_link_up_case_3(
        self,
        _install_uni_flows_mocked,
        _install_nni_flows_mocked,
        get_best_path_mocked,
        deploy_to_path_mocked,
        deploy_mocked,
    ):
        """Check handle_link_up deploys the backup path when its link is up.

        The EVC starts with an empty current path. The patched
        ``deploy_to_path`` must be called exactly once, with the backup
        path, without calling ``get_best_path`` or ``EVCDeploy.deploy``.
        """
        deploy_mocked.return_value = True
        deploy_to_path_mocked.return_value = True

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 14, 5, EntityStatus.DOWN),
                (15, 12, 6, EntityStatus.UP),
            )
        ]

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_11",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )

        evc.current_path = Path([])
        deploy_to_path_mocked.reset_mock()

        handled = evc.handle_link_up(backup_links[0])

        assert get_best_path_mocked.call_count == 0
        assert deploy_mocked.call_count == 0
        assert deploy_to_path_mocked.call_count == 1
        deploy_to_path_mocked.assert_called_once_with(evc.backup_path)
        assert handled
613
614 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch(GET_BEST_PATH)
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
    async def test_handle_link_up_case_4(self, *args):
        """Check handle_link_up falls back to a dynamic path deployment.

        ``Path.status`` is patched to DOWN and the EVC starts with an
        empty current path. The patched ``deploy_to_path`` must be called
        exactly once with no arguments, and ``get_best_path`` must not be
        called.
        """
        # Mocks arrive bottom-up relative to the decorator stack.
        _install_uni_flows_mocked = args[0]
        _install_nni_flows_mocked = args[1]
        get_best_path_mocked = args[2]
        (deploy_to_path_mocked,) = args[3:]

        deploy_to_path_mocked.return_value = True

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.DOWN),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
                status=status,
            )
            for port_a, port_b, s_vlan, status in (
                (13, 14, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.DOWN),
            )
        ]

        # Path returned by the patched get_best_path.
        best_path = Path()
        best_path.append(primary_links[0])
        get_best_path_mocked.return_value = best_path

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_12",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = Path([])

        deploy_to_path_mocked.reset_mock()
        handled = evc.handle_link_up(backup_links[0])

        assert get_best_path_mocked.call_count == 0
        assert deploy_to_path_mocked.call_count == 1
        deploy_to_path_mocked.assert_called_once_with()
        assert handled
685
686 1
    async def test_handle_link_up_case_5(self):
        """Check handle_link_up in two stubbed scenarios on ``self.evc``.

        First the EVC reports that it is using its backup path and the
        primary path is unaffected by the link: the call must return a
        truthy value. Then every usage check and deploy attempt is stubbed
        to return False, so the call must return a falsy value.
        """
        return_false_mock = MagicMock(return_value=False)
        self.evc.is_using_primary_path = return_false_mock
        self.evc.primary_path.is_affected_by_link = return_false_mock
        self.evc.is_using_backup_path = MagicMock(return_value=True)
        assert self.evc.handle_link_up(MagicMock())

        # not possible to deploy this evc (it will not benefit from link up)
        self.evc.is_using_backup_path = return_false_mock
        self.evc.is_using_dynamic_path = return_false_mock
        self.evc.backup_path.is_affected_by_link = return_false_mock
        self.evc.dynamic_backup_path = True
        self.evc.deploy_to_path = return_false_mock
        # The original asserted a truthy result here, duplicating the first
        # scenario and contradicting the comment above.
        assert not self.evc.handle_link_up(MagicMock())
701