Passed
Pull Request — master (#383)
by
unknown
03:34
created

TestLinkProtection.test_handle_interface_link()   A

Complexity

Conditions 1

Size

Total Lines 46
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 26
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 26
nop 1
dl 0
loc 46
ccs 26
cts 26
cp 1
crap 1
rs 9.256
c 0
b 0
f 0
1
"""Module to test the LinkProtection class."""
2 1
import sys
3 1
from unittest.mock import MagicMock, patch
4
5 1
from kytos.core.common import EntityStatus
6 1
from kytos.lib.helpers import get_controller_mock
7 1
from napps.kytos.mef_eline.models import EVC, Path  # NOQA pycodestyle
8 1
from napps.kytos.mef_eline.tests.helpers import (
9
    get_link_mocked,
10
    get_uni_mocked,
11
    get_mocked_requests,
12
    id_to_interface_mock
13
)  # NOQA pycodestyle
14
15
16 1
sys.path.insert(0, "/var/lib/kytos/napps/..")
17
18
19 1
# Dotted targets handed to ``unittest.mock.patch`` by the tests below.
_EVC_MODULE = "napps.kytos.mef_eline.models.evc"
_PATH_MODULE = "napps.kytos.mef_eline.models.path"

DEPLOY_TO_PRIMARY_PATH = _EVC_MODULE + ".LinkProtection.deploy_to_primary_path"
DEPLOY_TO_BACKUP_PATH = _EVC_MODULE + ".LinkProtection.deploy_to_backup_path"
GET_BEST_PATH = _PATH_MODULE + ".DynamicPathManager.get_best_path"
28
29
30 1
class TestLinkProtection():  # pylint: disable=too-many-public-methods
31
    """Tests to validate LinkProtection class."""
32
33 1
    def setup_method(self):
        """Create the ``self.evc`` fixture used by several tests.

        The EVC is enabled, has ``dynamic_backup_path`` set and carries a
        static primary and a static backup path built from mocked links.
        """

        def _links(specs):
            # One mocked link per (port_a, port_b, s_vlan, status) tuple.
            return [
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                    status=status,
                )
                for port_a, port_b, s_vlan, status in specs
            ]

        primary_path = _links(
            (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.DOWN),
            )
        )
        backup_path = _links(
            (
                (13, 14, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.DOWN),
            )
        )
        self.evc = EVC(
            controller=get_controller_mock(),
            name="circuit_1",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_path,
            backup_path=backup_path,
            enabled=True,
            dynamic_backup_path=True,
        )
74
75 1
    async def test_is_using_backup_path(self):
        """Test is_using_backup_path before and after selecting the backup.

        The check must be False on a fresh EVC and truthy once
        ``current_path`` is set to ``backup_path``.
        """
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_1",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            backup_path=[
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                )
                for port_a, port_b, s_vlan in ((10, 9, 5), (12, 11, 6))
            ],
        )

        assert evc.is_using_backup_path() is False

        evc.current_path = evc.backup_path
        assert evc.is_using_backup_path()
101
102 1
    async def test_is_using_primary_path(self):
        """Test is_using_primary_path before and after selecting the primary.

        The check must be False on a fresh EVC and truthy once
        ``current_path`` is set to ``primary_path``.
        """
        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
            )
            for port_a, port_b, s_vlan in ((10, 9, 5), (12, 11, 6))
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_2",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
        )

        assert evc.is_using_primary_path() is False

        evc.current_path = evc.primary_path
        assert evc.is_using_primary_path()
124
125 1
    @patch("napps.kytos.mef_eline.models.evc.log")
    async def test_deploy_to_case_1(self, log_mocked):
        """Test deploy_to when the requested path is already current_path.

        The call must return a truthy value and log the "equal to
        current_path" debug message.
        """
        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": s_vlan},
            )
            for port_a, port_b, s_vlan in ((10, 9, 5), (12, 11, 6))
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_3",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
        )
        evc.current_path = evc.primary_path

        deployed = evc.deploy_to("primary_path", evc.primary_path)

        log_mocked.debug.assert_called_with(
            "primary_path is equal to current_path."
        )
        assert deployed
