1
|
|
|
"""Module to test the LinkProtection class.""" |
2
|
1 |
|
import sys |
3
|
1 |
|
from unittest.mock import MagicMock, patch |
4
|
|
|
|
5
|
1 |
|
from kytos.core.common import EntityStatus |
6
|
1 |
|
from kytos.lib.helpers import get_controller_mock |
7
|
1 |
|
from napps.kytos.mef_eline.models import EVC, Path # NOQA pycodestyle |
8
|
1 |
|
from napps.kytos.mef_eline.tests.helpers import ( |
9
|
|
|
get_link_mocked, |
10
|
|
|
get_uni_mocked, |
11
|
|
|
id_to_interface_mock |
12
|
|
|
) # NOQA pycodestyle |
13
|
|
|
|
14
|
|
|
|
15
|
1 |
|
# Put the parent of the Kytos NApps directory first on the import path.
# NOTE(review): this statement runs after the ``napps`` imports at the top
# of the module, so it cannot help resolve them -- confirm it is needed.
sys.path.insert(0, "/var/lib/kytos/napps/..")


# Dotted targets handed to ``unittest.mock.patch`` by the tests below.
DEPLOY_TO_PRIMARY_PATH = (
    "napps.kytos.mef_eline.models.evc."
    "LinkProtection.deploy_to_primary_path"
)
DEPLOY_TO_BACKUP_PATH = (
    "napps.kytos.mef_eline.models.evc."
    "LinkProtection.deploy_to_backup_path"
)
GET_BEST_PATH = (
    "napps.kytos.mef_eline.models.path."
    "DynamicPathManager.get_best_path"
)
27
|
|
|
|
28
|
|
|
|
29
|
1 |
|
class TestLinkProtection(): # pylint: disable=too-many-public-methods |
30
|
|
|
"""Tests to validate LinkProtection class.""" |
31
|
|
|
|
32
|
1 |
|
def setup_method(self):
    """Create the ``self.evc`` fixture shared by the tests of this class.

    The circuit is enabled, has ``dynamic_backup_path`` switched on, a
    two-link primary path (first link UP, second link DOWN) and a
    two-link backup path whose links are both DOWN.
    """
    up, down = EntityStatus.UP, EntityStatus.DOWN
    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (9, 10, 5, up),
            (11, 12, 6, down),
        )
    ]
    backup_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (13, 14, 5, down),
            (11, 12, 6, down),
        )
    ]
    self.evc = EVC(
        controller=get_controller_mock(),
        name="circuit_1",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
        backup_path=backup_path,
        enabled=True,
        dynamic_backup_path=True,
    )
73
|
|
|
|
74
|
1 |
|
async def test_is_using_backup_path(self):
    """Check ``is_using_backup_path`` before and after switching paths.

    A fresh EVC that only defines a backup path must report ``False``;
    once ``current_path`` is set to that backup path the check must be
    truthy.
    """
    evc = EVC(
        controller=get_controller_mock(),
        name="circuit_1",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        backup_path=[
            get_link_mocked(
                endpoint_a_port=port_a,
                endpoint_b_port=port_b,
                metadata={"s_vlan": vlan},
            )
            for port_a, port_b, vlan in ((10, 9, 5), (12, 11, 6))
        ],
    )

    assert evc.is_using_backup_path() is False

    evc.current_path = evc.backup_path
    assert evc.is_using_backup_path()
100
|
|
|
|
101
|
1 |
|
async def test_is_using_primary_path(self):
    """Check ``is_using_primary_path`` before and after switching paths.

    A fresh EVC that only defines a primary path must report ``False``;
    once ``current_path`` is set to that primary path the check must be
    truthy.
    """
    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
        )
        for port_a, port_b, vlan in ((10, 9, 5), (12, 11, 6))
    ]
    evc = EVC(
        controller=get_controller_mock(),
        name="circuit_2",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
    )

    assert evc.is_using_primary_path() is False

    evc.current_path = evc.primary_path
    assert evc.is_using_primary_path()
