Test Failed
Pull Request — master (#70)
by macartur
05:46 queued 03:30
created

build.tests.test_models   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 1084
Duplicated Lines 81 %

Importance

Changes 0
Metric Value
eloc 847
dl 878
loc 1084
rs 5.273
c 0
b 0
f 0
wmc 56

44 Methods

Rating   Name   Duplication   Size   Complexity  
A TestEVC.test_update_name() 15 15 2
A TestEVC.test_attributes_empty() 0 7 2
A TestEVC.test_update_uni_z() 15 15 2
A TestEVC.test_update_uni_a() 15 15 2
A TestEVC.test_circuit_representation() 0 10 1
A TestEVC.test_without_uni_a() 0 7 2
A TestEVC.test_primary_links_zipped() 0 3 1
A TestEVC.test_handle_link_up_case_4() 0 40 1
A TestEVC.test_deploy_successfully() 39 39 1
B TestEVC.test_handle_link_down_case_1() 42 42 1
B TestEVC.test_handle_link_down_case_2() 42 42 1
A TestEVC.test_deploy_fail() 39 39 1
B TestEVC.test_handle_link_down_case_4() 0 44 1
A TestEVC.test_handle_link_up_case_3() 0 40 1
A TestEVC.test_handle_link_up_case_2() 40 40 1
A TestEVC.test_handle_link_up_case_1() 38 38 1
B TestEVC.test_handle_link_down_case_3() 43 43 1
A TestEVC.test_prepare_pop_flow() 22 22 1
A TestPath.test_status_case_3() 8 8 1
A TestEVC.test_with_invalid_uni_z() 11 11 2
A TestEVC.test_deploy_to_case_2() 20 20 1
A TestEVC.test_should_deploy_case3() 14 14 1
A TestEVC.test_should_deploy_case4() 14 14 1
A TestEVC.test_deploy_to_case_3() 16 16 1
A TestEVC.test_without_uni_z() 10 10 2
A TestEVC.test_should_deploy_case1() 14 14 1
A TestPath.test_status_case_4() 8 8 1
A TestEVC.test_send_flow_mods() 11 11 1
A TestEVC.test_prepare_push_flow() 29 29 1
A TestEVC.test_is_using_primary_path() 20 20 1
B TestEVC.test_install_uni_flows() 76 76 1
B TestEVC.test_as_dict() 64 64 5
A TestPath.test_status_case_2() 8 8 1
A TestEVC.test_prepare_flow_mod() 13 13 1
A TestEVC.test_should_deploy_case2() 14 14 1
A TestEVC.test_deploy_to_case_1() 22 22 1
A TestEVC.test_comparison_method() 22 22 1
A TestEVC.test_with_invalid_uni_a() 10 10 2
A TestEVC.test_is_using_backup_path() 20 20 1
A TestPath.test_compare_same_paths() 12 12 1
A TestPath.test_compare_different_paths() 18 18 1
A TestEVC.test_install_nni_flows() 51 51 1
A TestPath.test_status_case_1() 4 4 1
A TestPath.test_as_dict() 10 10 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.tests.test_models often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Module to test the schedule.py file."""
2
from unittest import TestCase
3
from unittest.mock import Mock, patch
4
5
from kytos.core.interface import UNI, Interface
6
from kytos.core.switch import Switch
7
from kytos.core.common import EntityStatus
8
9
from napps.kytos.mef_eline.models import EVC, Path
10
from napps.kytos.mef_eline.settings import MANAGER_URL
11
from .helpers import get_link_mocked, get_uni_mocked
12
13
14 View Code Duplication
class TestPath(TestCase):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
15
    """"Class to test path methods."""
16
17
    def test_status_case_1(self):
18
        """Test if empty link is DISABLED."""
19
        current_path = Path()
20
        self.assertEqual(current_path.status, EntityStatus.DISABLED)
21
22
    def test_status_case_2(self):
23
        """Test if link status is DOWN."""
24
        links = [
25
                 get_link_mocked(status=EntityStatus.DOWN),
26
                 get_link_mocked(status=EntityStatus.UP)
27
        ]
28
        current_path = Path(links)
29
        self.assertEqual(current_path.status, EntityStatus.DOWN)
30
31
    def test_status_case_3(self):
32
        """Test if link status is DISABLED."""
33
        links = [
34
                 get_link_mocked(status=EntityStatus.DISABLED),
35
                 get_link_mocked(status=EntityStatus.UP)
36
        ]
37
        current_path = Path(links)
38
        self.assertEqual(current_path.status, EntityStatus.DISABLED)
39
40
    def test_status_case_4(self):
41
        """Test if link status is UP."""
42
        links = [
43
                 get_link_mocked(status=EntityStatus.UP),
44
                 get_link_mocked(status=EntityStatus.UP)
45
        ]
46
        current_path = Path(links)
47
        self.assertEqual(current_path.status, EntityStatus.UP)
48
49
    def test_compare_same_paths(self):
50
        """Test compare paths with same links."""
51
        links = [
52
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
53
                                metadata={"s_vlan": 5}),
54
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
55
                                metadata={"s_vlan": 6})
56
            ]
57
58
        path_1 = Path(links)
59
        path_2 = Path(links)
60
        self.assertEqual(path_1, path_2)
61
62
    def test_compare_different_paths(self):
63
        """Test compare paths with different links."""
64
        links_1 = [
65
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
66
                                metadata={"s_vlan": 5}),
67
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
68
                                metadata={"s_vlan": 6})
69
            ]
70
        links_2 = [
71
                get_link_mocked(endpoint_a_port=12, endpoint_b_port=11,
72
                                metadata={"s_vlan": 5}),
73
                get_link_mocked(endpoint_a_port=14, endpoint_b_port=16,
74
                                metadata={"s_vlan": 11})
75
            ]
76
77
        path_1 = Path(links_1)
78
        path_2 = Path(links_2)
79
        self.assertNotEqual(path_1, path_2)