150
151
    # pylint: disable=too-many-arguments
    @patch("requests.post")
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
    async def test_deploy_to_case_2(
        self,
        install_uni_flows_mocked,
        install_nni_flows_mocked,
        deploy_mocked,
        _,
        requests_mock,
    ):
        """Test deploy_to with every link of the primary path up.

        Both flow installers must receive the primary path and the call
        must return a truthy value.
        """
        deploy_mocked.return_value = True
        # Every ``requests.post`` call answers with a 201 response.
        requests_mock.return_value = MagicMock(status_code=201)

        all_up_links = [
            get_link_mocked(status=EntityStatus.UP) for _unused in range(2)
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_4",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=all_up_links,
            enabled=True,
        )

        deployed = evc.deploy_to("primary_path", evc.primary_path)

        install_uni_flows_mocked.assert_called_with(evc.primary_path)
        install_nni_flows_mocked.assert_called_with(evc.primary_path)
        assert deployed
190
191 1
    @patch("requests.get", side_effect=get_mocked_requests)
    async def test_deploy_to_case_3(self, requests_mocked):
        # pylint: disable=unused-argument
        """Test deploy_to with one link down; the call must return False."""
        links = [get_link_mocked(), get_link_mocked()]
        for link, link_id in zip(links, ("abc", "def")):
            link.id = link_id

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_5",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=links,
            enabled=True,
        )

        result = evc.deploy_to("primary_path", evc.primary_path)
        assert result is False
212
213 1
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
    @patch(DEPLOY_TO_BACKUP_PATH)
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.path.Path.status")
    async def test_handle_link_down_case_1(
        self,
        path_status_mocked,
        deploy_mocked,
        deploy_to_mocked,
        _send_flow_mods_mocked,
        log_mocked,
    ):
        """Test if deploy_to backup path is called.

        The shared EVC starts on its primary path; handle_link_down must
        call deploy_to_backup_path exactly once, never call deploy, and
        log the success message.
        """
        path_status_mocked.side_effect = [EntityStatus.DOWN, EntityStatus.UP]
        deploy_mocked.return_value = True

        self.evc.current_path = self.evc.primary_path
        self.evc.activate()
        # Forget any calls recorded before the action under test.
        deploy_to_mocked.reset_mock()

        handled = self.evc.handle_link_down()

        deploy_mocked.assert_not_called()
        deploy_to_mocked.assert_called_once()
        assert handled
        log_mocked.debug.assert_called_once_with(
            f"{self.evc} deployed after link down."
        )
240
241 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch(DEPLOY_TO_PRIMARY_PATH)
    @patch("napps.kytos.mef_eline.models.path.Path.status")
    async def test_handle_link_down_case_2(
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
    ):
        """Test if deploy_to primary path is called.

        The EVC starts on its backup path; handle_link_down must call
        deploy_to_primary_path exactly once, never call deploy, and log
        the success message.
        """

        def _links(specs):
            # One mocked link per (port_a, port_b, s_vlan, status) tuple.
            return [
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                    status=status,
                )
                for port_a, port_b, s_vlan, status in specs
            ]

        deploy_mocked.return_value = True
        deploy_to_mocked.return_value = True
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]

        primary_path = _links(
            (
                (7, 8, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.UP),
            )
        )
        backup_path = _links(
            (
                (7, 10, 5, EntityStatus.DOWN),
                (15, 12, 6, EntityStatus.UP),
            )
        )
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_13",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_path,
            backup_path=backup_path,
            enabled=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_down()

        deploy_mocked.assert_not_called()
        deploy_to_mocked.assert_called_once()
        assert handled
        log_mocked.debug.assert_called_once_with(
            f"{evc} deployed after link down."
        )
299
300 1
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch(DEPLOY_TO_PRIMARY_PATH)
    @patch("napps.kytos.mef_eline.models.path.DynamicPathManager.get_paths")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
    async def test_handle_link_down_case_3(
        self, get_paths_mocked, deploy_to_mocked, deploy_mocked, log_mocked, _
    ):
        """Test that a circuit without a dynamic path reports failure.

        deploy_to_primary_path is tried once and returns False, no
        dynamic paths are requested, and handle_link_down returns False
        after logging the failure message.
        """

        def _links(specs):
            # One mocked link per (port_a, port_b, s_vlan, status) tuple.
            return [
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                    status=status,
                )
                for port_a, port_b, s_vlan, status in specs
            ]

        deploy_mocked.return_value = False
        deploy_to_mocked.return_value = False

        primary_path = _links(
            (
                (9, 10, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.UP),
            )
        )
        backup_path = _links(
            (
                (9, 10, 5, EntityStatus.DOWN),
                (13, 14, 6, EntityStatus.UP),
            )
        )
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_7",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_path,
            backup_path=backup_path,
            enabled=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_down()

        get_paths_mocked.assert_not_called()
        deploy_mocked.assert_not_called()
        deploy_to_mocked.assert_called_once()
        assert handled is False
        log_mocked.debug.assert_called_once_with(
            f"Failed to re-deploy {evc} after link down."
        )
