TestLinkProtection.test_is_uni_interface_active()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 33
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 26
nop 1
dl 0
loc 33
ccs 22
cts 22
cp 1
crap 1
rs 9.256
c 0
b 0
f 0
1
"""Module to test the LinkProtection class."""
2 1
import sys
3 1
from unittest.mock import MagicMock, patch
4
5 1
from kytos.core.common import EntityStatus
6 1
from kytos.lib.helpers import get_controller_mock
7 1
from napps.kytos.mef_eline.models import EVC, Path  # NOQA pycodestyle
8 1
from napps.kytos.mef_eline.tests.helpers import (
9
    get_link_mocked,
10
    get_uni_mocked,
11
    id_to_interface_mock
12
)  # NOQA pycodestyle
13
14
15 1
sys.path.insert(0, "/var/lib/kytos/napps/..")
16
17
18 1
DEPLOY_TO_PRIMARY_PATH = (
19
    "napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to_primary_path"
20
)
21 1
DEPLOY_TO_BACKUP_PATH = (
22
    "napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to_backup_path"
23
)
24 1
GET_BEST_PATH = (
25
    "napps.kytos.mef_eline.models.path.DynamicPathManager.get_best_path"
26
)
27
28
29 1
class TestLinkProtection():  # pylint: disable=too-many-public-methods
30
    """Tests to validate LinkProtection class."""
31
32 1
    def setup_method(self):
33
        """Set up method"""
34 1
        primary_path = [
35
            get_link_mocked(
36
                endpoint_a_port=9,
37
                endpoint_b_port=10,
38
                metadata={"s_vlan": 5},
39
                status=EntityStatus.UP,
40
            ),
41
            get_link_mocked(
42
                endpoint_a_port=11,
43
                endpoint_b_port=12,
44
                metadata={"s_vlan": 6},
45
                status=EntityStatus.DOWN,
46
            ),
47
        ]
48 1
        backup_path = [
49
            get_link_mocked(
50
                endpoint_a_port=13,
51
                endpoint_b_port=14,
52
                metadata={"s_vlan": 5},
53
                status=EntityStatus.DOWN,
54
            ),
55
            get_link_mocked(
56
                endpoint_a_port=11,
57
                endpoint_b_port=12,
58
                metadata={"s_vlan": 6},
59
                status=EntityStatus.DOWN,
60
            ),
61
        ]
62 1
        attributes = {
63
            "controller": get_controller_mock(),
64
            "name": "circuit_1",
65
            "uni_a": get_uni_mocked(is_valid=True),
66
            "uni_z": get_uni_mocked(is_valid=True),
67
            "primary_path": primary_path,
68
            "backup_path": backup_path,
69
            "enabled": True,
70
            "dynamic_backup_path": True,
71
        }
72 1
        self.evc = EVC(**attributes)
73
74 1
    async def test_is_using_backup_path(self):
75
        """Test test is using backup path."""
76
77 1
        attributes = {
78
            "controller": get_controller_mock(),
79
            "name": "circuit_1",
80
            "uni_a": get_uni_mocked(is_valid=True),
81
            "uni_z": get_uni_mocked(is_valid=True),
82
            "backup_path": [
83
                get_link_mocked(
84
                    endpoint_a_port=10,
85
                    endpoint_b_port=9,
86
                    metadata={"s_vlan": 5},
87
                ),
88
                get_link_mocked(
89
                    endpoint_a_port=12,
90
                    endpoint_b_port=11,
91
                    metadata={"s_vlan": 6},
92
                ),
93
            ],
94
        }
95
96 1
        evc = EVC(**attributes)
97 1
        assert evc.is_using_backup_path() is False
98 1
        evc.current_path = evc.backup_path
99 1
        assert evc.is_using_backup_path()
100
101 1
    async def test_is_using_primary_path(self):
102
        """Test test is using primary path."""
103 1
        primary_path = [
104
            get_link_mocked(
105
                endpoint_a_port=10, endpoint_b_port=9, metadata={"s_vlan": 5}
106
            ),
107
            get_link_mocked(
108
                endpoint_a_port=12, endpoint_b_port=11, metadata={"s_vlan": 6}
109
            ),
110
        ]
111
112 1
        attributes = {
113
            "controller": get_controller_mock(),
114
            "name": "circuit_2",
115
            "uni_a": get_uni_mocked(is_valid=True),
116
            "uni_z": get_uni_mocked(is_valid=True),
117
            "primary_path": primary_path,
118
        }
119 1
        evc = EVC(**attributes)
120 1
        assert evc.is_using_primary_path() is False
121 1
        evc.current_path = evc.primary_path