123
|
|
|
|
124
|
1 |
|
@patch("napps.kytos.mef_eline.models.evc.log")
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
@patch(DEPLOY_TO_BACKUP_PATH)
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
@patch("napps.kytos.mef_eline.models.path.Path.status")
async def test_handle_link_down_case_1(
    self,
    path_status_mocked,
    deploy_mocked,
    deploy_to_mocked,
    _send_flow_mods_mocked,
    log_mocked,
):
    """Link down while on the primary path deploys to the backup path.

    The shared fixture EVC is placed on its primary path and activated.
    ``handle_link_down`` must then call the patched
    ``deploy_to_backup_path`` exactly once, never call ``deploy``,
    return a truthy value and log the "deployed after link down" debug
    message.
    """
    path_status_mocked.side_effect = [EntityStatus.DOWN, EntityStatus.UP]
    deploy_mocked.return_value = True

    self.evc.current_path = self.evc.primary_path
    self.evc.activate()
    # Drop any calls recorded so far so only handle_link_down is counted.
    deploy_to_mocked.reset_mock()

    handled = self.evc.handle_link_down()

    assert not deploy_mocked.called
    deploy_to_mocked.assert_called_once()
    assert handled
    log_mocked.debug.assert_called_once_with(
        f"{self.evc} deployed after link down."
    )
151
|
|
|
|
152
|
1 |
View Code Duplication |
@patch("napps.kytos.mef_eline.models.evc.log")
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
@patch(DEPLOY_TO_PRIMARY_PATH)
@patch("napps.kytos.mef_eline.models.path.Path.status")
async def test_handle_link_down_case_2(
    self, path_status_mocked, deploy_to_mocked, deploy_mocked, log_mocked
):
    """Link down while on the backup path deploys to the primary path.

    The EVC starts on its backup path. ``handle_link_down`` must call
    the patched ``deploy_to_primary_path`` exactly once, never call
    ``deploy``, return a truthy value and log the "deployed after link
    down" debug message.
    """
    up, down = EntityStatus.UP, EntityStatus.DOWN
    deploy_mocked.return_value = True
    deploy_to_mocked.return_value = True
    path_status_mocked.side_effect = [up, down]

    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (7, 8, 5, up),
            (11, 12, 6, up),
        )
    ]
    backup_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (7, 10, 5, down),
            (15, 12, 6, up),
        )
    ]
    evc = EVC(
        controller=get_controller_mock(),
        name="circuit_13",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
        backup_path=backup_path,
        enabled=True,
    )
    evc.current_path = evc.backup_path
    # Count only the calls made by handle_link_down itself.
    deploy_to_mocked.reset_mock()

    handled = evc.handle_link_down()

    assert not deploy_mocked.called
    deploy_to_mocked.assert_called_once()
    assert handled
    log_mocked.debug.assert_called_once_with(
        f"{evc} deployed after link down."
    )
210
|
|
|
|
211
|
|
|
# pylint: disable=too-many-arguments |
212
|
1 |
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.remove_current_flows")
@patch("napps.kytos.mef_eline.controllers.ELineController.upsert_evc")
@patch("napps.kytos.mef_eline.models.evc.log")
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
@patch(DEPLOY_TO_PRIMARY_PATH)
@patch("napps.kytos.mef_eline.models.path.DynamicPathManager.get_paths")
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
async def test_handle_link_down_case_3(
    self, get_paths_mocked, deploy_to_mocked, deploy_mocked,
    log_mocked, _, mock_remove_current
):
    """Link down fails for an EVC created without a dynamic backup path.

    ``Path.status`` is patched to DOWN and the patched
    ``deploy_to_primary_path`` returns ``False``. ``handle_link_down``
    must return ``False``, try the primary path once, never call
    ``deploy`` or ``DynamicPathManager.get_paths`` and log the "Failed
    to re-deploy" debug message.
    """
    up, down = EntityStatus.UP, EntityStatus.DOWN
    deploy_mocked.return_value = False
    mock_remove_current.return_value = True
    deploy_to_mocked.return_value = False

    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (9, 10, 5, down),
            (11, 12, 6, up),
        )
    ]
    backup_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (9, 10, 5, down),
            (13, 14, 6, up),
        )
    ]
    evc = EVC(
        controller=get_controller_mock(),
        name="circuit_7",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
        backup_path=backup_path,
        enabled=True,
    )
    evc.current_path = evc.backup_path
    # Count only the calls made by handle_link_down itself.
    deploy_to_mocked.reset_mock()

    handled = evc.handle_link_down()

    assert not get_paths_mocked.called
    assert not deploy_mocked.called
    assert deploy_to_mocked.call_count == 1

    assert handled is False
    log_mocked.debug.assert_called_once_with(
        f"Failed to re-deploy {evc} after link down."
    )