362
363 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
    @patch(DEPLOY_TO_PRIMARY_PATH)
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
    async def test_handle_link_down_case_4(
        self,
        deploy_to_mocked,
        _send_flow_mods_mocked,
        deploy_mocked,
        log_mocked,
    ):
        """Test that a circuit with a dynamic path reports success.

        deploy_to_primary_path is tried once and returns False while
        deploy_to_path returns True; handle_link_down must return a
        truthy value and log the success message.
        """

        def _links(specs):
            # One mocked link per (port_a, port_b, s_vlan, status) tuple.
            return [
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                    status=status,
                )
                for port_a, port_b, s_vlan, status in specs
            ]

        deploy_mocked.return_value = True
        deploy_to_mocked.return_value = False

        primary_path = _links(
            (
                (9, 10, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.UP),
            )
        )
        backup_path = _links(
            (
                (9, 10, 5, EntityStatus.DOWN),
                (13, 14, 6, EntityStatus.UP),
            )
        )
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_8",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_path,
            backup_path=backup_path,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_down()

        assert deploy_to_mocked.call_count == 1
        assert handled
        log_mocked.debug.assert_called_with(
            f"{evc} deployed after link down."
        )
427
428 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to")
    async def test_handle_link_up_case_1(
        self,
        deploy_to_mocked,
        deploy_mocked
    ):
        """Test that handle_link_up does nothing on the primary path.

        With the primary path in use, neither deploy nor deploy_to may
        be called and the handler must return a truthy value.
        """

        def _links(specs):
            # One mocked, UP link per (port_a, port_b, s_vlan) tuple.
            return [
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                    status=EntityStatus.UP,
                )
                for port_a, port_b, s_vlan in specs
            ]

        deploy_mocked.return_value = True
        deploy_to_mocked.return_value = True

        primary_path = _links(((9, 10, 5), (11, 12, 6)))
        backup_path = _links(((9, 14, 5), (15, 12, 6)))
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_9",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_path,
            backup_path=backup_path,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = evc.primary_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_up(backup_path[0])

        deploy_mocked.assert_not_called()
        deploy_to_mocked.assert_not_called()
        assert handled
484
485 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
    async def test_handle_link_up_case_2(
        self,
        deploy_to_path_mocked,
        deploy_mocked
    ):
        """Test the switch from backup_path back to primary_path.

        With the backup path in use, a link-up on a primary link must
        call deploy_to_path once with the primary path.
        """

        def _links(specs):
            # One mocked, UP link per (port_a, port_b, s_vlan) tuple.
            return [
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                    status=EntityStatus.UP,
                )
                for port_a, port_b, s_vlan in specs
            ]

        deploy_mocked.return_value = True
        deploy_to_path_mocked.return_value = True

        primary_path = _links(((9, 10, 5), (11, 12, 6)))
        backup_path = _links(((9, 14, 5), (15, 12, 6)))
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_10",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_path,
            backup_path=backup_path,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_path_mocked.reset_mock()

        handled = evc.handle_link_up(primary_path[0])

        deploy_mocked.assert_not_called()
        assert deploy_to_path_mocked.call_count == 1
        deploy_to_path_mocked.assert_called_once_with(evc.primary_path)
        assert handled