122 1
        assert evc.is_using_primary_path()
123
124 1
    @patch("napps.kytos.mef_eline.models.evc.log")
125 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
126 1
    @patch(DEPLOY_TO_BACKUP_PATH)
127 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
128 1
    @patch("napps.kytos.mef_eline.models.path.Path.status")
129 1
    async def test_handle_link_down_case_1(
130
        self,
131
        path_status_mocked,
132
        deploy_mocked,
133
        deploy_to_mocked,
134
        _send_flow_mods_mocked,
135
        log_mocked,
136
    ):
137
        """Test if deploy_to backup path is called."""
138 1
        deploy_mocked.return_value = True
139 1
        path_status_mocked.side_effect = [EntityStatus.DOWN, EntityStatus.UP]
140
141 1
        self.evc.current_path = self.evc.primary_path
142 1
        self.evc.activate()
143 1
        deploy_to_mocked.reset_mock()
144 1
        current_handle_link_down = self.evc.handle_link_down()
145 1
        assert deploy_mocked.call_count == 0
146 1
        deploy_to_mocked.assert_called_once()
147
148 1
        assert current_handle_link_down
149 1
        msg = f"{self.evc} deployed after link down."
150 1
        log_mocked.debug.assert_called_once_with(msg)
151
152 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
153 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
154 1
    @patch(DEPLOY_TO_PRIMARY_PATH)
155 1
    @patch("napps.kytos.mef_eline.models.path.Path.status")
156 1
    async def test_handle_link_down_case_2(
157
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
158
    ):
159
        """Test if deploy_to backup path is called."""
160 1
        deploy_mocked.return_value = True
161 1
        deploy_to_mocked.return_value = True
162 1
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]
163 1
        primary_path = [
164
            get_link_mocked(
165
                endpoint_a_port=7,
166
                endpoint_b_port=8,
167
                metadata={"s_vlan": 5},
168
                status=EntityStatus.UP,
169
            ),
170
            get_link_mocked(
171
                endpoint_a_port=11,
172
                endpoint_b_port=12,
173
                metadata={"s_vlan": 6},
174
                status=EntityStatus.UP,
175
            ),
176
        ]
177 1
        backup_path = [
178
            get_link_mocked(
179
                endpoint_a_port=7,
180
                endpoint_b_port=10,
181
                metadata={"s_vlan": 5},
182
                status=EntityStatus.DOWN,
183
            ),
184
            get_link_mocked(
185
                endpoint_a_port=15,
186
                endpoint_b_port=12,
187
                metadata={"s_vlan": 6},
188
                status=EntityStatus.UP,
189
            ),
190
        ]
191 1
        attributes = {
192
            "controller": get_controller_mock(),
193
            "name": "circuit_13",
194
            "uni_a": get_uni_mocked(is_valid=True),
195
            "uni_z": get_uni_mocked(is_valid=True),
196
            "primary_path": primary_path,
197
            "backup_path": backup_path,
198
            "enabled": True,
199
        }
200
201 1
        evc = EVC(**attributes)
202 1
        evc.current_path = evc.backup_path
203 1
        deploy_to_mocked.reset_mock()
204 1
        current_handle_link_down = evc.handle_link_down()
205 1
        assert deploy_mocked.call_count == 0
206 1
        deploy_to_mocked.assert_called_once()
207 1
        assert current_handle_link_down
208 1
        msg = f"{evc} deployed after link down."
209 1
        log_mocked.debug.assert_called_once_with(msg)
210
211
    # pylint: disable=too-many-arguments
212 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.remove_current_flows")
213 1
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
214 1
    @patch("napps.kytos.mef_eline.models.evc.log")
215 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
216 1
    @patch(DEPLOY_TO_PRIMARY_PATH)
217 1
    @patch("napps.kytos.mef_eline.models.path.DynamicPathManager.get_paths")
218 1
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
219 1
    async def test_handle_link_down_case_3(
220
        self, get_paths_mocked, deploy_to_mocked, deploy_mocked,
221
        log_mocked, _, mock_remove_current
222
    ):
223
        """Test if circuit without dynamic path is return failed."""
224 1
        deploy_mocked.return_value = False
225 1
        mock_remove_current.return_value = True
226 1
        deploy_to_mocked.return_value = False
227 1
        primary_path = [
228
            get_link_mocked(
229
                endpoint_a_port=9,
230
                endpoint_b_port=10,
231
                metadata={"s_vlan": 5},
232
                status=EntityStatus.DOWN,
233
            ),
234
            get_link_mocked(
235
                endpoint_a_port=11,
236
                endpoint_b_port=12,
237
                metadata={"s_vlan": 6},
238
                status=EntityStatus.UP,
239
            ),
240
        ]