277
|
|
|
|
278
|
1 |
View Code Duplication |
@patch("napps.kytos.mef_eline.models.evc.log")
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy._send_flow_mods")
@patch(DEPLOY_TO_PRIMARY_PATH)
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
async def test_handle_link_down_case_4(
    self,
    deploy_to_mocked,
    _send_flow_mods_mocked,
    deploy_mocked,
    log_mocked,
):
    """Link down succeeds for an EVC with ``dynamic_backup_path`` set.

    ``deploy_to_mocked`` is the patched ``deploy_to_primary_path`` and
    returns ``False``; ``deploy_mocked`` is the patched
    ``EVCDeploy.deploy_to_path`` and returns ``True``. With
    ``Path.status`` patched to DOWN, ``handle_link_down`` must try the
    primary path once, return a truthy value and log the "deployed
    after link down" debug message.
    """
    up, down = EntityStatus.UP, EntityStatus.DOWN
    deploy_mocked.return_value = True
    deploy_to_mocked.return_value = False

    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (9, 10, 5, down),
            (11, 12, 6, up),
        )
    ]
    backup_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (9, 10, 5, down),
            (13, 14, 6, up),
        )
    ]
    evc = EVC(
        controller=get_controller_mock(),
        name="circuit_8",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
        backup_path=backup_path,
        enabled=True,
        dynamic_backup_path=True,
    )
    evc.current_path = evc.backup_path

    # Count only the calls made by handle_link_down itself.
    deploy_to_mocked.reset_mock()
    handled = evc.handle_link_down()

    assert deploy_to_mocked.call_count == 1
    assert handled
    log_mocked.debug.assert_called_with(f"{evc} deployed after link down.")
342
|
|
|
|
343
|
1 |
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
async def test_handle_link_up_case_1(
    self,
    deploy_to_mocked,
):
    """Link up triggers no deploy when the EVC is on its primary path.

    Despite its name, ``deploy_to_mocked`` is the patched
    ``EVCDeploy.deploy``. With ``current_path`` set to the primary path,
    ``handle_link_up`` must return a truthy value without calling it.
    """
    up = EntityStatus.UP
    deploy_to_mocked.return_value = True

    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=up,
        )
        for port_a, port_b, vlan in ((9, 10, 5), (11, 12, 6))
    ]
    backup_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=up,
        )
        for port_a, port_b, vlan in ((9, 14, 5), (15, 12, 6))
    ]
    evc = EVC(
        controller=get_controller_mock(),
        name="circuit_9",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
        backup_path=backup_path,
        enabled=True,
        dynamic_backup_path=True,
    )
    evc.current_path = evc.primary_path
    # Count only the calls made by handle_link_up itself.
    deploy_to_mocked.reset_mock()

    handled = evc.handle_link_up(backup_path[0])

    assert not deploy_to_mocked.called
    assert handled
395
|
|
|
|
396
|
1 |
View Code Duplication |
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
async def test_handle_link_up_case_2(
    self,
    deploy_to_path_mocked,
    deploy_mocked
):
    """Link up moves an EVC from its backup path to its primary path.

    The EVC starts on the backup path and ``Path.status`` is patched to
    UP. ``handle_link_up`` must call ``deploy_to_path`` exactly once
    with ``(evc.primary_path, None)``, never call ``deploy`` and return
    a truthy value.
    """
    up = EntityStatus.UP
    deploy_mocked.return_value = True
    deploy_to_path_mocked.return_value = True

    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=up,
        )
        for port_a, port_b, vlan in ((9, 10, 5), (11, 12, 6))
    ]
    backup_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=up,
        )
        for port_a, port_b, vlan in ((9, 14, 5), (15, 12, 6))
    ]
    evc = EVC(
        controller=get_controller_mock(),
        name="circuit_10",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
        backup_path=backup_path,
        enabled=True,
        dynamic_backup_path=True,
    )
    evc.current_path = evc.backup_path
    # Count only the calls made by handle_link_up itself.
    deploy_to_path_mocked.reset_mock()

    handled = evc.handle_link_up(primary_path[0])

    assert not deploy_mocked.called
    assert deploy_to_path_mocked.call_count == 1
    deploy_to_path_mocked.assert_called_once_with(evc.primary_path, None)
    assert handled