543
544 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch(GET_BEST_PATH)
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
    async def test_handle_link_up_case_3(
        self,
        _install_uni_flows_mocked,
        _install_nni_flows_mocked,
        get_best_path_mocked,
        deploy_to_path_mocked,
        deploy_mocked,
    ):
        """Test that the EVC is deployed once the backup path is up.

        Starting from an empty current path, a link-up on a backup link
        must call deploy_to_path once with the backup path and must not
        ask for a best path.
        """

        def _links(specs):
            # One mocked link per (port_a, port_b, s_vlan, status) tuple.
            return [
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                    status=status,
                )
                for port_a, port_b, s_vlan, status in specs
            ]

        deploy_mocked.return_value = True
        deploy_to_path_mocked.return_value = True

        primary_path = _links(
            (
                (9, 10, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.UP),
            )
        )
        backup_path = _links(
            (
                (9, 14, 5, EntityStatus.DOWN),
                (15, 12, 6, EntityStatus.UP),
            )
        )
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_11",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_path,
            backup_path=backup_path,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = Path([])
        deploy_to_path_mocked.reset_mock()

        handled = evc.handle_link_up(backup_path[0])

        get_best_path_mocked.assert_not_called()
        deploy_mocked.assert_not_called()
        assert deploy_to_path_mocked.call_count == 1
        deploy_to_path_mocked.assert_called_once_with(evc.backup_path)
        assert handled
611
612 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch(GET_BEST_PATH)
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
    async def test_handle_link_up_case_4(self, *args):
        """Test that a dynamic path is used when no static path is found.

        Starting from an empty current path with Path.status patched to
        DOWN, handle_link_up must call deploy_to_path once without
        arguments.
        """
        # Patches are injected bottom-up; the two flow installers are
        # not inspected by this test.
        _, _, get_best_path_mocked, deploy_to_path_mocked = args

        def _links(specs):
            # One mocked link per (port_a, port_b, s_vlan, status) tuple.
            return [
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": s_vlan},
                    status=status,
                )
                for port_a, port_b, s_vlan, status in specs
            ]

        deploy_to_path_mocked.return_value = True

        primary_path = _links(
            (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.DOWN),
            )
        )
        backup_path = _links(
            (
                (13, 14, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.DOWN),
            )
        )

        # Setup best_path mock
        candidate_path = Path()
        candidate_path.append(primary_path[0])
        get_best_path_mocked.return_value = candidate_path

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_12",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_path,
            backup_path=backup_path,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = Path([])
        deploy_to_path_mocked.reset_mock()

        handled = evc.handle_link_up(backup_path[0])

        get_best_path_mocked.assert_not_called()
        assert deploy_to_path_mocked.call_count == 1
        deploy_to_path_mocked.assert_called_once_with()
        assert handled
683
684 1
    async def test_handle_link_up_case_5(self):
        """Test handle_link_up with the path predicates stubbed out.

        First the EVC reports that it is using its backup path and the
        handler must return a truthy value; then every predicate and
        deploy_to_path return False and the handler must return a falsy
        value.
        """
        evc = self.evc
        always_false = MagicMock(return_value=False)

        evc.is_using_primary_path = always_false
        evc.primary_path.is_affected_by_link = always_false
        evc.is_using_backup_path = MagicMock(return_value=True)
        assert evc.handle_link_up(MagicMock())

        # not possible to deploy this evc (it will not benefit from link up)
        evc.is_using_backup_path = always_false
        evc.is_using_dynamic_path = always_false
        evc.backup_path.is_affected_by_link = always_false
        evc.dynamic_backup_path = True
        evc.deploy_to_path = always_false
        assert not evc.handle_link_up(MagicMock())
699
700 1
    async def test_handle_link_up_case_6(self):
        """Test handle_link_up across is_enabled/is_active combinations.

        All path predicates and deploy_to_path are stubbed to return
        False; the expected result is then checked while toggling
        is_enabled, is_active, is_intra_switch and is_using_primary_path.
        """
        evc = self.evc
        always_false = MagicMock(return_value=False)
        always_true = MagicMock(return_value=True)

        # not possible to deploy this evc (it will not benefit from link up)
        evc.is_using_primary_path = always_false
        evc.primary_path.is_affected_by_link = always_false
        evc.is_using_backup_path = always_false
        evc.is_using_dynamic_path = always_false
        evc.backup_path.is_affected_by_link = always_false
        evc.dynamic_backup_path = True
        evc.deploy_to_path = always_false
        assert not evc.handle_link_up(MagicMock())

        # (is_enabled stub, is_active stub, expected truthiness)
        enabled_active_cases = (
            (always_true, always_false, False),
            (always_false, always_true, False),
            (always_true, always_true, True),
        )
        for enabled_stub, active_stub, expected in enabled_active_cases:
            evc.is_enabled = enabled_stub
            evc.is_active = active_stub
            assert bool(evc.handle_link_up(MagicMock())) is expected

        evc.is_enabled = always_false
        evc.is_active = always_false
        evc.is_intra_switch = always_true
        assert evc.handle_link_up(MagicMock())

        evc.is_enabled = always_false
        evc.is_active = always_false
        evc.is_intra_switch = always_false
        evc.is_using_primary_path = always_true
        assert evc.handle_link_up(MagicMock())