241 1
        backup_path = [
242
            get_link_mocked(
243
                endpoint_a_port=9,
244
                endpoint_b_port=10,
245
                metadata={"s_vlan": 5},
246
                status=EntityStatus.DOWN,
247
            ),
248
            get_link_mocked(
249
                endpoint_a_port=13,
250
                endpoint_b_port=14,
251
                metadata={"s_vlan": 6},
252
                status=EntityStatus.UP,
253
            ),
254
        ]
255 1
        attributes = {
256
            "controller": get_controller_mock(),
257
            "name": "circuit_7",
258
            "uni_a": get_uni_mocked(is_valid=True),
259
            "uni_z": get_uni_mocked(is_valid=True),
260
            "primary_path": primary_path,
261
            "backup_path": backup_path,
262
            "enabled": True,
263
        }
264
265 1
        evc = EVC(**attributes)
266 1
        evc.current_path = evc.backup_path
267 1
        deploy_to_mocked.reset_mock()
268 1
        current_handle_link_down = evc.handle_link_down()
269
270 1
        assert get_paths_mocked.call_count == 0
271 1
        assert deploy_mocked.call_count == 0
272 1
        assert deploy_to_mocked.call_count == 1
273
274 1
        assert current_handle_link_down is False
275 1
        msg = f"Failed to re-deploy {evc} after link down."
276 1
        log_mocked.debug.assert_called_once_with(msg)
277
278 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
279 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
280 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
281 1
    @patch(DEPLOY_TO_PRIMARY_PATH)
282 1
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
283 1
    async def test_handle_link_down_case_4(
284
        self,
285
        deploy_to_mocked,
286
        _send_flow_mods_mocked,
287
        deploy_mocked,
288
        log_mocked,
289
    ):
290
        """Test if circuit with dynamic path is return success."""
291 1
        deploy_mocked.return_value = True
292 1
        deploy_to_mocked.return_value = False
293 1
        primary_path = [
294
            get_link_mocked(
295
                endpoint_a_port=9,
296
                endpoint_b_port=10,
297
                metadata={"s_vlan": 5},
298
                status=EntityStatus.DOWN,
299
            ),
300
            get_link_mocked(
301
                endpoint_a_port=11,
302
                endpoint_b_port=12,
303
                metadata={"s_vlan": 6},
304
                status=EntityStatus.UP,
305
            ),
306
        ]
307 1
        backup_path = [
308
            get_link_mocked(
309
                endpoint_a_port=9,
310
                endpoint_b_port=10,
311
                metadata={"s_vlan": 5},
312
                status=EntityStatus.DOWN,
313
            ),
314
            get_link_mocked(
315
                endpoint_a_port=13,
316
                endpoint_b_port=14,
317
                metadata={"s_vlan": 6},
318
                status=EntityStatus.UP,
319
            ),
320
        ]
321 1
        attributes = {
322
            "controller": get_controller_mock(),
323
            "name": "circuit_8",
324
            "uni_a": get_uni_mocked(is_valid=True),
325
            "uni_z": get_uni_mocked(is_valid=True),
326
            "primary_path": primary_path,
327
            "backup_path": backup_path,
328
            "enabled": True,
329
            "dynamic_backup_path": True,
330
        }
331
332 1
        evc = EVC(**attributes)
333 1
        evc.current_path = evc.backup_path
334
335 1
        deploy_to_mocked.reset_mock()
336 1
        current_handle_link_down = evc.handle_link_down()
337 1
        assert deploy_to_mocked.call_count == 1
338
339 1
        assert current_handle_link_down
340 1
        msg = f"{evc} deployed after link down."
341 1
        log_mocked.debug.assert_called_with(msg)
342
343 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
344 1
    async def test_handle_link_up_case_1(
345
        self,
346
        deploy_to_mocked,
347
    ):
348
        """Test if handle link up do nothing when is using primary path."""
349 1
        deploy_to_mocked.return_value = True
350 1
        primary_path = [
351
            get_link_mocked(
352
                endpoint_a_port=9,
353
                endpoint_b_port=10,
354
                metadata={"s_vlan": 5},
355
                status=EntityStatus.UP,
356
            ),
357
            get_link_mocked(
358
                endpoint_a_port=11,
359
                endpoint_b_port=12,
360
                metadata={"s_vlan": 6},
361
                status=EntityStatus.UP,
362
            ),
363
        ]