80
81
    def test_as_dict(self):
82
        """Test path as dict."""
83
        links = [
84
                get_link_mocked(link_dict={"id": 3}),
85
                get_link_mocked(link_dict={"id": 2})
86
            ]
87
88
        current_path = Path(links)
89
        expected_dict = [{"id": 3}, {"id": 2}]
90
        self.assertEqual(expected_dict, current_path.as_dict())
91
92
93
class TestEVC(TestCase):  # pylint: disable=too-many-public-methods
94
    """Tests to verify EVC class."""
95
96
    def test_attributes_empty(self):
97
        """Test if the EVC raises an error with name is required."""
98
        attributes = {}
99
        error_message = "name is required."
100
        with self.assertRaises(ValueError) as handle_error:
101
            EVC(**attributes)
102
        self.assertEqual(str(handle_error.exception), error_message)
103
104
    def test_without_uni_a(self):
105
        """Test if the EVC raises and error with UNI A is required."""
106
        attributes = {"name": "circuit_name"}
107
        error_message = "uni_a is required."
108
        with self.assertRaises(ValueError) as handle_error:
109
            EVC(**attributes)
110
        self.assertEqual(str(handle_error.exception), error_message)
111
112 View Code Duplication
    def test_with_invalid_uni_a(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
113
        """Test if the EVC raises and error with invalid UNI A."""
114
        attributes = {
115
            "name": "circuit_name",
116
            "uni_a": get_uni_mocked(tag_value=82)
117
        }
118
        error_message = "VLAN tag 82 is not available in uni_a"
119
        with self.assertRaises(ValueError) as handle_error:
120
            EVC(**attributes)
121
        self.assertEqual(str(handle_error.exception), error_message)
122
123 View Code Duplication
    def test_without_uni_z(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
124
        """Test if the EVC raises and error with UNI Z is required."""
125
        attributes = {
126
            "name": "circuit_name",
127
            "uni_a": get_uni_mocked(is_valid=True)
128
        }
129
        error_message = "uni_z is required."
130
        with self.assertRaises(ValueError) as handle_error:
131
            EVC(**attributes)
132
        self.assertEqual(str(handle_error.exception), error_message)
133
134 View Code Duplication
    def test_with_invalid_uni_z(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
135
        """Test if the EVC raises and error with UNI Z is required."""
136
        attributes = {
137
            "name": "circuit_name",
138
            "uni_a": get_uni_mocked(is_valid=True),
139
            "uni_z": get_uni_mocked(tag_value=83)
140
        }
141
        error_message = "VLAN tag 83 is not available in uni_z"
142
        with self.assertRaises(ValueError) as handle_error:
143
            EVC(**attributes)
144
        self.assertEqual(str(handle_error.exception), error_message)
145
146 View Code Duplication
    def test_update_name(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
147
        """Test if raises and error when trying to update the name."""
148
        attributes = {
149
            "name": "circuit_name",
150
            "uni_a": get_uni_mocked(is_valid=True),
151
            "uni_z": get_uni_mocked(is_valid=True)
152
        }
153
        update_dict = {
154
            "name": "circuit_name_2"
155
        }
156
        error_message = "name can't be be updated."
157
        with self.assertRaises(ValueError) as handle_error:
158
            evc = EVC(**attributes)
159
            evc.update(**update_dict)
160
        self.assertEqual(str(handle_error.exception), error_message)
161
162 View Code Duplication
    def test_update_uni_a(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
163
        """Test if raises and error when trying to update the uni_a."""
164
        attributes = {
165
            "name": "circuit_name",
166
            "uni_a": get_uni_mocked(is_valid=True),
167
            "uni_z": get_uni_mocked(is_valid=True)
168
        }
169
        update_dict = {
170
            "uni_a": get_uni_mocked(is_valid=True)
171
        }
172
        error_message = "uni_a can't be be updated."
173
        with self.assertRaises(ValueError) as handle_error:
174
            evc = EVC(**attributes)
175
            evc.update(**update_dict)
176
        self.assertEqual(str(handle_error.exception), error_message)
177
178 View Code Duplication
    def test_update_uni_z(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
179
        """Test if raises and error when trying to update the uni_z."""
180
        attributes = {
181
            "name": "circuit_name",
182
            "uni_a": get_uni_mocked(is_valid=True),
183
            "uni_z": get_uni_mocked(is_valid=True)
184
        }
185
        update_dict = {
186
            "uni_z": get_uni_mocked(is_valid=True)
187
        }
188
        error_message = "uni_z can't be be updated."
189
        with self.assertRaises(ValueError) as handle_error:
190
            evc = EVC(**attributes)
191
            evc.update(**update_dict)
192
        self.assertEqual(str(handle_error.exception), error_message)
193
194
    def test_circuit_representation(self):
195
        """Test the method __repr__."""
196
        attributes = {
197
            "name": "circuit_name",
198
            "uni_a": get_uni_mocked(is_valid=True),
199
            "uni_z": get_uni_mocked(is_valid=True)
200
        }
201
        evc = EVC(**attributes)
202
        expected_value = f'EVC({evc.id}, {evc.name})'
203
        self.assertEqual(str(evc), expected_value)
204
205 View Code Duplication
    def test_comparison_method(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
206
        """Test the method __eq__."""
207
        attributes = {
208
            "name": "circuit_name",
209
            "uni_a": get_uni_mocked(is_valid=True),
210
            "uni_z": get_uni_mocked(is_valid=True)
211
        }
212
        evc1 = EVC(**attributes)
213
        evc2 = EVC(**attributes)
214
215
        attributes = {
216
            "name": "circuit_name_2",
217
            "uni_a": get_uni_mocked(is_valid=True),
218
            "uni_z": get_uni_mocked(is_valid=True)
219
        }
220
        evc3 = EVC(**attributes)
221
        evc4 = EVC(**attributes)
222
223
        self.assertEqual(evc1 == evc2, True)
224
        self.assertEqual(evc1 == evc3, False)
225
        self.assertEqual(evc2 == evc3, False)
226
        self.assertEqual(evc3 == evc4, True)
227
228 View Code Duplication
    def test_as_dict(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
229
        """Test the method as_dict."""
230
        attributes = {
231
            "id": "custom_id",
232
            "name": "custom_name",
233
            "uni_a": get_uni_mocked(is_valid=True),
234
            "uni_z": get_uni_mocked(is_valid=True),
235
            "start_date": '2018-08-21T18:44:54',
236
            "end_date": '2018-08-21T18:44:55',
237
            'primary_links': [],
238
            'request_time': '2018-08-21T19:10:41',
239
            'creation_time': '2018-08-21T18:44:54',
240
            'owner': "my_name",
241
            'circuit_scheduler': [],
242
            'enabled': True,
243
            'priority': 2
244
        }
245
        evc = EVC(**attributes)
246
247
        expected_dict = {
248
            'id': 'custom_id',
249
            'name': 'custom_name',
250
            'uni_a': attributes['uni_a'].as_dict(),
251
            'uni_z': attributes['uni_z'].as_dict(),
252
            'start_date': '2018-08-21T18:44:54',
253
            'end_date': '2018-08-21T18:44:55',
254
            'bandwidth': 0,
255
            'primary_links': [],
256
            'backup_links': [],
257
            'current_path': [],
258
            'primary_path': [],
259
            'backup_path': [],
260
            'dynamic_backup_path': False,
261
            '_requested': {
262
                           "id": "custom_id",
263
                           "name": "custom_name",
264
                           "uni_a": attributes['uni_a'].as_dict(),
265
                           "uni_z": attributes['uni_z'].as_dict(),
266
                           "start_date": '2018-08-21T18:44:54',
267
                           "end_date": '2018-08-21T18:44:55',
268
                           'primary_links': [],
269
                           'request_time': '2018-08-21T19:10:41',
270
                           'creation_time': '2018-08-21T18:44:54',
271
                           'owner': "my_name",
272
                           'circuit_scheduler': [],
273
                           'enabled': True,
274
                           'priority': 2
275
            },
276
            'request_time': '2018-08-21T19:10:41',
277
            'creation_time': '2018-08-21T18:44:54',
278
            'owner': 'my_name',
279
            'circuit_scheduler': [],
280
            'active': False,
281
            'enabled': True,
282
            'priority': 2
283
        }
284
        actual_dict = evc.as_dict()
285
        for name, value in expected_dict.items():
286
            actual = actual_dict.get(name)
287
            if name == '_requested':
288
                for requested_name, requested_value in value.items():
289
                    if isinstance(requested_value, UNI):
290
                        value[requested_name] = requested_value.as_dict()
291
            self.assertEqual(value, actual)
292
293
    def test_primary_links_zipped(self):
294
        """Test primary links zipped method."""
295
        pass
296
297 View Code Duplication
    @staticmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
298
    @patch('napps.kytos.mef_eline.models.log')
299
    def test_should_deploy_case1(log_mock):
300
        """Test should deploy method without primary links."""
301
        log_mock.debug.return_value = True
302
        attributes = {
303
            "name": "custom_name",
304
            "uni_a": get_uni_mocked(is_valid=True),
305
            "uni_z": get_uni_mocked(is_valid=True)
306
        }
307
308
        evc = EVC(**attributes)
309
        evc.should_deploy()
310
        log_mock.debug.assert_called_with('Path is empty.')
311
312 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
313
    def test_should_deploy_case2(self, log_mock):
314
        """Test should deploy method with disable circuit."""
315
        log_mock.debug.return_value = True
316
        attributes = {
317
            "name": "custom_name",
318
            "uni_a": get_uni_mocked(is_valid=True),
319
            "uni_z": get_uni_mocked(is_valid=True),
320
            "primary_links": [get_link_mocked(), get_link_mocked()]
321
        }
322
        evc = EVC(**attributes)
323
324
        self.assertFalse(evc.should_deploy(attributes['primary_links']))
325
        log_mock.debug.assert_called_with(f'{evc} is disabled.')
326
327 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
328
    def test_should_deploy_case3(self, log_mock):
329
        """Test should deploy method with enabled and not active circuit."""
330
        log_mock.debug.return_value = True
331
        attributes = {
332
            "name": "custom_name",
333
            "uni_a": get_uni_mocked(is_valid=True),
334
            "uni_z": get_uni_mocked(is_valid=True),
335
            "primary_links": [get_link_mocked(), get_link_mocked()],
336
            "enabled": True
337
        }
338
        evc = EVC(**attributes)
339
        self.assertTrue(evc.should_deploy(attributes['primary_links']))
340
        log_mock.debug.assert_called_with(f'{evc} will be deployed.')
341
342 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
343
    def test_should_deploy_case4(self, log_mock):
344
        """Test should deploy method with enabled and active circuit."""
345
        log_mock.debug.return_value = True
346
        attributes = {
347
            "name": "custom_name",
348
            "uni_a": get_uni_mocked(is_valid=True),
349
            "uni_z": get_uni_mocked(is_valid=True),
350
            "primary_links": [get_link_mocked(), get_link_mocked()],
351
            "enabled": True,
352
            "active": True
353
        }
354
        evc = EVC(**attributes)
355
        self.assertFalse(evc.should_deploy(attributes['primary_links']))
356
357 View Code Duplication
    @patch('napps.kytos.mef_eline.models.requests')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
358
    def test_send_flow_mods(self, requests_mock):
359
        """Test if you are sending flow_mods."""
360
        flow_mods = {"id": 20}
361
        switch = Mock(spec=Switch, id=1)
362
        EVC.send_flow_mods(switch, flow_mods)
363
        expected_endpoint = f"{MANAGER_URL}/flows/{switch.id}"
364
        expected_data = {"flows": flow_mods}
365
        self.assertEqual(requests_mock.post.call_count, 1)
366
        requests_mock.post.assert_called_once_with(expected_endpoint,
367
                                                   json=expected_data)
368
369 View Code Duplication
    def test_prepare_flow_mod(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
370
        """Test prepare flow_mod method."""
371
        interface_a = Interface('eth0', 1, Mock(spec=Switch))
372
        interface_z = Interface('eth1', 3, Mock(spec=Switch))
373
        flow_mod = EVC.prepare_flow_mod(interface_a, interface_z)
374
        expected_flow_mod = {
375
                           'match': {'in_port': interface_a.port_number},
376
                           'actions': [
377
                                       {'action_type': 'output',
378
                                        'port': interface_z.port_number}
379
                           ]
380
        }
381
        self.assertEqual(expected_flow_mod, flow_mod)
382
383 View Code Duplication
    def test_prepare_pop_flow(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
384
        """Test prepare pop flow  method."""
385
        attributes = {
386
            "name": "custom_name",
387
            "uni_a": get_uni_mocked(interface_port=1, is_valid=True),
388
            "uni_z": get_uni_mocked(interface_port=2, is_valid=True),
389
        }
390
        evc = EVC(**attributes)
391
        interface_a = evc.uni_a.interface
392
        interface_z = evc.uni_z.interface
393
        in_vlan = 10
394
        flow_mod = evc.prepare_pop_flow(interface_a, interface_z, in_vlan)
395
        expected_flow_mod = {
396
            'match': {'in_port': interface_a.port_number, 'dl_vlan': in_vlan},
397
            'actions': [
398
                        {'action_type': 'pop_vlan'},
399
                        {'action_type': 'output',
400
                         'port': interface_z.port_number
401
                         }
402
            ]
403
        }
404
        self.assertEqual(expected_flow_mod, flow_mod)
405
406 View Code Duplication
    def test_prepare_push_flow(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
407
        """Test prepare push flow method."""
408
        attributes = {
409
            "name": "custom_name",
410
            "uni_a": get_uni_mocked(interface_port=1, is_valid=True),
411
            "uni_z": get_uni_mocked(interface_port=2, is_valid=True),
412
        }
413
        evc = EVC(**attributes)
414
        interface_a = evc.uni_a.interface
415
        interface_z = evc.uni_z.interface
416
        in_vlan_a = 10
417
        out_vlan_a = 20
418
        in_vlan_z = 3
419
        flow_mod = evc.prepare_push_flow(interface_a, interface_z,
420
                                         in_vlan_a, out_vlan_a, in_vlan_z)
421
        expected_flow_mod = {
422
            'match': {'in_port': interface_a.port_number,
423
                      'dl_vlan': in_vlan_a
424
                      },
425
            'actions': [
426
                        {'action_type': 'set_vlan', 'vlan_id': in_vlan_z},
427
                        {'action_type': 'push_vlan', 'tag_type': 's'},
428
                        {'action_type': 'set_vlan', 'vlan_id': out_vlan_a},
429
                        {'action_type': 'output',
430
                         'port': interface_z.port_number
431
                         }
432
            ]
433
        }
434
        self.assertEqual(expected_flow_mod, flow_mod)
435
436 View Code Duplication
    @staticmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
437
    @patch('napps.kytos.mef_eline.models.EVC.send_flow_mods')
438
    def test_install_uni_flows(send_flow_mods_mock):
439
        """Test install uni flows method.
440
441
        This test will verify the flows send to the send_flows_mods method.
442
        """
443
        uni_a = get_uni_mocked(interface_port=2, tag_value=82,
444
                               switch_id="switch_uni_a", is_valid=True)
445
        uni_z = get_uni_mocked(interface_port=3, tag_value=83,
446
                               switch_id="switch_uni_z", is_valid=True)
447
448
        attributes = {
449
            "name": "custom_name",
450
            "uni_a": uni_a,
451
            "uni_z": uni_z,
452
            "primary_links": [
453
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
454
                                metadata={"s_vlan": 5}),
455
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
456
                                metadata={"s_vlan": 6})
457
            ]
458
        }
459
        evc = EVC(**attributes)
460
        evc.install_uni_flows(attributes['primary_links'])
461
462
        expected_flow_mod_a = [
463
            {'match': {'in_port': uni_a.interface.port_number,
464
                       'dl_vlan': uni_a.user_tag.value},
465
             'actions': [
466
                {'action_type': 'set_vlan', 'vlan_id': uni_z.user_tag.value},
467
                {'action_type': 'push_vlan', 'tag_type': 's'},
468
                {'action_type': 'set_vlan',
469
                 'vlan_id': evc.primary_links[0].get_metadata('s_vlan').value},
470
                {'action_type': 'output',
471
                 'port': evc.primary_links[0].endpoint_a.port_number}
472
             ]},
473
            {'match': {
474
                'in_port': evc.primary_links[0].endpoint_a.port_number,
475
                'dl_vlan': evc.primary_links[0].get_metadata('s_vlan').value
476
             },
477
             'actions': [
478
                {'action_type': 'pop_vlan'},
479
                {'action_type': 'output', 'port': uni_a.interface.port_number}
480
             ]
481
             }
482
        ]
483
        send_flow_mods_mock.assert_any_call(uni_a.interface.switch,
484
                                            expected_flow_mod_a)
485
486
        expected_flow_mod_z = [
487
            {'match': {'in_port': uni_z.interface.port_number,
488
                       'dl_vlan': uni_z.user_tag.value},
489
             'actions': [
490
                {'action_type': 'set_vlan', 'vlan_id': uni_a.user_tag.value},
491
                {'action_type': 'push_vlan', 'tag_type': 's'},
492
                {'action_type': 'set_vlan',
493
                 'vlan_id': evc.primary_links[-1].get_metadata('s_vlan').value
494
                 },
495
                {'action_type': 'output',
496
                 'port': evc.primary_links[-1].endpoint_b.port_number}
497
              ]
498
             },
499
            {'match': {
500
                 'in_port': evc.primary_links[-1].endpoint_b.port_number,
501
                 'dl_vlan': evc.primary_links[-1].get_metadata('s_vlan').value
502
             },
503
             'actions': [
504
                {'action_type': 'pop_vlan'},
505
                {'action_type': 'output', 'port': uni_z.interface.port_number}
506
              ]
507
             }
508
        ]
509
510
        send_flow_mods_mock.assert_any_call(uni_z.interface.switch,
511
                                            expected_flow_mod_z)
512
513 View Code Duplication
    @staticmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
514
    @patch('napps.kytos.mef_eline.models.EVC.send_flow_mods')
515
    def test_install_nni_flows(send_flow_mods_mock):
516
        """Test install nni flows method.
517
518
        This test will verify the flows send to the send_flows_mods method.
519
        """
520
        uni_a = get_uni_mocked(interface_port=2, tag_value=82,
521
                               switch_id="switch_uni_a", is_valid=True)
522
        uni_z = get_uni_mocked(interface_port=3, tag_value=83,
523
                               switch_id="switch_uni_z", is_valid=True)
524
525
        attributes = {
526
            "name": "custom_name",
527
            "uni_a": uni_a,
528
            "uni_z": uni_z,
529
            "primary_links": [
530
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
531
                                metadata={"s_vlan": 5}),
532
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
533
                                metadata={"s_vlan": 6})
534
            ]
535
        }
536
        evc = EVC(**attributes)
537
        evc.install_nni_flows(attributes['primary_links'])
538
539
        in_vlan = evc.primary_links[0].get_metadata('s_vlan').value
540
        out_vlan = evc.primary_links[-1].get_metadata('s_vlan').value
541
542
        in_port = evc.primary_links[0].endpoint_b.port_number
543
        out_port = evc.primary_links[-1].endpoint_a.port_number
544
545
        expected_flow_mods = [
546
            {'match': {
547
                'in_port': in_port,
548
                'dl_vlan': in_vlan},
549
                'actions': [
550
                    {'action_type': 'set_vlan', 'vlan_id': out_vlan},
551
                    {'action_type': 'output', 'port': out_port}
552
                ]
553
             },
554
            {'match': {'in_port': out_port, 'dl_vlan': out_vlan},
555
             'actions': [
556
                {'action_type': 'set_vlan', 'vlan_id': in_vlan},
557
                {'action_type': 'output', 'port': in_port}
558
              ]
559
             }
560
        ]
561
562
        switch = evc.primary_links[0].endpoint_b.switch
563
        send_flow_mods_mock.assert_called_once_with(switch, expected_flow_mods)
564
565 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
566
    @patch('napps.kytos.mef_eline.models.EVC.choose_vlans')
567
    @patch('napps.kytos.mef_eline.models.EVC.install_nni_flows')
568
    @patch('napps.kytos.mef_eline.models.EVC.install_uni_flows')
569
    @patch('napps.kytos.mef_eline.models.EVC.activate')
570
    @patch('napps.kytos.mef_eline.models.EVC.should_deploy')
571
    def test_deploy_successfully(self, *args):
572
        """Test if all methods to deploy are called."""
573
        (should_deploy_mock, activate_mock, install_uni_flows_mock,
574
         install_nni_flows, chose_vlans_mock, log_mock) = args
575
576
        should_deploy_mock.return_value = True
577
        uni_a = get_uni_mocked(interface_port=2, tag_value=82,
578
                               switch_id="switch_uni_a", is_valid=True)
579
        uni_z = get_uni_mocked(interface_port=3, tag_value=83,
580
                               switch_id="switch_uni_z", is_valid=True)
581
582
        attributes = {
583
            "name": "custom_name",
584
            "uni_a": uni_a,
585
            "uni_z": uni_z,
586
            "primary_links": [
587
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
588
                                metadata={"s_vlan": 5}),
589
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
590
                                metadata={"s_vlan": 6})
591
            ]
592
        }
593
594
        evc = EVC(**attributes)
595
        deployed = evc.deploy()
596
597
        self.assertEqual(should_deploy_mock.call_count, 1)
598
        self.assertEqual(activate_mock.call_count, 1)
599
        self.assertEqual(install_uni_flows_mock.call_count, 1)
600
        self.assertEqual(install_nni_flows.call_count, 1)
601
        self.assertEqual(chose_vlans_mock.call_count, 1)
602
        log_mock.info.assert_called_once_with(f"{evc} was deployed.")
603
        self.assertTrue(deployed)
604
605 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
606
    @patch('napps.kytos.mef_eline.models.EVC.choose_vlans')
607
    @patch('napps.kytos.mef_eline.models.EVC.install_nni_flows')
608
    @patch('napps.kytos.mef_eline.models.EVC.install_uni_flows')
609
    @patch('napps.kytos.mef_eline.models.EVC.activate')
610
    @patch('napps.kytos.mef_eline.models.EVC.should_deploy')
611
    def test_deploy_fail(self, *args):
612
        """Test if all methods is ignored when the should_deploy is false."""
613
        (should_deploy_mock, activate_mock, install_uni_flows_mock,
614
         install_nni_flows, chose_vlans_mock, log_mock) = args
615
616
        should_deploy_mock.return_value = False
617
        uni_a = get_uni_mocked(interface_port=2, tag_value=82,
618
                               switch_id="switch_uni_a", is_valid=True)
619
        uni_z = get_uni_mocked(interface_port=3, tag_value=83,
620
                               switch_id="switch_uni_z", is_valid=True)
621
622
        attributes = {
623
            "name": "custom_name",
624
            "uni_a": uni_a,
625
            "uni_z": uni_z,
626
            "primary_links": [
627
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
628
                                metadata={"s_vlan": 5}),
629
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
630
                                metadata={"s_vlan": 6})
631
            ]
632
        }
633
634
        evc = EVC(**attributes)
635
        deployed = evc.deploy()
636
637
        self.assertEqual(should_deploy_mock.call_count, 1)
638
        self.assertEqual(activate_mock.call_count, 0)
639
        self.assertEqual(install_uni_flows_mock.call_count, 0)
640
        self.assertEqual(install_nni_flows.call_count, 0)
641
        self.assertEqual(chose_vlans_mock.call_count, 0)
642
        self.assertEqual(log_mock.info.call_count, 0)
643
        self.assertFalse(deployed)
644
645 View Code Duplication
    def test_is_using_backup_path(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
646
        """Test test is using backup path."""
647
        backup_path = [
648
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
649
                                metadata={"s_vlan": 5}),
650
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
651
                                metadata={"s_vlan": 6})
652
        ]
653
654
        attributes = {
655
            "name": "circuit_name",
656
            "uni_a": get_uni_mocked(is_valid=True),
657
            "uni_z": get_uni_mocked(is_valid=True),
658
            "backup_path": backup_path
659
        }
660
661
        evc = EVC(**attributes)
662
        self.assertFalse(evc.is_using_backup_path())
663
        evc.current_path = evc.backup_path
664
        self.assertTrue(evc.is_using_backup_path())
665
666 View Code Duplication
    def test_is_using_primary_path(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
667
        """Test test is using primary path."""
668
        primary_path = [
669
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
670
                                metadata={"s_vlan": 5}),
671
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
672
                                metadata={"s_vlan": 6})
673
        ]
674
675
        attributes = {
676
            "name": "circuit_name",
677
            "uni_a": get_uni_mocked(is_valid=True),
678
            "uni_z": get_uni_mocked(is_valid=True),
679
            "primary_path": primary_path
680
        }
681
682
        evc = EVC(**attributes)
683
        self.assertFalse(evc.is_using_primary_path())
684
        evc.current_path = evc.primary_path
685
        self.assertTrue(evc.is_using_primary_path())
686
687 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
688
    def test_deploy_to_case_1(self, log_mocked):
689
        """Test if the path is equal to current_path."""
690
        primary_path = [
691
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
692
                                metadata={"s_vlan": 5}),
693
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
694
                                metadata={"s_vlan": 6})
