Passed
Push — master ( 5cb22e...3e1f49 )
by Vinicius
12:54 queued 10:13
created

)   A

Complexity

Conditions 1

Size

Total Lines 29
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 25
nop 2
dl 0
loc 29
ccs 25
cts 25
cp 1
crap 1
rs 9.28
c 0
b 0
f 0
1
"""Module to test the LinkProtection class."""
2 1
import sys
3 1
from unittest.mock import MagicMock, patch
4
5 1
from kytos.core.common import EntityStatus
6 1
from kytos.lib.helpers import get_controller_mock
7 1
from napps.kytos.mef_eline.models import EVC, Path  # NOQA pycodestyle
8 1
from napps.kytos.mef_eline.tests.helpers import (
9
    get_link_mocked,
10
    get_uni_mocked,
11
    get_mocked_requests,
12
    id_to_interface_mock
13
)  # NOQA pycodestyle
14
15
16 1
# Put "/var/lib/kytos/napps/.." at the front of the module search path.
# NOTE(review): this statement runs after the ``napps`` imports above, so it
# cannot influence how those imports are resolved -- confirm this is intended.
sys.path.insert(0, "/var/lib/kytos/napps/..")


# Dotted names used as ``patch`` targets by the tests in this module.
DEPLOY_TO_PRIMARY_PATH = (
    "napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to_primary_path"
)
DEPLOY_TO_BACKUP_PATH = (
    "napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to_backup_path"
)
GET_BEST_PATH = (
    "napps.kytos.mef_eline.models.path.DynamicPathManager.get_best_path"
)
28
29
30 1
class TestLinkProtection():  # pylint: disable=too-many-public-methods
31
    """Tests to validate LinkProtection class."""
32
33 1
    def setup_method(self):
        """Create the ``self.evc`` fixture shared by the tests.

        The EVC is enabled, has ``dynamic_backup_path`` set, a primary path
        with one UP and one DOWN link, and a backup path whose two links are
        both DOWN.
        """
        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.DOWN),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (13, 14, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.DOWN),
            )
        ]
        self.evc = EVC(
            controller=get_controller_mock(),
            name="circuit_1",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
74
75 1
    async def test_is_using_backup_path(self):
        """Check ``is_using_backup_path`` before and after switching paths.

        It must be ``False`` on a fresh EVC and truthy once ``current_path``
        is set to the EVC's backup path.
        """
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_1",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            backup_path=[
                get_link_mocked(
                    endpoint_a_port=port_a,
                    endpoint_b_port=port_b,
                    metadata={"s_vlan": vlan},
                )
                for port_a, port_b, vlan in ((10, 9, 5), (12, 11, 6))
            ],
        )

        before_switch = evc.is_using_backup_path()
        assert before_switch is False

        evc.current_path = evc.backup_path
        assert evc.is_using_backup_path()
101
102 1
    async def test_is_using_primary_path(self):
        """Check ``is_using_primary_path`` before and after switching paths.

        It must be ``False`` on a fresh EVC and truthy once ``current_path``
        is set to the EVC's primary path.
        """
        links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
            )
            for port_a, port_b, vlan in ((10, 9, 5), (12, 11, 6))
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_2",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=links,
        )

        before_switch = evc.is_using_primary_path()
        assert before_switch is False

        evc.current_path = evc.primary_path
        assert evc.is_using_primary_path()
124
125 1
    @patch("napps.kytos.mef_eline.models.evc.log")
    async def test_deploy_to_case_1(self, log_mocked):
        """Test ``deploy_to`` when the requested path is the current path.

        The call must return a truthy value and log a debug message saying
        the path equals the current one.
        """
        links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
            )
            for port_a, port_b, vlan in ((10, 9, 5), (12, 11, 6))
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_3",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=links,
        )
        evc.current_path = evc.primary_path

        deployed = evc.deploy_to("primary_path", evc.primary_path)

        log_mocked.debug.assert_called_with(
            "primary_path is equal to current_path."
        )
        assert deployed
150
151
    # pylint: disable=too-many-arguments