364 1
        backup_path = [
365
            get_link_mocked(
366
                endpoint_a_port=9,
367
                endpoint_b_port=14,
368
                metadata={"s_vlan": 5},
369
                status=EntityStatus.UP,
370
            ),
371
            get_link_mocked(
372
                endpoint_a_port=15,
373
                endpoint_b_port=12,
374
                metadata={"s_vlan": 6},
375
                status=EntityStatus.UP,
376
            ),
377
        ]
378 1
        attributes = {
379
            "controller": get_controller_mock(),
380
            "name": "circuit_9",
381
            "uni_a": get_uni_mocked(is_valid=True),
382
            "uni_z": get_uni_mocked(is_valid=True),
383
            "primary_path": primary_path,
384
            "backup_path": backup_path,
385
            "enabled": True,
386
            "dynamic_backup_path": True,
387
        }
388
389 1
        evc = EVC(**attributes)
390 1
        evc.current_path = evc.primary_path
391 1
        deploy_to_mocked.reset_mock()
392 1
        current_handle_link_up = evc.handle_link_up(backup_path[0])
393 1
        assert deploy_to_mocked.call_count == 0
394 1
        assert current_handle_link_up
395
396 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
397 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
398 1
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
399 1
    async def test_handle_link_up_case_2(
400
        self,
401
        deploy_to_path_mocked,
402
        deploy_mocked
403
    ):
404
        """Test if it is changing from backup_path to primary_path."""
405 1
        deploy_mocked.return_value = True
406 1
        deploy_to_path_mocked.return_value = True
407 1
        primary_path = [
408
            get_link_mocked(
409
                endpoint_a_port=9,
410
                endpoint_b_port=10,
411
                metadata={"s_vlan": 5},
412
                status=EntityStatus.UP,
413
            ),
414
            get_link_mocked(
415
                endpoint_a_port=11,
416
                endpoint_b_port=12,
417
                metadata={"s_vlan": 6},
418
                status=EntityStatus.UP,
419
            ),
420
        ]
421 1
        backup_path = [
422
            get_link_mocked(
423
                endpoint_a_port=9,
424
                endpoint_b_port=14,
425
                metadata={"s_vlan": 5},
426
                status=EntityStatus.UP,
427
            ),
428
            get_link_mocked(
429
                endpoint_a_port=15,
430
                endpoint_b_port=12,
431
                metadata={"s_vlan": 6},
432
                status=EntityStatus.UP,
433
            ),
434
        ]
435 1
        attributes = {
436
            "controller": get_controller_mock(),
437
            "name": "circuit_10",
438
            "uni_a": get_uni_mocked(is_valid=True),
439
            "uni_z": get_uni_mocked(is_valid=True),
440
            "primary_path": primary_path,
441
            "backup_path": backup_path,
442
            "enabled": True,
443
            "dynamic_backup_path": True,
444
        }
445
446 1
        evc = EVC(**attributes)
447 1
        evc.current_path = evc.backup_path
448 1
        deploy_to_path_mocked.reset_mock()
449 1
        current_handle_link_up = evc.handle_link_up(primary_path[0])
450 1
        assert deploy_mocked.call_count == 0
451 1
        assert deploy_to_path_mocked.call_count == 1
452 1
        deploy_to_path_mocked.assert_called_once_with(evc.primary_path, None)
453 1
        assert current_handle_link_up
454
455 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
456 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
457 1
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_flows")
458 1
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
459 1
    async def test_handle_link_up_case_3(
460
        self,
461
        _install_flows_mocked,
462
        deploy_to_path_mocked,
463
        deploy_mocked,
464
    ):
465
        """Test if it is deployed after the backup is up."""
466 1
        deploy_mocked.return_value = True
467 1
        deploy_to_path_mocked.return_value = True
468 1
        primary_path = [
469
            get_link_mocked(
470
                endpoint_a_port=9,
471
                endpoint_b_port=10,
472
                metadata={"s_vlan": 5},
473
                status=EntityStatus.DOWN,
474
            ),
475
            get_link_mocked(
476
                endpoint_a_port=11,
477
                endpoint_b_port=12,
478
                metadata={"s_vlan": 6},
479
                status=EntityStatus.UP,
480
            ),
481
        ]
482 1
        backup_path = [
483
            get_link_mocked(
484
                endpoint_a_port=9,
485
                endpoint_b_port=14,
486
                metadata={"s_vlan": 5},
487
                status=EntityStatus.DOWN,
488
            ),
489
            get_link_mocked(
490
                endpoint_a_port=15,
491
                endpoint_b_port=12,
492
                metadata={"s_vlan": 6},
493
                status=EntityStatus.UP,
494
            ),
495
        ]