454
|
|
|
|
455
|
1 |
View Code Duplication |
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy")
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
@patch("napps.kytos.mef_eline.models.evc.EVC._install_flows")
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.UP)
async def test_handle_link_up_case_3(
    self,
    _install_flows_mocked,
    deploy_to_path_mocked,
    deploy_mocked,
):
    """Link up on a backup link deploys an EVC that has no current path.

    ``current_path`` is an empty ``Path`` and ``Path.status`` is patched
    to UP. ``handle_link_up`` must call ``deploy_to_path`` exactly once
    with ``(evc.backup_path, None)``, never call ``deploy`` and return a
    truthy value.
    """
    up, down = EntityStatus.UP, EntityStatus.DOWN
    deploy_mocked.return_value = True
    deploy_to_path_mocked.return_value = True

    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (9, 10, 5, down),
            (11, 12, 6, up),
        )
    ]
    backup_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (9, 14, 5, down),
            (15, 12, 6, up),
        )
    ]
    evc = EVC(
        controller=get_controller_mock(),
        name="circuit_11",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
        backup_path=backup_path,
        enabled=True,
        dynamic_backup_path=True,
    )

    evc.current_path = Path([])
    # Count only the calls made by handle_link_up itself.
    deploy_to_path_mocked.reset_mock()
    handled = evc.handle_link_up(backup_path[0])

    assert not deploy_mocked.called
    assert deploy_to_path_mocked.call_count == 1
    deploy_to_path_mocked.assert_called_once_with(evc.backup_path, None)
    assert handled
517
|
|
|
|
518
|
1 |
|
@patch("napps.kytos.mef_eline.models.evc.EVCDeploy.deploy_to_path")
@patch("napps.kytos.mef_eline.models.evc.EVC._install_flows")
@patch("napps.kytos.mef_eline.models.path.Path.status", EntityStatus.DOWN)
async def test_handle_link_up_case_4(self, *args):
    """Link up falls back to a dynamic path when static paths are DOWN.

    ``Path.status`` is patched to DOWN and ``current_path`` is an empty
    ``Path``. ``handle_link_up`` must call ``deploy_to_path`` exactly
    once with only ``old_path_dict=None`` and return a truthy value.
    """
    # patch() injects mocks bottom-up; the Path.status patch supplies an
    # explicit replacement and therefore contributes no argument.
    _install_flows_mocked, deploy_to_path_mocked = args
    deploy_to_path_mocked.return_value = True

    up, down = EntityStatus.UP, EntityStatus.DOWN
    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (9, 10, 5, up),
            (11, 12, 6, down),
        )
    ]
    backup_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (13, 14, 5, down),
            (11, 12, 6, down),
        )
    ]

    evc = EVC(
        controller=get_controller_mock(),
        name="circuit_12",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
        backup_path=backup_path,
        enabled=True,
        dynamic_backup_path=True,
    )
    evc.current_path = Path([])

    # Count only the calls made by handle_link_up itself.
    deploy_to_path_mocked.reset_mock()
    handled = evc.handle_link_up(backup_path[0])

    assert deploy_to_path_mocked.call_count == 1
    deploy_to_path_mocked.assert_called_once_with(old_path_dict=None)
    assert handled
579
|
|
|
|
580
|
1 |
|
async def test_handle_link_up_case_5(self):
    """Exercise ``handle_link_up`` with stubbed path predicates.

    First the EVC reports that it is using its backup path and the call
    is expected to be truthy. Then every predicate and
    ``deploy_to_path`` are stubbed to return ``False`` and the call is
    expected to be falsy.
    """
    evc = self.evc
    always_false = MagicMock(return_value=False)

    evc.is_using_primary_path = always_false
    evc.primary_path.is_affected_by_link = always_false
    evc.is_using_backup_path = MagicMock(return_value=True)
    assert evc.handle_link_up(MagicMock())

    # not possible to deploy this evc (it will not benefit from link up)
    evc.is_using_backup_path = always_false
    evc.is_using_dynamic_path = always_false
    evc.backup_path.is_affected_by_link = always_false
    evc.dynamic_backup_path = True
    evc.deploy_to_path = always_false
    assert not evc.handle_link_up(MagicMock())