152 1
    @patch("requests.post")
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
    async def test_deploy_to_case_2(
        self,
        install_uni_flows_mocked,
        install_nni_flows_mocked,
        deploy_mocked,
        _,
        requests_mock,
    ):
        """Test ``deploy_to`` on a primary path whose links are all UP.

        Both flow-installation helpers must be called with the primary path
        and the call must return a truthy value.
        """
        deploy_mocked.return_value = True
        # ``requests.post`` answers with a 201 response.
        requests_mock.return_value = MagicMock(status_code=201)

        links = [get_link_mocked(status=EntityStatus.UP) for _ in range(2)]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_4",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=links,
            enabled=True,
        )

        result = evc.deploy_to("primary_path", evc.primary_path)

        install_uni_flows_mocked.assert_called_with(evc.primary_path)
        install_nni_flows_mocked.assert_called_with(evc.primary_path)
        assert result
190
191 1
    @patch("requests.get", side_effect=get_mocked_requests)
    async def test_deploy_to_case_3(self, requests_mocked):
        # pylint: disable=unused-argument
        """Test that ``deploy_to`` returns ``False`` for this primary path.

        The two links are created with the helper's defaults and given the
        ids ``"abc"`` and ``"def"``.
        """
        links = [get_link_mocked(), get_link_mocked()]
        for link, link_id in zip(links, ("abc", "def")):
            link.id = link_id

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_5",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=links,
            enabled=True,
        )

        result = evc.deploy_to("primary_path", evc.primary_path)
        assert result is False
212
213 1
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
    @patch(DEPLOY_TO_BACKUP_PATH)
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.path.Path.status")
    async def test_handle_link_down_case_1(
        self,
        path_status_mocked,
        deploy_mocked,
        deploy_to_mocked,
        _send_flow_mods_mocked,
        log_mocked,
    ):
        """Test link down while the primary path is the current path.

        ``deploy_to_backup_path`` is patched and must be called exactly once,
        ``EVCDeploy.deploy`` must not be called, and a success message must
        be logged at debug level.
        """
        deploy_mocked.return_value = True
        path_status_mocked.side_effect = [EntityStatus.DOWN, EntityStatus.UP]

        evc = self.evc
        evc.current_path = evc.primary_path
        evc.activate()
        # Only calls made by handle_link_down() should be counted.
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_down()

        deploy_mocked.assert_not_called()
        deploy_to_mocked.assert_called_once()
        assert handled
        log_mocked.debug.assert_called_once_with(
            f"{evc} deployed after link down."
        )
240
241 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch(DEPLOY_TO_PRIMARY_PATH)
    @patch("napps.kytos.mef_eline.models.path.Path.status")
    async def test_handle_link_down_case_2(
        self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
    ):
        """Test link down while the backup path is the current path.

        ``deploy_to_primary_path`` is patched and must be called exactly
        once, ``EVCDeploy.deploy`` must not be called, and a success message
        must be logged at debug level.
        """
        deploy_mocked.return_value = True
        deploy_to_mocked.return_value = True
        path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN]

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (7, 8, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (7, 10, 5, EntityStatus.DOWN),
                (15, 12, 6, EntityStatus.UP),
            )
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_13",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_down()

        deploy_mocked.assert_not_called()
        deploy_to_mocked.assert_called_once()
        assert handled
        log_mocked.debug.assert_called_once_with(
            f"{evc} deployed after link down."
        )
299
300 1
    @patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch(DEPLOY_TO_PRIMARY_PATH)
    @patch("napps.kytos.mef_eline.models.path.DynamicPathManager.get_paths")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
    async def test_handle_link_down_case_3(
        self, get_paths_mocked, deploy_to_mocked, deploy_mocked, log_mocked, _
    ):
        """Test a failed link-down recovery without ``dynamic_backup_path``.

        With ``deploy_to_primary_path`` returning ``False``,
        ``handle_link_down`` must return ``False``, must not query
        ``DynamicPathManager.get_paths`` and must log the failure.
        """
        deploy_mocked.return_value = False
        deploy_to_mocked.return_value = False

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (13, 14, 6, EntityStatus.UP),
            )
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_7",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_down()

        get_paths_mocked.assert_not_called()
        deploy_mocked.assert_not_called()
        assert deploy_to_mocked.call_count == 1
        assert handled is False
        log_mocked.debug.assert_called_once_with(
            f"Failed to re-deploy {evc} after link down."
        )