695
        ]
696
        attributes = {
697
            "name": "circuit_name",
698
            "uni_a": get_uni_mocked(is_valid=True),
699
            "uni_z": get_uni_mocked(is_valid=True),
700
            "primary_path": primary_path
701
        }
702
        evc = EVC(**attributes)
703
        evc.current_path = evc.primary_path
704
705
        expected_deployed = evc.deploy_to('primary_path', evc.primary_path)
706
        expected_msg = 'primary_path is equal to current_path.'
707
        log_mocked.debug.assert_called_with(expected_msg)
708
        self.assertTrue(expected_deployed)
709
710 View Code Duplication
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
711
    def test_deploy_to_case_2(self, deploy_mocked):
712
        """Test deploy with all links up."""
713
        deploy_mocked.return_value = True
714
715
        primary_path = [
716
                 get_link_mocked(status=EntityStatus.UP),
717
                 get_link_mocked(status=EntityStatus.UP)
718
        ]
719
        attributes = {
720
            "name": "circuit_name",
721
            "uni_a": get_uni_mocked(is_valid=True),
722
            "uni_z": get_uni_mocked(is_valid=True),
723
            "primary_path": primary_path,
724
            "enabled": True
725
        }
726
        evc = EVC(**attributes)
727
        deployed = evc.deploy_to('primary_path', evc.primary_path)