496 1
        attributes = {
497
            "controller": get_controller_mock(),
498
            "name": "circuit_11",
499
            "uni_a": get_uni_mocked(is_valid=True),
500
            "uni_z": get_uni_mocked(is_valid=True),
501
            "primary_path": primary_path,
502
            "backup_path": backup_path,
503
            "enabled": True,
504
            "dynamic_backup_path": True,
505
        }
506
507 1
        evc = EVC(**attributes)
508
509 1
        evc.current_path = Path([])
510 1
        deploy_to_path_mocked.reset_mock()
511 1
        current_handle_link_up = evc.handle_link_up(backup_path[0])
512
513 1
        assert deploy_mocked.call_count == 0
514 1
        assert deploy_to_path_mocked.call_count == 1
515 1
        deploy_to_path_mocked.assert_called_once_with(evc.backup_path, None)
516 1
        assert current_handle_link_up
517
518 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
519 1
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_flows")
520 1
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
521 1
    async def test_handle_link_up_case_4(self, *args):
522
        """Test if not path is found a dynamic path is used."""
523 1
        (
524
            _install_flows_mocked,
525
            deploy_to_path_mocked,
526
        ) = args
527
528 1
        deploy_to_path_mocked.return_value = True
529
530 1
        primary_path = [
531
            get_link_mocked(
532
                endpoint_a_port=9,
533
                endpoint_b_port=10,
534
                metadata={"s_vlan": 5},
535
                status=EntityStatus.UP,
536
            ),
537
            get_link_mocked(
538
                endpoint_a_port=11,
539
                endpoint_b_port=12,
540
                metadata={"s_vlan": 6},
541
                status=EntityStatus.DOWN,
542
            ),
543
        ]
544 1
        backup_path = [
545
            get_link_mocked(
546
                endpoint_a_port=13,
547
                endpoint_b_port=14,
548
                metadata={"s_vlan": 5},
549
                status=EntityStatus.DOWN,
550
            ),
551
            get_link_mocked(
552
                endpoint_a_port=11,
553
                endpoint_b_port=12,
554
                metadata={"s_vlan": 6},
555
                status=EntityStatus.DOWN,
556
            ),
557
        ]
558
559 1
        attributes = {
560
            "controller": get_controller_mock(),
561
            "name": "circuit_12",
562
            "uni_a": get_uni_mocked(is_valid=True),
563
            "uni_z": get_uni_mocked(is_valid=True),
564
            "primary_path": primary_path,
565
            "backup_path": backup_path,
566
            "enabled": True,
567
            "dynamic_backup_path": True,
568
        }
569
570 1
        evc = EVC(**attributes)
571 1
        evc.current_path = Path([])
572
573 1
        deploy_to_path_mocked.reset_mock()
574 1
        current_handle_link_up = evc.handle_link_up(backup_path[0])
575
576 1
        assert deploy_to_path_mocked.call_count == 1
577 1
        deploy_to_path_mocked.assert_called_once_with(old_path_dict=None)
578 1
        assert current_handle_link_up
579
580 1
    async def test_handle_link_up_case_5(self):
581
        """Test handle_link_up method."""
582 1
        return_false_mock = MagicMock(return_value=False)
583 1
        self.evc.is_using_primary_path = return_false_mock
584 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
585 1
        self.evc.is_using_backup_path = MagicMock(return_value=True)
586 1
        assert self.evc.handle_link_up(MagicMock())
587
588
        # not possible to deploy this evc (it will not benefit from link up)
589 1
        self.evc.is_using_backup_path = return_false_mock
590 1
        self.evc.is_using_dynamic_path = return_false_mock
591 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
592 1
        self.evc.dynamic_backup_path = True
593 1
        self.evc.deploy_to_path = return_false_mock
594 1
        assert not self.evc.handle_link_up(MagicMock())
595
596
    # pylint: disable=too-many-statements
597 1
    async def test_handle_link_up_case_6(self):
598
        """Test handle_link_up method."""
599
        # not possible to deploy this evc (it will not benefit from link up)
600 1
        return_false_mock = MagicMock(return_value=False)
601 1
        return_true_mock = MagicMock(return_value=True)
602 1
        self.evc.is_using_primary_path = return_false_mock
603 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
604 1
        self.evc.is_using_backup_path = return_false_mock
605 1
        self.evc.is_using_dynamic_path = return_false_mock
606 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
607 1
        self.evc.dynamic_backup_path = True
608
609 1
        self.evc.deploy_to_primary_path = MagicMock(return_value=False)
610 1
        self.evc.deploy_to_backup_path = MagicMock(return_value=False)
611 1
        self.evc.deploy_to_path = MagicMock(return_value=False)