362
363 1 View Code Duplication
    @patch("napps.kytos.mef_eline.models.evc.log")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
    @patch(DEPLOY_TO_PRIMARY_PATH)
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
    async def test_handle_link_down_case_4(
        self,
        deploy_to_mocked,
        _send_flow_mods_mocked,
        deploy_mocked,
        log_mocked,
    ):
        """Test a successful link-down recovery with ``dynamic_backup_path``.

        ``deploy_to_primary_path`` returns ``False`` while the patched
        ``EVCDeploy.deploy_to_path`` returns ``True``; ``handle_link_down``
        must return a truthy value and log the success message.
        """
        deploy_mocked.return_value = True
        deploy_to_mocked.return_value = False

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (13, 14, 6, EntityStatus.UP),
            )
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_8",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_down()

        assert deploy_to_mocked.call_count == 1
        assert handled
        log_mocked.debug.assert_called_with(
            f"{evc} deployed after link down."
        )
427
428 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to")
    async def test_handle_link_up_case_1(
        self,
        deploy_to_mocked,
        deploy_mocked
    ):
        """Test that link up triggers no deploy while on the primary path.

        Neither ``EVCDeploy.deploy`` nor ``LinkProtection.deploy_to`` may be
        called, and ``handle_link_up`` must return a truthy value.
        """
        deploy_mocked.return_value = True
        deploy_to_mocked.return_value = True

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 14, 5, EntityStatus.UP),
                (15, 12, 6, EntityStatus.UP),
            )
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_9",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = evc.primary_path
        deploy_to_mocked.reset_mock()

        handled = evc.handle_link_up(backup_links[0])

        deploy_mocked.assert_not_called()
        deploy_to_mocked.assert_not_called()
        assert handled
484
485 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
    async def test_handle_link_up_case_2(
        self,
        deploy_to_path_mocked,
        deploy_mocked
    ):
        """Test the switch from the backup path back to the primary path.

        With the backup path in use, a link up on a primary link must call
        ``deploy_to_path`` exactly once with the primary path.
        """
        deploy_mocked.return_value = True
        deploy_to_path_mocked.return_value = True

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 14, 5, EntityStatus.UP),
                (15, 12, 6, EntityStatus.UP),
            )
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_10",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = evc.backup_path
        deploy_to_path_mocked.reset_mock()

        handled = evc.handle_link_up(primary_links[0])

        deploy_mocked.assert_not_called()
        assert deploy_to_path_mocked.call_count == 1
        deploy_to_path_mocked.assert_called_once_with(evc.primary_path)
        assert handled
543
544 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch(GET_BEST_PATH)
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
    async def test_handle_link_up_case_3(
        self,
        _install_uni_flows_mocked,
        _install_nni_flows_mocked,
        get_best_path_mocked,
        deploy_to_path_mocked,
        deploy_mocked,
    ):
        """Test that the backup path is deployed when one of its links is up.

        Starting from an empty current path, ``deploy_to_path`` must be
        called exactly once with the backup path, and ``get_best_path`` must
        not be called.
        """
        deploy_mocked.return_value = True
        deploy_to_path_mocked.return_value = True

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 10, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.UP),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 14, 5, EntityStatus.DOWN),
                (15, 12, 6, EntityStatus.UP),
            )
        ]
        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_11",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = Path([])
        deploy_to_path_mocked.reset_mock()

        handled = evc.handle_link_up(backup_links[0])

        get_best_path_mocked.assert_not_called()
        deploy_mocked.assert_not_called()
        assert deploy_to_path_mocked.call_count == 1
        deploy_to_path_mocked.assert_called_once_with(evc.backup_path)
        assert handled
611
612 1
    @patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
    @patch(GET_BEST_PATH)
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows")
    @patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows")
    @patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
    async def test_handle_link_up_case_4(self, *args):
        """Test link up when ``Path.status`` is patched to DOWN.

        Starting from an empty current path, ``deploy_to_path`` must be
        called exactly once with no arguments, and ``get_best_path`` must not
        be called.
        NOTE(review): the no-argument call is presumably the dynamic-path
        deployment -- confirm against ``EVCDeploy.deploy_to_path``.
        """
        # Mocks arrive in bottom-up decorator order; the flow-install mocks
        # are not used directly by this test.
        _, _, get_best_path_mocked, deploy_to_path_mocked = args

        deploy_to_path_mocked.return_value = True

        primary_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (9, 10, 5, EntityStatus.UP),
                (11, 12, 6, EntityStatus.DOWN),
            )
        ]
        backup_links = [
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
                status=status,
            )
            for port_a, port_b, vlan, status in (
                (13, 14, 5, EntityStatus.DOWN),
                (11, 12, 6, EntityStatus.DOWN),
            )
        ]

        # Value get_best_path would hand back if it were called.
        candidate_path = Path()
        candidate_path.append(primary_links[0])
        get_best_path_mocked.return_value = candidate_path

        evc = EVC(
            controller=get_controller_mock(),
            name="circuit_12",
            uni_a=get_uni_mocked(is_valid=True),
            uni_z=get_uni_mocked(is_valid=True),
            primary_path=primary_links,
            backup_path=backup_links,
            enabled=True,
            dynamic_backup_path=True,
        )
        evc.current_path = Path([])
        deploy_to_path_mocked.reset_mock()

        handled = evc.handle_link_up(backup_links[0])

        get_best_path_mocked.assert_not_called()
        assert deploy_to_path_mocked.call_count == 1
        deploy_to_path_mocked.assert_called_once_with()
        assert handled
