1
|
|
|
"""Module to test the LinkProtection class.""" |
2
|
|
|
import sys |
3
|
|
|
|
4
|
|
|
from unittest import TestCase |
5
|
|
|
from unittest.mock import MagicMock, patch |
6
|
|
|
from unittest.mock import Mock |
7
|
|
|
|
8
|
|
|
from napps.kytos.mef_eline.models import EVC, Path # NOQA pycodestyle |
9
|
|
|
from napps.kytos.mef_eline.tests.helpers import ( |
10
|
|
|
get_link_mocked, |
11
|
|
|
get_uni_mocked, |
12
|
|
|
get_controller_mock, |
13
|
|
|
get_mocked_requests, |
14
|
|
|
) # NOQA pycodestyle |
15
|
|
|
|
16
|
|
|
|
17
|
|
|
from kytos.core.common import EntityStatus |
18
|
|
|
|
19
|
|
|
sys.path.insert(0, "/var/lib/kytos/napps/..") |
20
|
|
|
|
21
|
|
|
|
22
|
|
|
DEPLOY_TO_PRIMARY_PATH = ( |
23
|
|
|
"napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to_primary_path" |
24
|
|
|
) |
25
|
|
|
DEPLOY_TO_BACKUP_PATH = ( |
26
|
|
|
"napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to_backup_path" |
27
|
|
|
) |
28
|
|
|
GET_BEST_PATH = ( |
29
|
|
|
"napps.kytos.mef_eline.models.path.DynamicPathManager.get_best_path" |
30
|
|
|
) |
31
|
|
|
|
32
|
|
|
|
33
|
|
|
class TestLinkProtection(TestCase): # pylint: disable=too-many-public-methods |
34
|
|
|
"""Tests to validate LinkProtection class.""" |
35
|
|
|
|
36
|
|
|
def test_is_using_backup_path(self): |
37
|
|
|
"""Test test is using backup path.""" |
38
|
|
|
|
39
|
|
|
attributes = { |
40
|
|
|
"controller": get_controller_mock(), |
41
|
|
|
"name": "circuit_1", |
42
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
43
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
44
|
|
|
"backup_path": [ |
45
|
|
|
get_link_mocked( |
46
|
|
|
endpoint_a_port=10, |
47
|
|
|
endpoint_b_port=9, |
48
|
|
|
metadata={"s_vlan": 5}, |
49
|
|
|
), |
50
|
|
|
get_link_mocked( |
51
|
|
|
endpoint_a_port=12, |
52
|
|
|
endpoint_b_port=11, |
53
|
|
|
metadata={"s_vlan": 6}, |
54
|
|
|
), |
55
|
|
|
], |
56
|
|
|
} |
57
|
|
|
|
58
|
|
|
evc = EVC(**attributes) |
59
|
|
|
self.assertFalse(evc.is_using_backup_path()) |
60
|
|
|
evc.current_path = evc.backup_path |
61
|
|
|
self.assertTrue(evc.is_using_backup_path()) |
62
|
|
|
|
63
|
|
|
def test_is_using_primary_path(self): |
64
|
|
|
"""Test test is using primary path.""" |
65
|
|
|
primary_path = [ |
66
|
|
|
get_link_mocked( |
67
|
|
|
endpoint_a_port=10, endpoint_b_port=9, metadata={"s_vlan": 5} |
68
|
|
|
), |
69
|
|
|
get_link_mocked( |
70
|
|
|
endpoint_a_port=12, endpoint_b_port=11, metadata={"s_vlan": 6} |
71
|
|
|
), |
72
|
|
|
] |
73
|
|
|
|
74
|
|
|
attributes = { |
75
|
|
|
"controller": get_controller_mock(), |
76
|
|
|
"name": "circuit_2", |
77
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
78
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
79
|
|
|
"primary_path": primary_path, |
80
|
|
|
} |
81
|
|
|
evc = EVC(**attributes) |
82
|
|
|
self.assertFalse(evc.is_using_primary_path()) |
83
|
|
|
evc.current_path = evc.primary_path |
84
|
|
|
self.assertTrue(evc.is_using_primary_path()) |
85
|
|
|
|
86
|
|
|
@patch("napps.kytos.mef_eline.models.evc.log") |
87
|
|
|
def test_deploy_to_case_1(self, log_mocked): |
88
|
|
|
"""Test if the path is equal to current_path.""" |
89
|
|
|
primary_path = [ |
90
|
|
|
get_link_mocked( |
91
|
|
|
endpoint_a_port=10, endpoint_b_port=9, metadata={"s_vlan": 5} |
92
|
|
|
), |
93
|
|
|
get_link_mocked( |
94
|
|
|
endpoint_a_port=12, endpoint_b_port=11, metadata={"s_vlan": 6} |
95
|
|
|
), |
96
|
|
|
] |
97
|
|
|
attributes = { |
98
|
|
|
"controller": get_controller_mock(), |
99
|
|
|
"name": "circuit_3", |
100
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
101
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
102
|
|
|
"primary_path": primary_path, |
103
|
|
|
} |
104
|
|
|
evc = EVC(**attributes) |
105
|
|
|
evc.current_path = evc.primary_path |
106
|
|
|
|
107
|
|
|
expected_deployed = evc.deploy_to("primary_path", evc.primary_path) |
108
|
|
|
expected_msg = "primary_path is equal to current_path." |
109
|
|
|
log_mocked.debug.assert_called_with(expected_msg) |
110
|
|
|
self.assertTrue(expected_deployed) |
111
|
|
|
|
112
|
|
|
# pylint: disable=too-many-arguments |
113
|
|
|
@patch("napps.kytos.mef_eline.models.evc.notify_link_available_tags") |
114
|
|
|
@patch("requests.post") |
115
|
|
|
@patch("napps.kytos.mef_eline.storehouse.StoreHouse.save_evc") |
116
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy") |
117
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows") |
118
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows") |
119
|
|
|
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP) |
120
|
|
|
def test_deploy_to_case_2( |
121
|
|
|
self, |
122
|
|
|
install_uni_flows_mocked, |
123
|
|
|
install_nni_flows_mocked, |
124
|
|
|
deploy_mocked, |
125
|
|
|
_, |
126
|
|
|
requests_mock, |
127
|
|
|
notify_mock |
128
|
|
|
): |
129
|
|
|
"""Test deploy with all links up.""" |
130
|
|
|
deploy_mocked.return_value = True |
131
|
|
|
response = MagicMock() |
132
|
|
|
response.status_code = 201 |
133
|
|
|
requests_mock.return_value = response |
134
|
|
|
|
135
|
|
|
primary_path = [ |
136
|
|
|
get_link_mocked(status=EntityStatus.UP), |
137
|
|
|
get_link_mocked(status=EntityStatus.UP), |
138
|
|
|
] |
139
|
|
|
attributes = { |
140
|
|
|
"controller": get_controller_mock(), |
141
|
|
|
"name": "circuit_4", |
142
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
143
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
144
|
|
|
"primary_path": primary_path, |
145
|
|
|
"enabled": True, |
146
|
|
|
} |
147
|
|
|
evc = EVC(**attributes) |
148
|
|
|
|
149
|
|
|
deployed = evc.deploy_to("primary_path", evc.primary_path) |
150
|
|
|
install_uni_flows_mocked.assert_called_with(evc.primary_path) |
151
|
|
|
install_nni_flows_mocked.assert_called_with(evc.primary_path) |
152
|
|
|
self.assertTrue(deployed) |
153
|
|
|
notify_mock.assert_called() |
154
|
|
|
|
155
|
|
|
@patch("requests.get", side_effect=get_mocked_requests) |
156
|
|
|
def test_deploy_to_case_3(self, requests_mocked): |
157
|
|
|
# pylint: disable=unused-argument |
158
|
|
|
"""Test deploy with one link down.""" |
159
|
|
|
link1 = get_link_mocked() |
160
|
|
|
link2 = get_link_mocked() |
161
|
|
|
link1.id = "abc" |
162
|
|
|
link2.id = "def" |
163
|
|
|
primary_path = [link1, link2] |
164
|
|
|
attributes = { |
165
|
|
|
"controller": get_controller_mock(), |
166
|
|
|
"name": "circuit_5", |
167
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
168
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
169
|
|
|
"primary_path": primary_path, |
170
|
|
|
"enabled": True, |
171
|
|
|
} |
172
|
|
|
evc = EVC(**attributes) |
173
|
|
|
|
174
|
|
|
# storehouse initialization mock |
175
|
|
|
evc._storehouse.box = Mock() # pylint: disable=protected-access |
176
|
|
|
evc._storehouse.box.data = {} # pylint: disable=protected-access |
177
|
|
|
|
178
|
|
|
deployed = evc.deploy_to("primary_path", evc.primary_path) |
179
|
|
|
self.assertFalse(deployed) |
180
|
|
|
|
181
|
|
View Code Duplication |
@patch("napps.kytos.mef_eline.models.evc.log") |
|
|
|
|
182
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods") |
183
|
|
|
@patch(DEPLOY_TO_BACKUP_PATH) |
184
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy") |
185
|
|
|
@patch("napps.kytos.mef_eline.models.path.Path.status") |
186
|
|
|
def test_handle_link_down_case_1( |
187
|
|
|
self, |
188
|
|
|
path_status_mocked, |
189
|
|
|
deploy_mocked, |
190
|
|
|
deploy_to_mocked, |
191
|
|
|
_send_flow_mods_mocked, |
192
|
|
|
log_mocked, |
193
|
|
|
): |
194
|
|
|
"""Test if deploy_to backup path is called.""" |
195
|
|
|
deploy_mocked.return_value = True |
196
|
|
|
path_status_mocked.side_effect = [EntityStatus.DOWN, EntityStatus.UP] |
197
|
|
|
|
198
|
|
|
primary_path = [ |
199
|
|
|
get_link_mocked( |
200
|
|
|
endpoint_a_port=9, |
201
|
|
|
endpoint_b_port=10, |
202
|
|
|
metadata={"s_vlan": 5}, |
203
|
|
|
status=EntityStatus.DOWN, |
204
|
|
|
), |
205
|
|
|
get_link_mocked( |
206
|
|
|
endpoint_a_port=11, |
207
|
|
|
endpoint_b_port=12, |
208
|
|
|
metadata={"s_vlan": 6}, |
209
|
|
|
status=EntityStatus.UP, |
210
|
|
|
), |
211
|
|
|
] |
212
|
|
|
backup_path = [ |
213
|
|
|
get_link_mocked( |
214
|
|
|
endpoint_a_port=9, |
215
|
|
|
endpoint_b_port=10, |
216
|
|
|
metadata={"s_vlan": 5}, |
217
|
|
|
status=EntityStatus.UP, |
218
|
|
|
), |
219
|
|
|
get_link_mocked( |
220
|
|
|
endpoint_a_port=11, |
221
|
|
|
endpoint_b_port=12, |
222
|
|
|
metadata={"s_vlan": 6}, |
223
|
|
|
status=EntityStatus.UP, |
224
|
|
|
), |
225
|
|
|
] |
226
|
|
|
attributes = { |
227
|
|
|
"controller": get_controller_mock(), |
228
|
|
|
"name": "circuit_6", |
229
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
230
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
231
|
|
|
"primary_path": primary_path, |
232
|
|
|
"backup_path": backup_path, |
233
|
|
|
"enabled": True, |
234
|
|
|
} |
235
|
|
|
evc = EVC(**attributes) |
236
|
|
|
|
237
|
|
|
evc.current_path = evc.primary_path |
238
|
|
|
evc.activate() |
239
|
|
|
deploy_to_mocked.reset_mock() |
240
|
|
|
current_handle_link_down = evc.handle_link_down() |
241
|
|
|
self.assertEqual(deploy_mocked.call_count, 0) |
242
|
|
|
deploy_to_mocked.assert_called_once() |
243
|
|
|
|
244
|
|
|
self.assertTrue(current_handle_link_down) |
245
|
|
|
msg = f"{evc} deployed after link down." |
246
|
|
|
log_mocked.debug.assert_called_once_with(msg) |
247
|
|
|
|
248
|
|
View Code Duplication |
@patch("napps.kytos.mef_eline.models.evc.log") |
|
|
|
|
249
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy") |
250
|
|
|
@patch(DEPLOY_TO_PRIMARY_PATH) |
251
|
|
|
@patch("napps.kytos.mef_eline.models.path.Path.status") |
252
|
|
|
def test_handle_link_down_case_2( |
253
|
|
|
self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked |
254
|
|
|
): |
255
|
|
|
"""Test if deploy_to backup path is called.""" |
256
|
|
|
deploy_mocked.return_value = True |
257
|
|
|
deploy_to_mocked.return_value = True |
258
|
|
|
path_status_mocked.side_effect = [EntityStatus.UP, EntityStatus.DOWN] |
259
|
|
|
primary_path = [ |
260
|
|
|
get_link_mocked( |
261
|
|
|
endpoint_a_port=7, |
262
|
|
|
endpoint_b_port=8, |
263
|
|
|
metadata={"s_vlan": 5}, |
264
|
|
|
status=EntityStatus.UP, |
265
|
|
|
), |
266
|
|
|
get_link_mocked( |
267
|
|
|
endpoint_a_port=11, |
268
|
|
|
endpoint_b_port=12, |
269
|
|
|
metadata={"s_vlan": 6}, |
270
|
|
|
status=EntityStatus.UP, |
271
|
|
|
), |
272
|
|
|
] |
273
|
|
|
backup_path = [ |
274
|
|
|
get_link_mocked( |
275
|
|
|
endpoint_a_port=7, |
276
|
|
|
endpoint_b_port=10, |
277
|
|
|
metadata={"s_vlan": 5}, |
278
|
|
|
status=EntityStatus.DOWN, |
279
|
|
|
), |
280
|
|
|
get_link_mocked( |
281
|
|
|
endpoint_a_port=15, |
282
|
|
|
endpoint_b_port=12, |
283
|
|
|
metadata={"s_vlan": 6}, |
284
|
|
|
status=EntityStatus.UP, |
285
|
|
|
), |
286
|
|
|
] |
287
|
|
|
attributes = { |
288
|
|
|
"controller": get_controller_mock(), |
289
|
|
|
"name": "circuit_13", |
290
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
291
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
292
|
|
|
"primary_path": primary_path, |
293
|
|
|
"backup_path": backup_path, |
294
|
|
|
"enabled": True, |
295
|
|
|
} |
296
|
|
|
|
297
|
|
|
evc = EVC(**attributes) |
298
|
|
|
evc.current_path = evc.backup_path |
299
|
|
|
deploy_to_mocked.reset_mock() |
300
|
|
|
current_handle_link_down = evc.handle_link_down() |
301
|
|
|
self.assertEqual(deploy_mocked.call_count, 0) |
302
|
|
|
deploy_to_mocked.assert_called_once() |
303
|
|
|
self.assertTrue(current_handle_link_down) |
304
|
|
|
msg = f"{evc} deployed after link down." |
305
|
|
|
log_mocked.debug.assert_called_once_with(msg) |
306
|
|
|
|
307
|
|
|
@patch("napps.kytos.mef_eline.models.evc.log") |
308
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy") |
309
|
|
|
@patch(DEPLOY_TO_PRIMARY_PATH) |
310
|
|
|
@patch("napps.kytos.mef_eline.models.path.DynamicPathManager.get_paths") |
311
|
|
|
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN) |
312
|
|
|
def test_handle_link_down_case_3( |
313
|
|
|
self, get_paths_mocked, deploy_to_mocked, deploy_mocked, log_mocked |
314
|
|
|
): |
315
|
|
|
"""Test if circuit without dynamic path is return failed.""" |
316
|
|
|
deploy_mocked.return_value = False |
317
|
|
|
deploy_to_mocked.return_value = False |
318
|
|
|
primary_path = [ |
319
|
|
|
get_link_mocked( |
320
|
|
|
endpoint_a_port=9, |
321
|
|
|
endpoint_b_port=10, |
322
|
|
|
metadata={"s_vlan": 5}, |
323
|
|
|
status=EntityStatus.DOWN, |
324
|
|
|
), |
325
|
|
|
get_link_mocked( |
326
|
|
|
endpoint_a_port=11, |
327
|
|
|
endpoint_b_port=12, |
328
|
|
|
metadata={"s_vlan": 6}, |
329
|
|
|
status=EntityStatus.UP, |
330
|
|
|
), |
331
|
|
|
] |
332
|
|
|
backup_path = [ |
333
|
|
|
get_link_mocked( |
334
|
|
|
endpoint_a_port=9, |
335
|
|
|
endpoint_b_port=10, |
336
|
|
|
metadata={"s_vlan": 5}, |
337
|
|
|
status=EntityStatus.DOWN, |
338
|
|
|
), |
339
|
|
|
get_link_mocked( |
340
|
|
|
endpoint_a_port=13, |
341
|
|
|
endpoint_b_port=14, |
342
|
|
|
metadata={"s_vlan": 6}, |
343
|
|
|
status=EntityStatus.UP, |
344
|
|
|
), |
345
|
|
|
] |
346
|
|
|
attributes = { |
347
|
|
|
"controller": get_controller_mock(), |
348
|
|
|
"name": "circuit_7", |
349
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
350
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
351
|
|
|
"primary_path": primary_path, |
352
|
|
|
"backup_path": backup_path, |
353
|
|
|
"enabled": True, |
354
|
|
|
} |
355
|
|
|
|
356
|
|
|
evc = EVC(**attributes) |
357
|
|
|
evc.current_path = evc.backup_path |
358
|
|
|
deploy_to_mocked.reset_mock() |
359
|
|
|
current_handle_link_down = evc.handle_link_down() |
360
|
|
|
|
361
|
|
|
self.assertEqual(get_paths_mocked.call_count, 0) |
362
|
|
|
self.assertEqual(deploy_mocked.call_count, 0) |
363
|
|
|
self.assertEqual(deploy_to_mocked.call_count, 1) |
364
|
|
|
|
365
|
|
|
self.assertFalse(current_handle_link_down) |
366
|
|
|
msg = f"Failed to re-deploy {evc} after link down." |
367
|
|
|
log_mocked.debug.assert_called_once_with(msg) |
368
|
|
|
|
369
|
|
View Code Duplication |
@patch("napps.kytos.mef_eline.models.evc.log") |
|
|
|
|
370
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path") |
371
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods") |
372
|
|
|
@patch(DEPLOY_TO_PRIMARY_PATH) |
373
|
|
|
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN) |
374
|
|
|
def test_handle_link_down_case_4( |
375
|
|
|
self, |
376
|
|
|
deploy_to_mocked, |
377
|
|
|
_send_flow_mods_mocked, |
378
|
|
|
deploy_mocked, |
379
|
|
|
log_mocked, |
380
|
|
|
): |
381
|
|
|
"""Test if circuit with dynamic path is return success.""" |
382
|
|
|
deploy_mocked.return_value = True |
383
|
|
|
deploy_to_mocked.return_value = False |
384
|
|
|
primary_path = [ |
385
|
|
|
get_link_mocked( |
386
|
|
|
endpoint_a_port=9, |
387
|
|
|
endpoint_b_port=10, |
388
|
|
|
metadata={"s_vlan": 5}, |
389
|
|
|
status=EntityStatus.DOWN, |
390
|
|
|
), |
391
|
|
|
get_link_mocked( |
392
|
|
|
endpoint_a_port=11, |
393
|
|
|
endpoint_b_port=12, |
394
|
|
|
metadata={"s_vlan": 6}, |
395
|
|
|
status=EntityStatus.UP, |
396
|
|
|
), |
397
|
|
|
] |
398
|
|
|
backup_path = [ |
399
|
|
|
get_link_mocked( |
400
|
|
|
endpoint_a_port=9, |
401
|
|
|
endpoint_b_port=10, |
402
|
|
|
metadata={"s_vlan": 5}, |
403
|
|
|
status=EntityStatus.DOWN, |
404
|
|
|
), |
405
|
|
|
get_link_mocked( |
406
|
|
|
endpoint_a_port=13, |
407
|
|
|
endpoint_b_port=14, |
408
|
|
|
metadata={"s_vlan": 6}, |
409
|
|
|
status=EntityStatus.UP, |
410
|
|
|
), |
411
|
|
|
] |
412
|
|
|
attributes = { |
413
|
|
|
"controller": get_controller_mock(), |
414
|
|
|
"name": "circuit_8", |
415
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
416
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
417
|
|
|
"primary_path": primary_path, |
418
|
|
|
"backup_path": backup_path, |
419
|
|
|
"enabled": True, |
420
|
|
|
"dynamic_backup_path": True, |
421
|
|
|
} |
422
|
|
|
|
423
|
|
|
evc = EVC(**attributes) |
424
|
|
|
evc.current_path = evc.backup_path |
425
|
|
|
|
426
|
|
|
# storehouse mock |
427
|
|
|
evc._storehouse.box = Mock() # pylint: disable=protected-access |
428
|
|
|
evc._storehouse.box.data = {} # pylint: disable=protected-access |
429
|
|
|
|
430
|
|
|
deploy_to_mocked.reset_mock() |
431
|
|
|
current_handle_link_down = evc.handle_link_down() |
432
|
|
|
self.assertEqual(deploy_to_mocked.call_count, 1) |
433
|
|
|
|
434
|
|
|
self.assertTrue(current_handle_link_down) |
435
|
|
|
msg = f"{evc} deployed after link down." |
436
|
|
|
log_mocked.debug.assert_called_with(msg) |
437
|
|
|
|
438
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy") |
439
|
|
|
@patch("napps.kytos.mef_eline.models.evc.LinkProtection.deploy_to") |
440
|
|
|
def test_handle_link_up_case_1(self, deploy_to_mocked, deploy_mocked): |
441
|
|
|
"""Test if handle link up do nothing when is using primary path.""" |
442
|
|
|
deploy_mocked.return_value = True |
443
|
|
|
deploy_to_mocked.return_value = True |
444
|
|
|
primary_path = [ |
445
|
|
|
get_link_mocked( |
446
|
|
|
endpoint_a_port=9, |
447
|
|
|
endpoint_b_port=10, |
448
|
|
|
metadata={"s_vlan": 5}, |
449
|
|
|
status=EntityStatus.UP, |
450
|
|
|
), |
451
|
|
|
get_link_mocked( |
452
|
|
|
endpoint_a_port=11, |
453
|
|
|
endpoint_b_port=12, |
454
|
|
|
metadata={"s_vlan": 6}, |
455
|
|
|
status=EntityStatus.UP, |
456
|
|
|
), |
457
|
|
|
] |
458
|
|
|
backup_path = [ |
459
|
|
|
get_link_mocked( |
460
|
|
|
endpoint_a_port=9, |
461
|
|
|
endpoint_b_port=14, |
462
|
|
|
metadata={"s_vlan": 5}, |
463
|
|
|
status=EntityStatus.UP, |
464
|
|
|
), |
465
|
|
|
get_link_mocked( |
466
|
|
|
endpoint_a_port=15, |
467
|
|
|
endpoint_b_port=12, |
468
|
|
|
metadata={"s_vlan": 6}, |
469
|
|
|
status=EntityStatus.UP, |
470
|
|
|
), |
471
|
|
|
] |
472
|
|
|
attributes = { |
473
|
|
|
"controller": get_controller_mock(), |
474
|
|
|
"name": "circuit_9", |
475
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
476
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
477
|
|
|
"primary_path": primary_path, |
478
|
|
|
"backup_path": backup_path, |
479
|
|
|
"enabled": True, |
480
|
|
|
"dynamic_backup_path": True, |
481
|
|
|
} |
482
|
|
|
|
483
|
|
|
evc = EVC(**attributes) |
484
|
|
|
evc.current_path = evc.primary_path |
485
|
|
|
deploy_to_mocked.reset_mock() |
486
|
|
|
current_handle_link_up = evc.handle_link_up(backup_path[0]) |
487
|
|
|
self.assertEqual(deploy_mocked.call_count, 0) |
488
|
|
|
self.assertEqual(deploy_to_mocked.call_count, 0) |
489
|
|
|
self.assertTrue(current_handle_link_up) |
490
|
|
|
|
491
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy") |
492
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path") |
493
|
|
|
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP) |
494
|
|
|
def test_handle_link_up_case_2(self, deploy_to_path_mocked, deploy_mocked): |
495
|
|
|
"""Test if it is changing from backup_path to primary_path.""" |
496
|
|
|
deploy_mocked.return_value = True |
497
|
|
|
deploy_to_path_mocked.return_value = True |
498
|
|
|
primary_path = [ |
499
|
|
|
get_link_mocked( |
500
|
|
|
endpoint_a_port=9, |
501
|
|
|
endpoint_b_port=10, |
502
|
|
|
metadata={"s_vlan": 5}, |
503
|
|
|
status=EntityStatus.UP, |
504
|
|
|
), |
505
|
|
|
get_link_mocked( |
506
|
|
|
endpoint_a_port=11, |
507
|
|
|
endpoint_b_port=12, |
508
|
|
|
metadata={"s_vlan": 6}, |
509
|
|
|
status=EntityStatus.UP, |
510
|
|
|
), |
511
|
|
|
] |
512
|
|
|
backup_path = [ |
513
|
|
|
get_link_mocked( |
514
|
|
|
endpoint_a_port=9, |
515
|
|
|
endpoint_b_port=14, |
516
|
|
|
metadata={"s_vlan": 5}, |
517
|
|
|
status=EntityStatus.UP, |
518
|
|
|
), |
519
|
|
|
get_link_mocked( |
520
|
|
|
endpoint_a_port=15, |
521
|
|
|
endpoint_b_port=12, |
522
|
|
|
metadata={"s_vlan": 6}, |
523
|
|
|
status=EntityStatus.UP, |
524
|
|
|
), |
525
|
|
|
] |
526
|
|
|
attributes = { |
527
|
|
|
"controller": get_controller_mock(), |
528
|
|
|
"name": "circuit_10", |
529
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
530
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
531
|
|
|
"primary_path": primary_path, |
532
|
|
|
"backup_path": backup_path, |
533
|
|
|
"enabled": True, |
534
|
|
|
"dynamic_backup_path": True, |
535
|
|
|
} |
536
|
|
|
|
537
|
|
|
evc = EVC(**attributes) |
538
|
|
|
evc.current_path = evc.backup_path |
539
|
|
|
deploy_to_path_mocked.reset_mock() |
540
|
|
|
current_handle_link_up = evc.handle_link_up(primary_path[0]) |
541
|
|
|
self.assertEqual(deploy_mocked.call_count, 0) |
542
|
|
|
self.assertEqual(deploy_to_path_mocked.call_count, 1) |
543
|
|
|
deploy_to_path_mocked.assert_called_once_with(evc.primary_path) |
544
|
|
|
self.assertTrue(current_handle_link_up) |
545
|
|
|
|
546
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy") |
547
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path") |
548
|
|
|
@patch(GET_BEST_PATH) |
549
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows") |
550
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows") |
551
|
|
|
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP) |
552
|
|
|
def test_handle_link_up_case_3( |
553
|
|
|
self, |
554
|
|
|
_install_uni_flows_mocked, |
555
|
|
|
_install_nni_flows_mocked, |
556
|
|
|
get_best_path_mocked, |
557
|
|
|
deploy_to_path_mocked, |
558
|
|
|
deploy_mocked, |
559
|
|
|
): |
560
|
|
|
"""Test if it is deployed after the backup is up.""" |
561
|
|
|
deploy_mocked.return_value = True |
562
|
|
|
deploy_to_path_mocked.return_value = True |
563
|
|
|
primary_path = [ |
564
|
|
|
get_link_mocked( |
565
|
|
|
endpoint_a_port=9, |
566
|
|
|
endpoint_b_port=10, |
567
|
|
|
metadata={"s_vlan": 5}, |
568
|
|
|
status=EntityStatus.DOWN, |
569
|
|
|
), |
570
|
|
|
get_link_mocked( |
571
|
|
|
endpoint_a_port=11, |
572
|
|
|
endpoint_b_port=12, |
573
|
|
|
metadata={"s_vlan": 6}, |
574
|
|
|
status=EntityStatus.UP, |
575
|
|
|
), |
576
|
|
|
] |
577
|
|
|
backup_path = [ |
578
|
|
|
get_link_mocked( |
579
|
|
|
endpoint_a_port=9, |
580
|
|
|
endpoint_b_port=14, |
581
|
|
|
metadata={"s_vlan": 5}, |
582
|
|
|
status=EntityStatus.DOWN, |
583
|
|
|
), |
584
|
|
|
get_link_mocked( |
585
|
|
|
endpoint_a_port=15, |
586
|
|
|
endpoint_b_port=12, |
587
|
|
|
metadata={"s_vlan": 6}, |
588
|
|
|
status=EntityStatus.UP, |
589
|
|
|
), |
590
|
|
|
] |
591
|
|
|
attributes = { |
592
|
|
|
"controller": get_controller_mock(), |
593
|
|
|
"name": "circuit_11", |
594
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
595
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
596
|
|
|
"primary_path": primary_path, |
597
|
|
|
"backup_path": backup_path, |
598
|
|
|
"enabled": True, |
599
|
|
|
"dynamic_backup_path": True, |
600
|
|
|
} |
601
|
|
|
|
602
|
|
|
evc = EVC(**attributes) |
603
|
|
|
|
604
|
|
|
# storehouse initialization mock |
605
|
|
|
evc._storehouse.box = Mock() # pylint: disable=protected-access |
606
|
|
|
evc._storehouse.box.data = {} # pylint: disable=protected-access |
607
|
|
|
|
608
|
|
|
evc.current_path = Path([]) |
609
|
|
|
deploy_to_path_mocked.reset_mock() |
610
|
|
|
current_handle_link_up = evc.handle_link_up(backup_path[0]) |
611
|
|
|
|
612
|
|
|
self.assertEqual(get_best_path_mocked.call_count, 0) |
613
|
|
|
self.assertEqual(deploy_mocked.call_count, 0) |
614
|
|
|
self.assertEqual(deploy_to_path_mocked.call_count, 1) |
615
|
|
|
deploy_to_path_mocked.assert_called_once_with(evc.backup_path) |
616
|
|
|
self.assertTrue(current_handle_link_up) |
617
|
|
|
|
618
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path") |
619
|
|
|
@patch(GET_BEST_PATH) |
620
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVC._install_nni_flows") |
621
|
|
|
@patch("napps.kytos.mef_eline.models.evc.EVC._install_uni_flows") |
622
|
|
|
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN) |
623
|
|
|
def test_handle_link_up_case_4(self, *args): |
624
|
|
|
"""Test if not path is found a dynamic path is used.""" |
625
|
|
|
( |
626
|
|
|
_install_uni_flows_mocked, |
627
|
|
|
_install_nni_flows_mocked, |
628
|
|
|
get_best_path_mocked, |
629
|
|
|
deploy_to_path_mocked, |
630
|
|
|
) = args |
631
|
|
|
|
632
|
|
|
deploy_to_path_mocked.return_value = True |
633
|
|
|
|
634
|
|
|
primary_path = [ |
635
|
|
|
get_link_mocked( |
636
|
|
|
endpoint_a_port=9, |
637
|
|
|
endpoint_b_port=10, |
638
|
|
|
metadata={"s_vlan": 5}, |
639
|
|
|
status=EntityStatus.UP, |
640
|
|
|
), |
641
|
|
|
get_link_mocked( |
642
|
|
|
endpoint_a_port=11, |
643
|
|
|
endpoint_b_port=12, |
644
|
|
|
metadata={"s_vlan": 6}, |
645
|
|
|
status=EntityStatus.DOWN, |
646
|
|
|
), |
647
|
|
|
] |
648
|
|
|
backup_path = [ |
649
|
|
|
get_link_mocked( |
650
|
|
|
endpoint_a_port=13, |
651
|
|
|
endpoint_b_port=14, |
652
|
|
|
metadata={"s_vlan": 5}, |
653
|
|
|
status=EntityStatus.DOWN, |
654
|
|
|
), |
655
|
|
|
get_link_mocked( |
656
|
|
|
endpoint_a_port=11, |
657
|
|
|
endpoint_b_port=12, |
658
|
|
|
metadata={"s_vlan": 6}, |
659
|
|
|
status=EntityStatus.DOWN, |
660
|
|
|
), |
661
|
|
|
] |
662
|
|
|
|
663
|
|
|
# Setup best_path mock |
664
|
|
|
best_path = Path() |
665
|
|
|
best_path.append(primary_path[0]) |
666
|
|
|
get_best_path_mocked.return_value = best_path |
667
|
|
|
|
668
|
|
|
attributes = { |
669
|
|
|
"controller": get_controller_mock(), |
670
|
|
|
"name": "circuit_12", |
671
|
|
|
"uni_a": get_uni_mocked(is_valid=True), |
672
|
|
|
"uni_z": get_uni_mocked(is_valid=True), |
673
|
|
|
"primary_path": primary_path, |
674
|
|
|
"backup_path": backup_path, |
675
|
|
|
"enabled": True, |
676
|
|
|
"dynamic_backup_path": True, |
677
|
|
|
} |
678
|
|
|
|
679
|
|
|
evc = EVC(**attributes) |
680
|
|
|
evc.current_path = Path([]) |
681
|
|
|
|
682
|
|
|
# storehouse mock |
683
|
|
|
evc._storehouse.box = Mock() # pylint: disable=protected-access |
684
|
|
|
evc._storehouse.box.data = {} # pylint: disable=protected-access |
685
|
|
|
|
686
|
|
|
deploy_to_path_mocked.reset_mock() |
687
|
|
|
current_handle_link_up = evc.handle_link_up(backup_path[0]) |
688
|
|
|
|
689
|
|
|
self.assertEqual(get_best_path_mocked.call_count, 0) |
690
|
|
|
self.assertEqual(deploy_to_path_mocked.call_count, 1) |
691
|
|
|
deploy_to_path_mocked.assert_called_once_with() |
692
|
|
|
self.assertTrue(current_handle_link_up) |
693
|
|
|
|