612
613 1
        assert not self.evc.handle_link_up(MagicMock())
614 1
        assert self.evc.deploy_to_path.call_count == 1
615
616 1
        self.evc.is_using_primary_path = return_true_mock
617 1
        assert self.evc.handle_link_up(MagicMock())
618 1
        assert self.evc.deploy_to_path.call_count == 1
619
620 1
        self.evc.is_using_primary_path = return_false_mock
621 1
        self.evc.is_intra_switch = return_true_mock
622 1
        assert self.evc.handle_link_up(MagicMock())
623 1
        assert self.evc.deploy_to_path.call_count == 1
624
625 1
        self.evc.is_using_primary_path = return_false_mock
626 1
        self.evc.is_intra_switch = return_false_mock
627 1
        self.evc.primary_path.is_affected_by_link = return_true_mock
628 1
        assert not self.evc.handle_link_up(MagicMock())
629 1
        assert self.evc.deploy_to_primary_path.call_count == 1
630 1
        assert self.evc.deploy_to_path.call_count == 2
631
632 1
        self.evc.is_using_primary_path = return_false_mock
633 1
        self.evc.is_intra_switch = return_false_mock
634 1
        self.evc.primary_path.is_affected_by_link = return_true_mock
635 1
        self.evc.deploy_to_primary_path.return_value = True
636 1
        assert self.evc.handle_link_up(MagicMock())
637 1
        assert self.evc.deploy_to_primary_path.call_count == 2
638 1
        assert self.evc.deploy_to_path.call_count == 2
639
640 1
        self.evc.is_using_primary_path = return_false_mock
641 1
        self.evc.is_intra_switch = return_false_mock
642 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
643 1
        self.evc.deploy_to_primary_path.return_value = False
644 1
        self.evc.is_using_backup_path = return_true_mock
645 1
        assert self.evc.handle_link_up(MagicMock())
646 1
        assert self.evc.deploy_to_primary_path.call_count == 2
647 1
        assert self.evc.deploy_to_path.call_count == 2
648
649 1
        self.evc.is_using_primary_path = return_false_mock
650 1
        self.evc.is_intra_switch = return_false_mock
651 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
652 1
        self.evc.deploy_to_primary_path.return_value = False
653 1
        self.evc.is_using_backup_path = return_false_mock
654 1
        self.evc.is_using_dynamic_path = return_true_mock
655 1
        assert self.evc.handle_link_up(MagicMock())
656 1
        assert self.evc.deploy_to_primary_path.call_count == 2
657 1
        assert self.evc.deploy_to_path.call_count == 2
658
659 1
        self.evc.is_using_primary_path = return_false_mock
660 1
        self.evc.is_intra_switch = return_false_mock
661 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
662 1
        self.evc.deploy_to_primary_path.return_value = False
663 1
        self.evc.is_using_backup_path = return_false_mock
664 1
        self.evc.is_using_dynamic_path = return_false_mock
665 1
        self.evc.backup_path.is_affected_by_link = return_true_mock
666 1
        assert not self.evc.handle_link_up(MagicMock())
667 1
        assert self.evc.deploy_to_primary_path.call_count == 2
668 1
        assert self.evc.deploy_to_backup_path.call_count == 1
669 1
        assert self.evc.deploy_to_path.call_count == 3
670
671 1
        self.evc.is_using_primary_path = return_false_mock
672 1
        self.evc.is_intra_switch = return_false_mock
673 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
674 1
        self.evc.deploy_to_primary_path.return_value = False
675 1
        self.evc.is_using_backup_path = return_false_mock
676 1
        self.evc.is_using_dynamic_path = return_false_mock
677 1
        self.evc.backup_path.is_affected_by_link = return_true_mock
678 1
        self.evc.deploy_to_backup_path.return_value = True
679 1
        assert self.evc.handle_link_up(MagicMock())
680 1
        assert self.evc.deploy_to_primary_path.call_count == 2
681 1
        assert self.evc.deploy_to_backup_path.call_count == 2
682 1
        assert self.evc.deploy_to_path.call_count == 3
683
684 1
        self.evc.is_using_primary_path = return_false_mock
685 1
        self.evc.is_intra_switch = return_false_mock
686 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
687 1
        self.evc.deploy_to_primary_path.return_value = False
688 1
        self.evc.is_using_backup_path = return_false_mock
689 1
        self.evc.is_using_dynamic_path = return_false_mock
690 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
691 1
        self.evc.deploy_to_backup_path.return_value = False
692 1
        self.evc.dynamic_backup_path = True
693 1
        assert not self.evc.handle_link_up(MagicMock())