683
684 1
    async def test_handle_link_up_case_5(self):
        """Test ``handle_link_up`` with the EVC's predicates stubbed out.

        First the EVC reports that it is using the backup path and the call
        must be truthy; then every predicate and ``deploy_to_path`` return
        ``False`` and the call must be falsy.
        """
        evc = self.evc
        always_false = MagicMock(return_value=False)

        evc.is_using_primary_path = always_false
        evc.primary_path.is_affected_by_link = always_false
        evc.is_using_backup_path = MagicMock(return_value=True)
        assert evc.handle_link_up(MagicMock())

        # not possible to deploy this evc (it will not benefit from link up)
        evc.is_using_backup_path = always_false
        evc.is_using_dynamic_path = always_false
        evc.backup_path.is_affected_by_link = always_false
        evc.dynamic_backup_path = True
        evc.deploy_to_path = always_false
        assert not evc.handle_link_up(MagicMock())
699
700
    # pylint: disable=too-many-statements
701 1
    async def test_handle_link_up_case_6(self):
702
        """Test handle_link_up method."""
703
        # not possible to deploy this evc (it will not benefit from link up)
704 1
        return_false_mock = MagicMock(return_value=False)
705 1
        return_true_mock = MagicMock(return_value=True)
706 1
        self.evc.is_using_primary_path = return_false_mock
707 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
708 1
        self.evc.is_using_backup_path = return_false_mock
709 1
        self.evc.is_using_dynamic_path = return_false_mock
710 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
711 1
        self.evc.dynamic_backup_path = True
712
713 1
        self.evc.deploy_to_primary_path = MagicMock(return_value=False)
714 1
        self.evc.deploy_to_backup_path = MagicMock(return_value=False)
715 1
        self.evc.deploy_to_path = MagicMock(return_value=False)
716
717 1
        assert not self.evc.handle_link_up(MagicMock())
718 1
        assert self.evc.deploy_to_path.call_count == 1
719
720 1
        self.evc.is_using_primary_path = return_true_mock
721 1
        assert self.evc.handle_link_up(MagicMock())
722 1
        assert self.evc.deploy_to_path.call_count == 1
723
724 1
        self.evc.is_using_primary_path = return_false_mock
725 1
        self.evc.is_intra_switch = return_true_mock
726 1
        assert self.evc.handle_link_up(MagicMock())
727 1
        assert self.evc.deploy_to_path.call_count == 1
728
729 1
        self.evc.is_using_primary_path = return_false_mock
730 1
        self.evc.is_intra_switch = return_false_mock
731 1
        self.evc.primary_path.is_affected_by_link = return_true_mock
732 1
        assert not self.evc.handle_link_up(MagicMock())
733 1
        assert self.evc.deploy_to_primary_path.call_count == 1
734 1
        assert self.evc.deploy_to_path.call_count == 2
735
736 1
        self.evc.is_using_primary_path = return_false_mock
737 1
        self.evc.is_intra_switch = return_false_mock
738 1
        self.evc.primary_path.is_affected_by_link = return_true_mock
739 1
        self.evc.deploy_to_primary_path.return_value = True
740 1
        assert self.evc.handle_link_up(MagicMock())
741 1
        assert self.evc.deploy_to_primary_path.call_count == 2
742 1
        assert self.evc.deploy_to_path.call_count == 2
743
744 1
        self.evc.is_using_primary_path = return_false_mock
745 1
        self.evc.is_intra_switch = return_false_mock
746 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
747 1
        self.evc.deploy_to_primary_path.return_value = False
748 1
        self.evc.is_using_backup_path = return_true_mock
749 1
        assert self.evc.handle_link_up(MagicMock())
750 1
        assert self.evc.deploy_to_primary_path.call_count == 2
751 1
        assert self.evc.deploy_to_path.call_count == 2
752
753 1
        self.evc.is_using_primary_path = return_false_mock
