Test Failed
Pull Request — master (#64)
by
unknown
02:34
created

TestScheduler.test_shutdown()   A

Complexity

Conditions 2

Size

Total Lines 38
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 26
nop 1
dl 0
loc 38
rs 9.256
c 0
b 0
f 0
1
"""Tests for the models module."""
2
3
from unittest import TestCase
4
from unittest.mock import patch, MagicMock, call
5
6
from collections import Counter
7
8
from apscheduler.jobstores.base import JobLookupError
9
from apscheduler.triggers.date import DateTrigger
10
from datetime import datetime, timedelta
11
import pytz
12
from kytos.lib.helpers import get_controller_mock
13
from napps.kytos.maintenance.models import MaintenanceDeployer
14
from napps.kytos.maintenance.models import MaintenanceWindow as MW, Status, Scheduler
15
from napps.kytos.maintenance.models import MaintenanceStart, MaintenanceEnd
16
# strftime/strptime-style pattern: date, literal "T", time, then UTC offset (%z).
# NOTE(review): not referenced by any test visible in this module — confirm it
# is still needed.
TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"
17
18
19
class TestMW(TestCase):
    """Unit tests for building and serialising a MaintenanceWindow."""

    # pylint: disable=protected-access

    def setUp(self):
        """Create a controller mock and a valid window starting tomorrow."""
        self.controller = get_controller_mock()
        # The window opens one day from now and lasts six hours.
        self.start = datetime.now(pytz.utc) + timedelta(days=1)
        self.end = self.start + timedelta(hours=6)
        self.switches = ["01:23:45:67:89:ab:cd:ef"]
        self.maintenance = MW(
            start=self.start, end=self.end, switches=self.switches
        )

    def test_as_dict(self):
        """dict() must expose every field with the values given in setUp."""
        actual = self.maintenance.dict()
        expected = dict(
            description='',
            start=self.start,
            end=self.end,
            id=self.maintenance.id,
            switches=self.switches,
            interfaces=[],
            links=[],
            status=Status.PENDING,
            inserted_at=None,
            updated_at=None,
        )
        self.assertEqual(actual, expected)

    def test_start_in_past(self):
        """A start time one day before now must raise ValueError."""
        yesterday = datetime.now(pytz.utc) - timedelta(days=1)

        with self.assertRaises(ValueError):
            MW(start=yesterday, end=self.end, switches=self.switches)

    def test_end_before_start(self):
        """An end time earlier than the start must raise ValueError."""
        yesterday = datetime.now(pytz.utc) - timedelta(days=1)

        with self.assertRaises(ValueError):
            MW(start=self.start, end=yesterday, switches=self.switches)

    def test_items_empty(self):
        """Omitting switches, interfaces and links must raise ValueError."""
        with self.assertRaises(ValueError):
            MW(start=self.start, end=self.end)
80
81
class TestDeployer(TestCase):
    """Unit tests for MaintenanceDeployer.start_mw and end_mw.

    Each test patches ``KytosEventBuffer.put`` and checks how many times it
    is called for a window with a given mix of switches, interfaces and links.
    """

    def setUp(self):
        """Create a controller mock, a baseline window and the deployer."""
        self.controller = get_controller_mock()
        self.start = datetime.now(pytz.utc) + timedelta(days=1)
        self.end = self.start + timedelta(hours=6)
        self.switches = ["01:23:45:67:89:ab:cd:ef"]
        self.maintenance = MW(
            start=self.start, end=self.end, switches=self.switches
        )

        self.deployer = MaintenanceDeployer(self.controller, Counter())

    def _window_with(self, **fields):
        """Return a copy of the baseline window with ``fields`` overridden."""
        return self.maintenance.copy(update=fields)

    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_start_mw_case_1(self, buffer_put_mock):
        """Starting a switches-only window puts exactly one event."""
        window = self._window_with(
            switches=['01:23:45:67:89:ab:cd:ef', '01:23:45:67:65:ab:cd:ef'],
            interfaces=[],
            links=[],
        )
        self.deployer.start_mw(window)
        buffer_put_mock.assert_called_once()

    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_start_mw_case_2(self, buffer_put_mock):
        """Starting a window with switches and an interface puts two events."""
        iface = "interface_1"
        window = self._window_with(
            switches=['01:23:45:67:89:ab:cd:ef', '01:23:45:67:65:ab:cd:ef'],
            interfaces=[iface],
            links=[],
        )
        self.deployer.start_mw(window)
        self.assertEqual(buffer_put_mock.call_count, 2)

    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_start_mw_case_3(self, buffer_put_mock):
        """Starting a window with an interface and links puts two events."""
        iface = "interface_1"
        first_link, second_link = "link_1", "link_2"
        window = self._window_with(
            switches=[],
            interfaces=[iface],
            links=[first_link, second_link],
        )
        self.deployer.start_mw(window)
        self.assertEqual(buffer_put_mock.call_count, 2)

    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_end_mw_case_1(self, buffer_put_mock):
        """Ending a running switches-only window puts exactly one event."""
        window = self._window_with(
            switches=['01:23:45:67:89:ab:cd:ef', '01:23:45:67:65:ab:cd:ef'],
            interfaces=[],
            links=[],
            status=Status.RUNNING,
        )
        self.deployer.end_mw(window)
        buffer_put_mock.assert_called_once()

    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_end_mw_case_2(self, buffer_put_mock):
        """Ending a running window with switches and an interface puts two events."""
        iface = "interface_1"
        window = self._window_with(
            switches=['01:23:45:67:89:ab:cd:ef', '01:23:45:67:65:ab:cd:ef'],
            interfaces=[iface],
            links=[],
            status=Status.RUNNING,
        )
        self.deployer.end_mw(window)
        self.assertEqual(buffer_put_mock.call_count, 2)

    @patch('kytos.core.buffers.KytosEventBuffer.put')
    def test_end_mw_case_3(self, buffer_put_mock):
        """Ending a running window with an interface and links puts two events."""
        iface = "interface_1"
        first_link, second_link = "link_1", "link_2"
        window = self._window_with(
            switches=[],
            interfaces=[iface],
            links=[first_link, second_link],
            status=Status.RUNNING,
        )
        self.deployer.end_mw(window)
        self.assertEqual(buffer_put_mock.call_count, 2)
201
202
class TestScheduler(TestCase):
    """Unit tests for Scheduler and the MaintenanceStart/MaintenanceEnd jobs.

    The deployer, the database and the task scheduler are all MagicMocks, so
    only the interactions between Scheduler and those collaborators are
    checked.
    """

    def setUp(self) -> None:
        """Create mocked collaborators, a template window and the Scheduler."""
        self.maintenance_deployer = MagicMock()
        self.db = MagicMock()
        self.task_scheduler = MagicMock()

        self.now = datetime.now(pytz.utc)
        one_day_ago = self.now - timedelta(days=1)

        # construct() is used rather than the regular constructor; the
        # template window has no switches, interfaces or links.
        self.window = MW.construct(
            id='Test Window',
            description='',
            start=self.now + timedelta(hours=1),
            end=self.now + timedelta(hours=2),
            status='pending',
            switches=[],
            interfaces=[],
            links=[],
            updated_at=one_day_ago,
            inserted_at=one_day_ago,
        )

        self.scheduler = Scheduler(
            self.maintenance_deployer, self.db, self.task_scheduler
        )

    def _clone(self, window_id, status):
        """Return a copy of the template window with a new id and status."""
        return self.window.copy(update={'id': window_id, 'status': status})

    def test_start(self):
        """start() schedules pending/running windows and deploys running ones.

        Expects one start job for the pending window, one end job for the
        running window, no job for the finished window, and a single
        start_mw call with the running window.
        """
        pending = self._clone('pending window', 'pending')
        running = self._clone('running window', 'running')
        finished = self._clone('finished window', 'finished')

        self.db.get_windows.return_value = [pending, running, finished]

        expected_jobs = [
            call(
                MaintenanceStart(self.scheduler, 'pending window'),
                'date',
                id='pending window-start',
                run_date=pending.start,
            ),
            call(
                MaintenanceEnd(self.scheduler, 'running window'),
                'date',
                id='running window-end',
                run_date=running.end,
            ),
        ]

        self.scheduler.start()

        scheduled_jobs = self.task_scheduler.add_job.call_args_list
        self.assertEqual(scheduled_jobs, expected_jobs)

        self.maintenance_deployer.start_mw.assert_called_once_with(running)

    def test_shutdown(self):
        """shutdown() ends only the running window.

        remove_job is stubbed to raise JobLookupError for every job id
        except 'pending window-start' and 'running window-end'.
        """
        pending = self._clone('pending window', 'pending')
        running = self._clone('running window', 'running')
        finished = self._clone('finished window', 'finished')

        self.db.get_windows.return_value = [pending, running, finished]

        # Maps each job id to whether removing it should fail.
        lookup_fails = {
            'pending window-start': False,
            'pending window-end': True,
            'running window-start': True,
            'running window-end': False,
            'finished window-start': True,
            'finished window-end': True,
        }

        def side_effect(job_id):
            if lookup_fails[job_id]:
                raise JobLookupError(job_id)

        self.task_scheduler.remove_job.side_effect = side_effect

        self.scheduler.shutdown()

        self.maintenance_deployer.end_mw.assert_called_once_with(running)

    def test_update(self):
        """update() passes each job a trigger with the window's run date.

        The modify_job stub checks the trigger's run_date and raises
        JobLookupError for selected job ids. The test makes no assertion
        after the update() calls.
        """
        pending = self._clone('pending window', 'pending')
        running = self._clone('running window', 'running')
        finished = self._clone('finished window', 'finished')

        # job id -> (should modify_job raise?, trigger carrying expected date)
        job_plan = {
            'pending window-start': (False, DateTrigger(pending.start)),
            'pending window-end': (True, DateTrigger(pending.end)),
            'running window-start': (True, DateTrigger(running.start)),
            'running window-end': (False, DateTrigger(running.end)),
            'finished window-start': (True, DateTrigger(finished.start)),
            'finished window-end': (True, DateTrigger(finished.end)),
        }

        def side_effect(job_id, trigger):
            should_raise, reference = job_plan[job_id]
            self.assertEqual(trigger.run_date, reference.run_date)
            if should_raise:
                raise JobLookupError(job_id)

        self.task_scheduler.modify_job.side_effect = side_effect

        for window in (pending, running, finished):
            self.scheduler.update(window)

    def test_maintenance_start(self):
        """Running a MaintenanceStart job deploys and schedules the end job.

        Expects start_mw to receive the window returned by db.start_window
        and a single add_job call registering the matching MaintenanceEnd.
        """
        pending = self._clone('pending window', 'pending')
        started = self._clone('pending window', 'running')

        self.db.start_window.return_value = started
        job = MaintenanceStart(self.scheduler, pending.id)
        job()
        self.maintenance_deployer.start_mw.assert_called_once_with(started)

        self.task_scheduler.add_job.assert_called_once_with(
            MaintenanceEnd(self.scheduler, pending.id),
            'date',
            id='pending window-end',
            run_date=pending.end,
        )

    def test_maintenance_end(self):
        """Running a MaintenanceEnd job ends the window via the deployer.

        Expects end_mw to receive the window returned by db.end_window.
        """
        running = self._clone('running window', 'running')
        ended = self._clone('running window', 'finished')

        self.db.end_window.return_value = ended
        job = MaintenanceEnd(self.scheduler, running.id)
        job()
        self.maintenance_deployer.end_mw.assert_called_once_with(ended)
367
368