694 1
        assert self.evc.deploy_to_primary_path.call_count == 2
695 1
        assert self.evc.deploy_to_backup_path.call_count == 2
696 1
        assert self.evc.deploy_to_path.call_count == 4
697
698 1
        self.evc.is_using_primary_path = return_false_mock
699 1
        self.evc.is_intra_switch = return_false_mock
700 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
701 1
        self.evc.deploy_to_primary_path.return_value = False
702 1
        self.evc.is_using_backup_path = return_false_mock
703 1
        self.evc.is_using_dynamic_path = return_false_mock
704 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
705 1
        self.evc.deploy_to_backup_path.return_value = False
706 1
        self.evc.dynamic_backup_path = True
707 1
        self.evc.deploy_to_path.return_value = True
708 1
        assert self.evc.handle_link_up(MagicMock())
709 1
        assert self.evc.deploy_to_primary_path.call_count == 2
710 1
        assert self.evc.deploy_to_backup_path.call_count == 2
711 1
        assert self.evc.deploy_to_path.call_count == 5
712
713 1
        self.evc.is_using_primary_path = return_false_mock
714 1
        self.evc.is_intra_switch = return_false_mock
715 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
716 1
        self.evc.deploy_to_primary_path.return_value = False
717 1
        self.evc.is_using_backup_path = return_false_mock
718 1
        self.evc.is_using_dynamic_path = return_false_mock
719 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
720 1
        self.evc.deploy_to_backup_path.return_value = False
721 1
        self.evc.dynamic_backup_path = False
722 1
        self.evc.deploy_to_path.return_value = False
723 1
        assert not self.evc.handle_link_up(MagicMock())
724 1
        assert self.evc.deploy_to_primary_path.call_count == 2
725 1
        assert self.evc.deploy_to_backup_path.call_count == 2
726 1
        assert self.evc.deploy_to_path.call_count == 5
727
728 1
    async def test_handle_link_up_case_7(self):
729
        """Test handle_link_up method."""
730 1
        return_false_mock = MagicMock(return_value=False)
731 1
        self.evc.is_using_primary_path = return_false_mock
732 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
733 1
        self.evc.is_using_dynamic_path = return_false_mock
734 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
735 1
        self.evc.dynamic_backup_path = True
736 1
        self.evc.activate()
737 1
        assert self.evc.is_active()
738 1
        self.evc.deploy_to_path = MagicMock(return_value=True)
739 1
        assert not self.evc.handle_link_up(MagicMock())
740 1
        assert self.evc.deploy_to_path.call_count == 0
741
742 1
    @patch(DEPLOY_TO_BACKUP_PATH)
743 1
    @patch(DEPLOY_TO_PRIMARY_PATH)
744 1
    async def test_handle_link_up_case_8(
745
        self, deploy_primary_mock, deploy_backup_mock
746
    ):
747
        """Test when UNI is UP and dinamic primary_path from
748
        EVC is UP as well."""
749 1
        primary_path = [
750
            get_link_mocked(
751
                endpoint_a_port=9,
752
                endpoint_b_port=10,
753
                metadata={"s_vlan": 5},
754
                status=EntityStatus.UP,
755
            ),
756
            get_link_mocked(
757
                endpoint_a_port=11,
758
                endpoint_b_port=12,
759
                metadata={"s_vlan": 6},
760
                status=EntityStatus.UP,
761
            ),
762
        ]
763 1
        backup_path = [
764
            get_link_mocked(
765
                endpoint_a_port=13,
766
                endpoint_b_port=14,
767
                metadata={"s_vlan": 5},
768
                status=EntityStatus.UP,
769
            ),
770
            get_link_mocked(
771
                endpoint_a_port=11,
772
                endpoint_b_port=12,
773
                metadata={"s_vlan": 6},
774
                status=EntityStatus.DOWN,
775
            ),
776
        ]
777
778 1
        attributes = {
779
            "controller": get_controller_mock(),
780
            "name": "circuit",
781
            "uni_a": get_uni_mocked(is_valid=True),
782
            "uni_z": get_uni_mocked(is_valid=True),
783
            "primary_path": primary_path,
784
            "backup_path": backup_path,
785
            "enabled": True,
786
            "dynamic_backup_path": True,
787
        }
788
789 1
        evc = EVC(**attributes)
790 1
        evc.handle_link_up(interface=evc.uni_a.interface)
791 1
        assert deploy_primary_mock.call_count == 1
792 1
        assert deploy_backup_mock.call_count == 0
793
794 1
        evc.primary_path[0].status = EntityStatus.DOWN