728
        deploy_mocked.assert_called_with(evc.primary_path)
729
        self.assertTrue(deployed)
730
731 View Code Duplication
    def test_deploy_to_case_3(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
732
        """Test deploy with one link down."""
733
        primary_path = [
734
                 get_link_mocked(status=EntityStatus.DOWN),
735
                 get_link_mocked(status=EntityStatus.UP)
736
        ]
737
        attributes = {
738
            "name": "circuit_name",
739
            "uni_a": get_uni_mocked(is_valid=True),
740
            "uni_z": get_uni_mocked(is_valid=True),
741
            "primary_path": primary_path,
742
            "enabled": True
743
        }
744
        evc = EVC(**attributes)
745
        deployed = evc.deploy_to('primary_path', evc.primary_path)
746
        self.assertFalse(deployed)
747
748 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
749
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
750
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
751
    def test_handle_link_down_case_1(self, deploy_to_mocked, deploy_mocked,
752
                                     log_mocked):
753
        """Test if deploy_to backup path is called."""
754
        deploy_mocked.return_value = True
755
        deploy_to_mocked.return_value = True
756
        primary_path = [
757
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
758
                                metadata={"s_vlan": 5},
759
                                status=EntityStatus.DOWN),
760
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
761
                                metadata={"s_vlan": 6},
762
                                status=EntityStatus.UP),