736
737 1
    async def test_get_interface_from_switch(self):
        """Test that get_interface_from_switch returns the mocked interface.

        The UNI is built for switch ``00:01`` and the switch mapping
        exposes the mocked interface under key ``1``.
        """
        expected_interface = id_to_interface_mock('00:01:1')
        switch = expected_interface.switch
        switch.interfaces = {1: expected_interface}
        switches = {'00:01': switch}

        uni = get_uni_mocked(is_valid=True, switch_dpid='00:01')
        found = self.evc.get_interface_from_switch(uni, switches)

        assert expected_interface == found
747
748 1
    async def test_are_unis_active(self):
        """Test are_unis_active for UP, DOWN and DISABLED interfaces.

        Only an UP interface status must yield True.
        """
        interface = id_to_interface_mock('00:01:1')
        interface.switch.interfaces = {1: interface}
        switches = {'custom_switch_dpid': interface.switch}

        expectations = (
            (EntityStatus.UP, True),
            (EntityStatus.DOWN, False),
            (EntityStatus.DISABLED, False),
        )
        for status, expected in expectations:
            interface.status = status
            assert self.evc.are_unis_active(switches) is expected
765
766 1
    async def test_is_uni_interface_active(self):
        """Test the (active, interfaces) tuple of is_uni_interface_active.

        With both interfaces UP the result is True with both interfaces
        listed; when one is DOWN the result is False and only the DOWN
        interface is listed.
        """
        interface_a = id_to_interface_mock('00:01:1')
        interface_a.status_reason = set()
        interface_z = id_to_interface_mock('00:03:1')
        interface_z.status_reason = set()

        # Both UNI interfaces up.
        interface_a.status = EntityStatus.UP
        interface_z.status = EntityStatus.UP
        result = self.evc.is_uni_interface_active(interface_a, interface_z)
        assert result == (
            True,
            {
                '00:01:1': {"status": "UP", "status_reason": set()},
                '00:03:1': {"status": "UP", "status_reason": set()},
            },
        )

        # Only interface A down.
        interface_a.status = EntityStatus.DOWN
        result = self.evc.is_uni_interface_active(interface_a, interface_z)
        assert result == (
            False,
            {'00:01:1': {'status': 'DOWN', 'status_reason': set()}},
        )

        # Only interface Z down.
        interface_a.status = EntityStatus.UP
        interface_z.status = EntityStatus.DOWN
        result = self.evc.is_uni_interface_active(interface_a, interface_z)
        assert result == (
            False,
            {'00:03:1': {'status': 'DOWN', 'status_reason': set()}},
        )
799
800 1
    async def test_handle_interface_link(self):
        """Test handle_interface_link_up and handle_interface_link_down.

        activate, deactivate and sync are replaced by mocks so the test
        can check when each handler triggers them.
        """
        evc = self.evc
        always_false = MagicMock(return_value=False)
        always_true = MagicMock(return_value=True)

        interface_a = evc.uni_a.interface
        interface_a.enable()
        interface_z = evc.uni_z.interface
        interface_z.enable()

        evc.activate = MagicMock()
        evc.deactivate = MagicMock()
        evc.sync = MagicMock()

        # Link up on an already active EVC: nothing happens.
        evc.is_active = always_true
        evc.handle_interface_link_up(interface_a)
        evc.activate.assert_not_called()
        evc.sync.assert_not_called()

        # Link down on an active EVC: it is deactivated and synced.
        interface_a.deactivate()
        evc.handle_interface_link_down(interface_a)
        evc.deactivate.assert_called_once()
        evc.sync.assert_called_once()

        # Link down on an inactive EVC: call counts stay unchanged.
        evc.is_active = always_false
        evc.handle_interface_link_down(interface_a)
        evc.deactivate.assert_called_once()
        evc.sync.assert_called_once()

        # Link up on an inactive EVC: it is activated and synced again.
        interface_a.activate()
        evc.handle_interface_link_up(interface_a)
        evc.activate.assert_called_once()
        assert evc.sync.call_count == 2
846