754 1
        self.evc.is_intra_switch = return_false_mock
755 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
756 1
        self.evc.deploy_to_primary_path.return_value = False
757 1
        self.evc.is_using_backup_path = return_false_mock
758 1
        self.evc.is_using_dynamic_path = return_true_mock
759 1
        assert self.evc.handle_link_up(MagicMock())
760 1
        assert self.evc.deploy_to_primary_path.call_count == 2
761 1
        assert self.evc.deploy_to_path.call_count == 2
762
763 1
        self.evc.is_using_primary_path = return_false_mock
764 1
        self.evc.is_intra_switch = return_false_mock
765 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
766 1
        self.evc.deploy_to_primary_path.return_value = False
767 1
        self.evc.is_using_backup_path = return_false_mock
768 1
        self.evc.is_using_dynamic_path = return_false_mock
769 1
        self.evc.backup_path.is_affected_by_link = return_true_mock
770 1
        assert not self.evc.handle_link_up(MagicMock())
771 1
        assert self.evc.deploy_to_primary_path.call_count == 2
772 1
        assert self.evc.deploy_to_backup_path.call_count == 1
773 1
        assert self.evc.deploy_to_path.call_count == 3
774
775 1
        self.evc.is_using_primary_path = return_false_mock
776 1
        self.evc.is_intra_switch = return_false_mock
777 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
778 1
        self.evc.deploy_to_primary_path.return_value = False
779 1
        self.evc.is_using_backup_path = return_false_mock
780 1
        self.evc.is_using_dynamic_path = return_false_mock
781 1
        self.evc.backup_path.is_affected_by_link = return_true_mock
782 1
        self.evc.deploy_to_backup_path.return_value = True
783 1
        assert self.evc.handle_link_up(MagicMock())
784 1
        assert self.evc.deploy_to_primary_path.call_count == 2
785 1
        assert self.evc.deploy_to_backup_path.call_count == 2
786 1
        assert self.evc.deploy_to_path.call_count == 3
787
788 1
        self.evc.is_using_primary_path = return_false_mock
789 1
        self.evc.is_intra_switch = return_false_mock
790 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
791 1
        self.evc.deploy_to_primary_path.return_value = False
792 1
        self.evc.is_using_backup_path = return_false_mock
793 1
        self.evc.is_using_dynamic_path = return_false_mock
794 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
795 1
        self.evc.deploy_to_backup_path.return_value = False
796 1
        self.evc.dynamic_backup_path = True
797 1
        assert not self.evc.handle_link_up(MagicMock())
798 1
        assert self.evc.deploy_to_primary_path.call_count == 2
799 1
        assert self.evc.deploy_to_backup_path.call_count == 2
800 1
        assert self.evc.deploy_to_path.call_count == 4
801
802 1
        self.evc.is_using_primary_path = return_false_mock
803 1
        self.evc.is_intra_switch = return_false_mock
804 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
805 1
        self.evc.deploy_to_primary_path.return_value = False
806 1
        self.evc.is_using_backup_path = return_false_mock
807 1
        self.evc.is_using_dynamic_path = return_false_mock
808 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
809 1
        self.evc.deploy_to_backup_path.return_value = False
810 1
        self.evc.dynamic_backup_path = True
811 1
        self.evc.deploy_to_path.return_value = True
812 1
        assert self.evc.handle_link_up(MagicMock())
813 1
        assert self.evc.deploy_to_primary_path.call_count == 2
814 1
        assert self.evc.deploy_to_backup_path.call_count == 2
815 1
        assert self.evc.deploy_to_path.call_count == 5
816
817 1
        self.evc.is_using_primary_path = return_false_mock
818 1
        self.evc.is_intra_switch = return_false_mock
819 1
        self.evc.primary_path.is_affected_by_link = return_false_mock
820 1
        self.evc.deploy_to_primary_path.return_value = False
821 1
        self.evc.is_using_backup_path = return_false_mock
822 1
        self.evc.is_using_dynamic_path = return_false_mock
823 1
        self.evc.backup_path.is_affected_by_link = return_false_mock
824 1
        self.evc.deploy_to_backup_path.return_value = False
825 1
        self.evc.dynamic_backup_path = False
826 1
        self.evc.deploy_to_path.return_value = False
827 1
        assert not self.evc.handle_link_up(MagicMock())
828 1
        assert self.evc.deploy_to_primary_path.call_count == 2
829 1
        assert self.evc.deploy_to_backup_path.call_count == 2