763
        ]
764
        backup_path = [
765
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
766
                                metadata={"s_vlan": 5},
767
                                status=EntityStatus.UP),
768
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
769
                                metadata={"s_vlan": 6},
770
                                status=EntityStatus.UP),
771
        ]
772
        attributes = {
773
            "name": "circuit_name",
774
            "uni_a": get_uni_mocked(is_valid=True),
775
            "uni_z": get_uni_mocked(is_valid=True),
776
            "primary_path": primary_path,
777
            "backup_path": backup_path,
778
            "enabled": True
779
        }
780
        evc = EVC(**attributes)
781
782
        evc.current_path = evc.primary_path
783
        current_handle_link_down = evc.handle_link_down()
784
        self.assertEqual(deploy_mocked.call_count, 0)
785
        deploy_to_mocked.assert_called_once_with('backup_path',
786
                                                 evc.backup_path)
787
        self.assertTrue(current_handle_link_down)
788
        msg = f"{evc} deployed after link down."
789
        log_mocked.debug.assert_called_once_with(msg)
790
791 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
792
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
793
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
794
    def test_handle_link_down_case_2(self, deploy_to_mocked, deploy_mocked,
795
                                     log_mocked):
796
        """Test if deploy_to backup path is called."""
797
        deploy_mocked.return_value = True