595
|
|
|
|
596
|
|
|
# pylint: disable=too-many-statements |
597
|
1 |
|
async def test_handle_link_up_case_6(self):
    """Test handle_link_up method.

    Walks ``handle_link_up`` through a series of scenarios on the shared
    fixture EVC by swapping its predicates and deploy methods for mocks.
    The mocks are never reset, so every ``call_count`` assertion below is
    cumulative across scenarios and the statement order matters.
    """
    # not possible to deploy this evc (it will not benefit from link up)
    return_false_mock = MagicMock(return_value=False)
    return_true_mock = MagicMock(return_value=True)
    self.evc.is_using_primary_path = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_false_mock
    self.evc.is_using_backup_path = return_false_mock
    self.evc.is_using_dynamic_path = return_false_mock
    self.evc.backup_path.is_affected_by_link = return_false_mock
    self.evc.dynamic_backup_path = True

    self.evc.deploy_to_primary_path = MagicMock(return_value=False)
    self.evc.deploy_to_backup_path = MagicMock(return_value=False)
    self.evc.deploy_to_path = MagicMock(return_value=False)

    # Scenario 1: no path in use or affected and every deploy mock returns
    # False -> falsy result, first call to deploy_to_path.
    assert not self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_path.call_count == 1

    # Scenario 2: EVC reports it is using the primary path -> truthy
    # result, no further deploy_to_path call.
    self.evc.is_using_primary_path = return_true_mock
    assert self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_path.call_count == 1

    # Scenario 3: EVC reports it is intra-switch -> truthy result, no
    # further deploy_to_path call.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_true_mock
    assert self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_path.call_count == 1

    # Scenario 4: primary path is affected by the link but every deploy
    # mock still returns False -> falsy result; deploy_to_primary_path and
    # deploy_to_path are each called once more.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_true_mock
    assert not self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_primary_path.call_count == 1
    assert self.evc.deploy_to_path.call_count == 2

    # Scenario 5: same as above but deploy_to_primary_path now returns
    # True -> truthy result and deploy_to_path is not called again.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_true_mock
    self.evc.deploy_to_primary_path.return_value = True
    assert self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_primary_path.call_count == 2
    assert self.evc.deploy_to_path.call_count == 2

    # Scenario 6: primary path not affected and EVC reports it is using
    # the backup path -> truthy result, no new deploy calls.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_primary_path.return_value = False
    self.evc.is_using_backup_path = return_true_mock
    assert self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_primary_path.call_count == 2
    assert self.evc.deploy_to_path.call_count == 2

    # Scenario 7: EVC reports it is using a dynamic path -> truthy result,
    # no new deploy calls.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_primary_path.return_value = False
    self.evc.is_using_backup_path = return_false_mock
    self.evc.is_using_dynamic_path = return_true_mock
    assert self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_primary_path.call_count == 2
    assert self.evc.deploy_to_path.call_count == 2

    # Scenario 8: backup path is affected by the link but every deploy
    # mock returns False -> falsy result; deploy_to_backup_path is called
    # for the first time and deploy_to_path once more.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_primary_path.return_value = False
    self.evc.is_using_backup_path = return_false_mock
    self.evc.is_using_dynamic_path = return_false_mock
    self.evc.backup_path.is_affected_by_link = return_true_mock
    assert not self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_primary_path.call_count == 2
    assert self.evc.deploy_to_backup_path.call_count == 1
    assert self.evc.deploy_to_path.call_count == 3

    # Scenario 9: same as above but deploy_to_backup_path now returns
    # True -> truthy result and deploy_to_path is not called again.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_primary_path.return_value = False
    self.evc.is_using_backup_path = return_false_mock
    self.evc.is_using_dynamic_path = return_false_mock
    self.evc.backup_path.is_affected_by_link = return_true_mock
    self.evc.deploy_to_backup_path.return_value = True
    assert self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_primary_path.call_count == 2
    assert self.evc.deploy_to_backup_path.call_count == 2
    assert self.evc.deploy_to_path.call_count == 3

    # Scenario 10: neither static path is affected, dynamic_backup_path is
    # True and deploy_to_path returns False -> falsy result; only
    # deploy_to_path is called again.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_primary_path.return_value = False
    self.evc.is_using_backup_path = return_false_mock
    self.evc.is_using_dynamic_path = return_false_mock
    self.evc.backup_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_backup_path.return_value = False
    self.evc.dynamic_backup_path = True
    assert not self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_primary_path.call_count == 2
    assert self.evc.deploy_to_backup_path.call_count == 2
    assert self.evc.deploy_to_path.call_count == 4

    # Scenario 11: same as above but deploy_to_path now returns True ->
    # truthy result; deploy_to_path is called once more.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_primary_path.return_value = False
    self.evc.is_using_backup_path = return_false_mock
    self.evc.is_using_dynamic_path = return_false_mock
    self.evc.backup_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_backup_path.return_value = False
    self.evc.dynamic_backup_path = True
    self.evc.deploy_to_path.return_value = True
    assert self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_primary_path.call_count == 2
    assert self.evc.deploy_to_backup_path.call_count == 2
    assert self.evc.deploy_to_path.call_count == 5

    # Scenario 12: dynamic_backup_path is False -> falsy result and no
    # deploy method is called again.
    self.evc.is_using_primary_path = return_false_mock
    self.evc.is_intra_switch = return_false_mock
    self.evc.primary_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_primary_path.return_value = False
    self.evc.is_using_backup_path = return_false_mock
    self.evc.is_using_dynamic_path = return_false_mock
    self.evc.backup_path.is_affected_by_link = return_false_mock
    self.evc.deploy_to_backup_path.return_value = False
    self.evc.dynamic_backup_path = False
    self.evc.deploy_to_path.return_value = False
    assert not self.evc.handle_link_up(MagicMock())
    assert self.evc.deploy_to_primary_path.call_count == 2
    assert self.evc.deploy_to_backup_path.call_count == 2
    assert self.evc.deploy_to_path.call_count == 5