830 1
        assert self.evc.deploy_to_path.call_count == 5
831
832 1
    async def test_get_interface_from_switch(self):
        """Test that get_interface_from_switch finds the UNI's interface.

        A mocked interface is registered as port 1 of its own switch, and
        that switch is exposed under dpid '00:01'. The lookup for a UNI on
        that dpid must hand back the very same interface.
        """
        expected = id_to_interface_mock('00:01:1')
        expected.switch.interfaces = {1: expected}
        uni = get_uni_mocked(is_valid=True, switch_dpid='00:01')

        found = self.evc.get_interface_from_switch(
            uni, {'00:01': expected.switch}
        )

        assert expected == found
842
843 1
    async def test_are_unis_active(self):
        """Test are_unis_active against each interface status.

        Only an UP interface is expected to make the UNIs count as active;
        DOWN and DISABLED must both yield False.
        """
        interface = id_to_interface_mock('00:01:1')
        interface.switch.interfaces = {1: interface}
        # NOTE(review): 'custom_switch_dpid' presumably matches the dpid of
        # the UNIs created in setup_method - confirm against the fixture.
        switches = {'custom_switch_dpid': interface.switch}

        status_to_result = (
            (EntityStatus.UP, True),
            (EntityStatus.DOWN, False),
            (EntityStatus.DISABLED, False),
        )
        for status, expected in status_to_result:
            interface.status = status
            assert self.evc.are_unis_active(switches) is expected
860
861 1
    async def test_is_uni_interface_active(self):
        """Test is_uni_interface_active with UP and DOWN interfaces.

        The method is expected to return a ``(bool, dict)`` pair. With both
        interfaces UP the dict describes both of them; when one interface is
        DOWN the flag is False and the dict describes only that interface.
        """
        iface_a = id_to_interface_mock('00:01:1')
        iface_a.status_reason = set()
        iface_z = id_to_interface_mock('00:03:1')
        iface_z.status_reason = set()

        # Both ends UP.
        iface_a.status = EntityStatus.UP
        iface_z.status = EntityStatus.UP
        result = self.evc.is_uni_interface_active(iface_a, iface_z)
        assert result == (
            True,
            {
                '00:01:1': {"status": "UP", "status_reason": set()},
                '00:03:1': {"status": "UP", "status_reason": set()},
            },
        )

        # Only the A side goes DOWN.
        iface_a.status = EntityStatus.DOWN
        result = self.evc.is_uni_interface_active(iface_a, iface_z)
        assert result == (
            False,
            {'00:01:1': {'status': 'DOWN', 'status_reason': set()}},
        )

        # A side recovers, Z side goes DOWN.
        iface_a.status = EntityStatus.UP
        iface_z.status = EntityStatus.DOWN
        result = self.evc.is_uni_interface_active(iface_a, iface_z)
        assert result == (
            False,
            {'00:03:1': {'status': 'DOWN', 'status_reason': set()}},
        )
894
895 1
    async def test_handle_interface_link(self):
        """Test handle_interface_link_up and handle_interface_link_down.

        Drives the EVC through four UNI interface events and checks which
        of the mocked activate/deactivate/sync methods are called each time.
        """
        uni_a_iface = self.evc.uni_a.interface
        uni_a_iface.enable()
        self.evc.uni_z.interface.enable()

        self.evc.activate = MagicMock()
        self.evc.deactivate = MagicMock()
        self.evc.sync = MagicMock()

        # Link up while the EVC already reports active: nothing to do.
        self.evc.is_active = MagicMock(return_value=True)
        self.evc.handle_interface_link_up(uni_a_iface)
        self.evc.activate.assert_not_called()
        self.evc.sync.assert_not_called()

        # Link down after the interface was deactivated: EVC is deactivated.
        uni_a_iface.deactivate()
        self.evc.handle_interface_link_down(uni_a_iface)
        self.evc.deactivate.assert_called_once()
        self.evc.sync.assert_called_once()

        # Link down while the EVC reports inactive: call counts unchanged.
        self.evc.is_active = MagicMock(return_value=False)
        self.evc.handle_interface_link_down(uni_a_iface)
        self.evc.deactivate.assert_called_once()
        self.evc.sync.assert_called_once()

        # Link up once the interface is active again: EVC is activated.
        uni_a_iface.activate()
        self.evc.handle_interface_link_up(uni_a_iface)
        self.evc.activate.assert_called_once()
        assert self.evc.sync.call_count == 2
941