798
        deploy_to_mocked.return_value = True
799
        primary_path = [
800
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
801
                                metadata={"s_vlan": 5},
802
                                status=EntityStatus.UP),
803
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
804
                                metadata={"s_vlan": 6},
805
                                status=EntityStatus.UP),
806
        ]
807
        backup_path = [
808
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
809
                                metadata={"s_vlan": 5},
810
                                status=EntityStatus.DOWN),
811
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
812
                                metadata={"s_vlan": 6},
813
                                status=EntityStatus.UP),
814
        ]
815
        attributes = {
816
            "name": "circuit_name",
817
            "uni_a": get_uni_mocked(is_valid=True),
818
            "uni_z": get_uni_mocked(is_valid=True),
819
            "primary_path": primary_path,
820
            "backup_path": backup_path,
821
            "enabled": True
822
        }
823
824
        evc = EVC(**attributes)
825
        evc.current_path = evc.backup_path
826
        current_handle_link_down = evc.handle_link_down()
827
        self.assertEqual(deploy_mocked.call_count, 0)
828
        deploy_to_mocked.assert_called_once_with('primary_path',
829
                                                 evc.primary_path)
830
        self.assertTrue(current_handle_link_down)
831
        msg = f"{evc} deployed after link down."
832
        log_mocked.debug.assert_called_once_with(msg)