795 1
        evc.backup_path[1].status = EntityStatus.UP
796 1
        evc.handle_link_up(interface=evc.uni_a.interface)
797 1
        assert deploy_primary_mock.call_count == 1
798 1
        assert deploy_backup_mock.call_count == 1
799
800 1
    async def test_are_unis_active(self):
801
        """Test are_unis_active"""
802 1
        self.evc.uni_a.interface._enabled = True
803 1
        self.evc.uni_z.interface._enabled = True
804 1
        assert self.evc.are_unis_active() is True
805
806 1
        self.evc.uni_a.interface._active = False
807 1
        self.evc.uni_z.interface._active = False
808 1
        assert self.evc.are_unis_active() is False
809
810 1
        self.evc.uni_a.interface._enabled = False
811 1
        self.evc.uni_z.interface._enabled = False
812 1
        assert self.evc.are_unis_active() is False
813
814 1
    async def test_is_uni_interface_active(self):
815
        """Test is_uni_interface_active"""
816 1
        interface_a = id_to_interface_mock('00:01:1')
817 1
        interface_a.status_reason = set()
818 1
        interface_z = id_to_interface_mock('00:03:1')
819 1
        interface_z.status_reason = set()
820
821 1
        interface_a.status = EntityStatus.UP
822 1
        interface_z.status = EntityStatus.UP
823 1
        actual = self.evc.is_uni_interface_active(interface_a, interface_z)
824 1
        interfaces = {
825
            '00:01:1': {"status": "UP", "status_reason": set()},
826
            '00:03:1': {"status": "UP", "status_reason": set()},
827
        }
828 1
        expected = (True, interfaces)
829 1
        assert actual == expected
830
831 1
        interface_a.status = EntityStatus.DOWN
832 1
        actual = self.evc.is_uni_interface_active(interface_a, interface_z)
833 1
        interfaces = {
834
            '00:01:1': {'status': 'DOWN', 'status_reason': set()}
835
        }
836 1
        expected = (False, interfaces)
837 1
        assert actual == expected
838
839 1
        interface_a.status = EntityStatus.UP
840 1
        interface_z.status = EntityStatus.DOWN
841 1
        actual = self.evc.is_uni_interface_active(interface_a, interface_z)
842 1
        interfaces = {
843
            '00:03:1': {'status': 'DOWN', 'status_reason': set()}
844
        }
845 1
        expected = (False, interfaces)
846 1
        assert actual == expected
847
848 1
    async def test_handle_interface_link(self, monkeypatch):
849
        """
850
        Test Interface Link Up
851
        """
852 1
        return_false_mock = MagicMock(return_value=False)
853 1
        return_true_mock = MagicMock(return_value=True)
854 1
        interface_a = self.evc.uni_a.interface
855 1
        interface_a.enable()
856 1
        interface_b = self.evc.uni_z.interface
857 1
        interface_b.enable()
858 1
        emit_mock = MagicMock()
859 1
        monkeypatch.setattr("napps.kytos.mef_eline.models.evc.emit_event",
860
                            emit_mock)
861
862 1
        self.evc.try_to_activate = MagicMock()
863 1
        self.evc.deactivate = MagicMock()
864 1
        self.evc.sync = MagicMock()
865
866
        # Test do nothing
867 1
        self.evc.is_active = return_true_mock
868
869 1
        self.evc.handle_interface_link_up(interface_a)
870
871 1
        self.evc.try_to_activate.assert_not_called()
872 1
        self.evc.sync.assert_not_called()
873
874
        # Test deactivating
875 1
        interface_a.deactivate()
876
877 1
        assert emit_mock.call_count == 0
878 1
        self.evc.handle_interface_link_down(interface_a)
879 1
        assert emit_mock.call_count == 1
880
881 1
        self.evc.deactivate.assert_called_once()
882 1
        self.evc.sync.assert_called_once()
883
884
        # Test do nothing
885 1
        self.evc.is_active = return_false_mock
886
887 1
        self.evc.handle_interface_link_down(interface_a)
888
889 1
        self.evc.deactivate.assert_called_once()
890 1
        self.evc.sync.assert_called_once()
891
892
        # Test activating
893 1
        interface_a.activate()
894
895 1
        assert emit_mock.call_count == 1
896 1
        self.evc.try_to_handle_uni_as_link_up = MagicMock()
897 1
        self.evc.try_to_handle_uni_as_link_up.return_value = False
898 1
        self.evc.handle_interface_link_up(interface_a)
899
900 1
        self.evc.try_to_activate.assert_called_once()
901 1
        assert self.evc.sync.call_count == 2
902
        assert emit_mock.call_count == 2
903