727
|
|
|
|
728
|
1 |
|
async def test_handle_link_up_case_7(self):
    """Link up on an already active EVC must not trigger a deploy.

    With the path predicates stubbed to ``False`` and the fixture EVC
    activated, ``handle_link_up`` is expected to return a falsy value
    and leave ``deploy_to_path`` uncalled.
    """
    evc = self.evc
    always_false = MagicMock(return_value=False)

    evc.is_using_primary_path = always_false
    evc.primary_path.is_affected_by_link = always_false
    evc.is_using_dynamic_path = always_false
    evc.backup_path.is_affected_by_link = always_false
    evc.dynamic_backup_path = True

    evc.activate()
    assert evc.is_active()

    evc.deploy_to_path = MagicMock(return_value=True)
    assert not evc.handle_link_up(MagicMock())
    assert evc.deploy_to_path.call_count == 0
741
|
|
|
|
742
|
1 |
|
@patch(DEPLOY_TO_BACKUP_PATH)
@patch(DEPLOY_TO_PRIMARY_PATH)
async def test_handle_link_up_case_8(
    self, deploy_primary_mock, deploy_backup_mock
):
    """Check which static path is deployed when a UNI interface comes up.

    ``handle_link_up`` is called with ``uni_a``'s interface. While every
    primary link is UP only ``deploy_to_primary_path`` is expected to be
    called; after the first primary link is set DOWN and the backup
    links are all UP, only ``deploy_to_backup_path`` is expected to be
    called.
    """
    up, down = EntityStatus.UP, EntityStatus.DOWN
    primary_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (9, 10, 5, up),
            (11, 12, 6, up),
        )
    ]
    backup_path = [
        get_link_mocked(
            endpoint_a_port=port_a,
            endpoint_b_port=port_b,
            metadata={"s_vlan": vlan},
            status=link_status,
        )
        for port_a, port_b, vlan, link_status in (
            (13, 14, 5, up),
            (11, 12, 6, down),
        )
    ]

    evc = EVC(
        controller=get_controller_mock(),
        name="circuit",
        uni_a=get_uni_mocked(is_valid=True),
        uni_z=get_uni_mocked(is_valid=True),
        primary_path=primary_path,
        backup_path=backup_path,
        enabled=True,
        dynamic_backup_path=True,
    )

    # First pass: primary links are UP, one backup link is DOWN.
    evc.handle_link_up(interface=evc.uni_a.interface)
    assert deploy_primary_mock.call_count == 1
    assert not deploy_backup_mock.called

    # Second pass: break the primary path and heal the backup path.
    evc.primary_path[0].status = EntityStatus.DOWN
    evc.backup_path[1].status = EntityStatus.UP
    evc.handle_link_up(interface=evc.uni_a.interface)
    assert deploy_primary_mock.call_count == 1
    assert deploy_backup_mock.call_count == 1