833
834 View Code Duplication
    @patch('napps.kytos.mef_eline.models.log')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
835
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
836
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
837
    def test_handle_link_down_case_3(self, deploy_to_mocked, deploy_mocked,
838
                                     log_mocked):
839
        """Test if circuit without dynamic path is return failed."""
840
        deploy_mocked.return_value = False
841
        deploy_to_mocked.return_value = False
842
        primary_path = [
843
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
844
                                metadata={"s_vlan": 5},
845
                                status=EntityStatus.DOWN),
846
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
847
                                metadata={"s_vlan": 6},
848
                                status=EntityStatus.UP),
849
        ]
850
        backup_path = [
851
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
852
                                metadata={"s_vlan": 5},
853
                                status=EntityStatus.DOWN),
854
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
855
                                metadata={"s_vlan": 6},
856
                                status=EntityStatus.UP),
857
        ]
858
        attributes = {
859
            "name": "circuit_name",
860
            "uni_a": get_uni_mocked(is_valid=True),
861
            "uni_z": get_uni_mocked(is_valid=True),
862
            "primary_path": primary_path,
863
            "backup_path": backup_path,
864
            "enabled": True
865
        }
866
867
        evc = EVC(**attributes)
868
        evc.current_path = evc.backup_path
869
        current_handle_link_down = evc.handle_link_down()
870
        self.assertEqual(deploy_mocked.call_count, 0)
871
        self.assertEqual(deploy_to_mocked.call_count, 1)
872
        deploy_to_mocked.assert_called_once_with('primary_path',
873
                                                 evc.primary_path)
874
        self.assertFalse(current_handle_link_down)
875
        msg = f'Failed to re-deploy {evc} after link down.'
876
        log_mocked.debug.assert_called_once_with(msg)
877
878
    @patch('napps.kytos.mef_eline.models.log')
879
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
880
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
881
    def test_handle_link_down_case_4(self, deploy_to_mocked, deploy_mocked,
882
                                     log_mocked):
883
        """Test if circuit without dynamic path is return failed."""
884
        deploy_mocked.return_value = True
885
        deploy_to_mocked.return_value = False
886
        primary_path = [
887
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
888
                                metadata={"s_vlan": 5},
889
                                status=EntityStatus.DOWN),
890
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
891
                                metadata={"s_vlan": 6},
892
                                status=EntityStatus.UP),
893
        ]
894
        backup_path = [
895
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
896
                                metadata={"s_vlan": 5},
897
                                status=EntityStatus.DOWN),
898
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
899
                                metadata={"s_vlan": 6},
900
                                status=EntityStatus.UP),
901
        ]
902
        attributes = {
903
            "name": "circuit_name",
904
            "uni_a": get_uni_mocked(is_valid=True),
905
            "uni_z": get_uni_mocked(is_valid=True),
906
            "primary_path": primary_path,
907
            "backup_path": backup_path,
908
            "enabled": True,
909
            "dynamic_backup_path": True
910
        }
911
912
        evc = EVC(**attributes)
913
        evc.current_path = evc.backup_path
914
        current_handle_link_down = evc.handle_link_down()
915
        self.assertEqual(deploy_mocked.call_count, 1)
916
        self.assertEqual(deploy_to_mocked.call_count, 1)
917
        deploy_to_mocked.assert_called_once_with('primary_path',
918
                                                 evc.primary_path)
919
        self.assertTrue(current_handle_link_down)
920
        msg = f"{evc} deployed after link down."
921
        log_mocked.debug.assert_called_once_with(msg)
922
923 View Code Duplication
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
924
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
925
    def test_handle_link_up_case_1(self, deploy_to_mocked, deploy_mocked):
926
        """Test if handle link up do nothing when is using primary path."""
927
        deploy_mocked.return_value = True
928
        deploy_to_mocked.return_value = True
929
        primary_path = [
930
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
931
                                metadata={"s_vlan": 5},
932
                                status=EntityStatus.UP),
933
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
934
                                metadata={"s_vlan": 6},
935
                                status=EntityStatus.UP),
936
        ]
937
        backup_path = [
938
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
939
                                metadata={"s_vlan": 5},
940
                                status=EntityStatus.UP),
941
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
942
                                metadata={"s_vlan": 6},
943
                                status=EntityStatus.UP),
944
        ]