799
|
|
|
|
800
|
1 |
|
async def test_are_unis_active(self):
    """Test are_unis_active against the UNI interfaces' private flags.

    Expects True once _enabled is set on both UNI interfaces, False
    after _active is cleared on both, and False again after _enabled
    is cleared on both.
    """
    circuit = self.evc
    uni_interfaces = (circuit.uni_a.interface, circuit.uni_z.interface)

    def set_flag(flag, value):
        # Apply the same private flag to uni_a first, then uni_z.
        for interface in uni_interfaces:
            setattr(interface, flag, value)

    set_flag("_enabled", True)
    assert circuit.are_unis_active() is True

    set_flag("_active", False)
    assert circuit.are_unis_active() is False

    # NOTE(review): _active is still False at this point, so this
    # check does not show the effect of _enabled on its own.
    set_flag("_enabled", False)
    assert circuit.are_unis_active() is False
813
|
|
|
|
814
|
1 |
|
async def test_is_uni_interface_active(self):
    """Test is_uni_interface_active with UP and DOWN interface mocks.

    Three status combinations are checked against the returned pair:
    both UP expects True plus an entry for each interface; either one
    DOWN expects False plus an entry for the DOWN interface only.
    Entries are keyed by interface id and carry the status name and
    the status_reason set.
    """
    id_a, id_z = '00:01:1', '00:03:1'
    interface_a = id_to_interface_mock(id_a)
    interface_a.status_reason = set()
    interface_z = id_to_interface_mock(id_z)
    interface_z.status_reason = set()

    up, down = EntityStatus.UP, EntityStatus.DOWN
    # (status of a, status of z, expected return value)
    scenarios = [
        (up, up, (True, {
            id_a: {"status": "UP", "status_reason": set()},
            id_z: {"status": "UP", "status_reason": set()},
        })),
        (down, up, (False, {
            id_a: {'status': 'DOWN', 'status_reason': set()},
        })),
        (up, down, (False, {
            id_z: {'status': 'DOWN', 'status_reason': set()},
        })),
    ]

    for status_a, status_z, expected in scenarios:
        interface_a.status = status_a
        interface_z.status = status_z
        assert self.evc.is_uni_interface_active(
            interface_a, interface_z
        ) == expected
847
|
|
|
|
848
|
1 |
|
async def test_handle_interface_link(self, monkeypatch):
    """Test handle_interface_link_up and handle_interface_link_down.

    emit_event in the evc module is replaced by a mock, and the EVC's
    try_to_activate, deactivate and sync are mocked, so each step can
    check exactly which of them ran.
    """
    circuit = self.evc
    interface_a = circuit.uni_a.interface
    interface_a.enable()
    interface_z = circuit.uni_z.interface
    interface_z.enable()

    emit_mock = MagicMock()
    monkeypatch.setattr(
        "napps.kytos.mef_eline.models.evc.emit_event", emit_mock
    )
    circuit.try_to_activate = MagicMock()
    circuit.deactivate = MagicMock()
    circuit.sync = MagicMock()

    # Link up while the EVC reports active: no activation, no sync.
    circuit.is_active = MagicMock(return_value=True)
    circuit.handle_interface_link_up(interface_a)
    circuit.try_to_activate.assert_not_called()
    circuit.sync.assert_not_called()

    # Link down on the deactivated interface while the EVC reports
    # active: one event emitted, EVC deactivated and synced once.
    interface_a.deactivate()
    assert emit_mock.call_count == 0
    circuit.handle_interface_link_down(interface_a)
    assert emit_mock.call_count == 1
    circuit.deactivate.assert_called_once()
    circuit.sync.assert_called_once()

    # Link down again once the EVC reports inactive: counts unchanged.
    circuit.is_active = MagicMock(return_value=False)
    circuit.handle_interface_link_down(interface_a)
    circuit.deactivate.assert_called_once()
    circuit.sync.assert_called_once()

    # Link up on the reactivated interface while the EVC reports
    # inactive, with try_to_handle_uni_as_link_up stubbed to False.
    interface_a.activate()
    assert emit_mock.call_count == 1
    circuit.try_to_handle_uni_as_link_up = MagicMock(return_value=False)
    circuit.handle_interface_link_up(interface_a)
    circuit.try_to_activate.assert_called_once()
    assert circuit.sync.call_count == 2
    assert emit_mock.call_count == 2
903
|
|
|
|