945
        attributes = {
946
            "name": "circuit_name",
947
            "uni_a": get_uni_mocked(is_valid=True),
948
            "uni_z": get_uni_mocked(is_valid=True),
949
            "primary_path": primary_path,
950
            "backup_path": backup_path,
951
            "enabled": True,
952
            "dynamic_backup_path": True
953
        }
954
955
        evc = EVC(**attributes)
956
        evc.current_path = evc.primary_path
957
        current_handle_link_up = evc.handle_link_up(backup_path[0])
958
        self.assertEqual(deploy_mocked.call_count, 0)
959
        self.assertEqual(deploy_to_mocked.call_count, 0)
960
        self.assertTrue(current_handle_link_up)
961
962 View Code Duplication
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
963
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
964
    def test_handle_link_up_case_2(self, deploy_to_mocked, deploy_mocked):
965
        """Test if it is changing from backup_path to primary_path."""
966
        deploy_mocked.return_value = True
967
        deploy_to_mocked.return_value = True
968
        primary_path = [
969
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
970
                                metadata={"s_vlan": 5},
971
                                status=EntityStatus.UP),
972
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
973
                                metadata={"s_vlan": 6},
974
                                status=EntityStatus.UP),
975
        ]
976
        backup_path = [
977
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
978
                                metadata={"s_vlan": 5},
979
                                status=EntityStatus.UP),
980
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
981
                                metadata={"s_vlan": 6},
982
                                status=EntityStatus.UP),
983
        ]
984
        attributes = {
985
            "name": "circuit_name",
986
            "uni_a": get_uni_mocked(is_valid=True),
987
            "uni_z": get_uni_mocked(is_valid=True),
988
            "primary_path": primary_path,
989
            "backup_path": backup_path,
990
            "enabled": True,
991
            "dynamic_backup_path": True
992
        }
993
994
        evc = EVC(**attributes)
995
        evc.current_path = evc.backup_path
996
        current_handle_link_up = evc.handle_link_up(primary_path[0])
997
        self.assertEqual(deploy_mocked.call_count, 0)
998
        self.assertEqual(deploy_to_mocked.call_count, 1)
999
        deploy_to_mocked.assert_called_once_with('primary_path',
1000
                                                 evc.primary_path)
1001
        self.assertTrue(current_handle_link_up)
1002
1003
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
1004
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
1005
    def test_handle_link_up_case_3(self, deploy_to_mocked, deploy_mocked):
1006
        """Test if it is deployed after the backup is up."""
1007
        deploy_mocked.return_value = True
1008
        deploy_to_mocked.return_value = True
1009
        primary_path = [
1010
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
1011
                                metadata={"s_vlan": 5},
1012
                                status=EntityStatus.DOWN),
1013
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
1014
                                metadata={"s_vlan": 6},
1015
                                status=EntityStatus.UP),
1016
        ]
1017
        backup_path = [
1018
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
1019
                                metadata={"s_vlan": 5},
1020
                                status=EntityStatus.DOWN),
1021
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
1022
                                metadata={"s_vlan": 6},
1023
                                status=EntityStatus.UP),
1024
        ]
1025
        attributes = {
1026
            "name": "circuit_name",
1027
            "uni_a": get_uni_mocked(is_valid=True),
1028
            "uni_z": get_uni_mocked(is_valid=True),
1029
            "primary_path": primary_path,
1030
            "backup_path": backup_path,
1031
            "enabled": True,
1032
            "dynamic_backup_path": True
1033
        }
1034
1035
        evc = EVC(**attributes)
1036
        evc.current_path = Path([])
1037
        current_handle_link_up = evc.handle_link_up(backup_path[0])
1038
        self.assertEqual(deploy_mocked.call_count, 0)
1039
        self.assertEqual(deploy_to_mocked.call_count, 1)
1040
        deploy_to_mocked.assert_called_once_with('backup_path',
1041
                                                 evc.backup_path)
1042
        self.assertTrue(current_handle_link_up)
1043
1044
    @patch('napps.kytos.mef_eline.models.EVCDeploy.deploy')
1045
    @patch('napps.kytos.mef_eline.models.LinkProtection.deploy_to')
1046
    def test_handle_link_up_case_4(self, deploy_to_mocked, deploy_mocked):
1047
        """Test if it is deployed after the dynamic_backup_path deploy."""
1048
        deploy_mocked.return_value = True
1049
        deploy_to_mocked.return_value = False
1050
        primary_path = [
1051
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
1052
                                metadata={"s_vlan": 5},
1053
                                status=EntityStatus.DOWN),
1054
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
1055
                                metadata={"s_vlan": 6},
1056
                                status=EntityStatus.UP),
1057
        ]
1058
        backup_path = [
1059
                get_link_mocked(endpoint_a_port=9, endpoint_b_port=10,
1060
                                metadata={"s_vlan": 5},
1061
                                status=EntityStatus.DOWN),
1062
                get_link_mocked(endpoint_a_port=11, endpoint_b_port=12,
1063
                                metadata={"s_vlan": 6},
1064
                                status=EntityStatus.UP),
1065
        ]
1066
        attributes = {
1067
            "name": "circuit_name",
1068
            "uni_a": get_uni_mocked(is_valid=True),
1069
            "uni_z": get_uni_mocked(is_valid=True),
1070
            "primary_path": primary_path,
1071
            "backup_path": backup_path,
1072
            "enabled": True,
1073
            "dynamic_backup_path": True
1074
        }
1075
1076
        evc = EVC(**attributes)
1077
        evc.current_path = Path([])
1078
        current_handle_link_up = evc.handle_link_up(backup_path[0])
1079
        self.assertEqual(deploy_mocked.call_count, 1)
1080
        self.assertEqual(deploy_to_mocked.call_count, 1)
1081
        deploy_to_mocked.assert_called_once_with('backup_path',
1082
                                                 evc.backup_path)
1083
        self.assertTrue(current_handle_link_up)
1084