Passed
Push — master ( 0d5b5f...2f9122 )
by
unknown
01:39 queued 12s
created

build.tests.unit.managers.test_scheduler   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 184
Duplicated Lines 0 %

Test Coverage

Coverage 92.86%

Importance

Changes 0
Metric Value
eloc 125
dl 0
loc 184
ccs 65
cts 70
cp 0.9286
rs 10
c 0
b 0
f 0
wmc 8

6 Methods

Rating   Name   Duplication   Size   Complexity  
A TestScheduler.test_update() 0 33 2
A TestScheduler.test_maintenance_end() 0 13 1
A TestScheduler.test_maintenance_start() 0 19 1
A TestScheduler.test_shutdown() 0 38 2
A TestScheduler.setUp() 0 21 1
A TestScheduler.test_start() 0 32 1
1
"""Tests for the scheduler module."""
2
3 1
from unittest import TestCase
4 1
from unittest.mock import  MagicMock, call
5
6 1
from apscheduler.jobstores.base import JobLookupError
7 1
from apscheduler.triggers.date import DateTrigger
8 1
from datetime import datetime, timedelta
9 1
import pytz
10
11
12 1
from napps.kytos.maintenance.models import MaintenanceWindow as MW
13 1
from napps.kytos.maintenance.managers.scheduler import (
14
    MaintenanceScheduler as Scheduler,
15
    MaintenanceStart,
16
    MaintenanceEnd,
17
)
18
19 1
class TestScheduler(TestCase):
    """Unit tests for the maintenance Scheduler and its start/end callables.

    The deployer, the database controller and the task scheduler handed to
    the Scheduler are MagicMock instances, so each test only inspects the
    calls recorded on those mocks.
    """

    def setUp(self) -> None:
        """Build the mocked collaborators, a template window and the scheduler."""
        self.maintenance_deployer = MagicMock()
        self.db_controller = MagicMock()
        self.task_scheduler = MagicMock()

        self.now = datetime.now(pytz.utc)
        one_day_ago = self.now - timedelta(days=1)

        # Template window: starts one hour from now and ends one hour later.
        # Individual tests derive their own windows from it.
        self.window = MW.construct(
            id='Test Window',
            description='',
            start=self.now + timedelta(hours=1),
            end=self.now + timedelta(hours=2),
            status='pending',
            switches=[],
            interfaces=[],
            links=[],
            updated_at=one_day_ago,
            inserted_at=one_day_ago,
        )

        self.scheduler = Scheduler(
            self.maintenance_deployer,
            self.db_controller,
            self.task_scheduler,
        )

    def _copy_window(self, window_id, status):
        """Return a copy of the template window with the given id and status."""
        return self.window.copy(update={'id': window_id, 'status': status})

    def test_start(self):
        """Check the jobs added and the window deployed by start().

        Expects exactly two add_job calls (a start job for the pending
        window, then an end job for the running window) and a single
        start_mw call with the running window.
        """
        pending = self._copy_window('pending window', 'pending')
        running = self._copy_window('running window', 'running')
        finished = self._copy_window('finished window', 'finished')

        start_pending_job = call(
            MaintenanceStart(self.scheduler, 'pending window'),
            'date',
            id='pending window-start',
            run_date=pending.start,
        )
        end_running_job = call(
            MaintenanceEnd(self.scheduler, 'running window'),
            'date',
            id='running window-end',
            run_date=running.end,
        )

        self.db_controller.get_unfinished_windows.return_value = [
            pending,
            running,
            finished,
        ]
        self.scheduler.start()

        self.assertEqual(
            self.task_scheduler.add_job.call_args_list,
            [start_pending_job, end_running_job],
        )
        self.maintenance_deployer.start_mw.assert_called_once_with(running)

    def test_shutdown(self):
        """Check that shutdown() ends exactly the running window.

        remove_job is mocked to raise JobLookupError for the job ids marked
        as missing below and to return None for the others.
        """
        pending = self._copy_window('pending window', 'pending')
        running = self._copy_window('running window', 'running')
        finished = self._copy_window('finished window', 'finished')

        self.db_controller.get_windows.return_value = [
            pending,
            running,
            finished,
        ]

        # Job id -> whether remove_job should raise JobLookupError for it.
        # An id outside this table makes the side effect fail with KeyError.
        job_is_missing = {
            'pending window-start': False,
            'pending window-end': True,
            'running window-start': True,
            'running window-end': False,
            'finished window-start': True,
            'finished window-end': True,
        }

        def fake_remove_job(job_id):
            if job_is_missing[job_id]:
                raise JobLookupError(job_id)
            return None

        self.task_scheduler.remove_job.side_effect = fake_remove_job

        self.scheduler.shutdown()

        self.maintenance_deployer.end_mw.assert_called_once_with(running)

    def test_update(self):
        """Check the trigger dates passed to modify_job by update().

        modify_job is mocked to compare the received trigger's run_date with
        the expected one and then, for the job ids flagged below, to raise
        JobLookupError.
        """
        # NOTE(review): the only assertion runs inside the modify_job side
        # effect, so this test verifies nothing if update() never calls
        # modify_job. Consider asserting the recorded calls once the expected
        # behaviour is confirmed against the scheduler implementation.
        pending = self._copy_window('pending window', 'pending')
        running = self._copy_window('running window', 'running')
        finished = self._copy_window('finished window', 'finished')

        # Job id -> (raise JobLookupError?, trigger carrying the expected date).
        expectations = {
            'pending window-start': (False, DateTrigger(pending.start)),
            'pending window-end': (True, DateTrigger(pending.end)),
            'running window-start': (True, DateTrigger(running.start)),
            'running window-end': (False, DateTrigger(running.end)),
            'finished window-start': (True, DateTrigger(finished.start)),
            'finished window-end': (True, DateTrigger(finished.end)),
        }

        def fake_modify_job(job_id, trigger):
            should_raise, wanted_trigger = expectations[job_id]
            self.assertEqual(trigger.run_date, wanted_trigger.run_date)
            if should_raise:
                raise JobLookupError(job_id)
            return None

        self.task_scheduler.modify_job.side_effect = fake_modify_job

        for window in (pending, running, finished):
            self.scheduler.update(window)

    def test_maintenance_start(self):
        """Check what calling a MaintenanceStart does.

        Expects start_mw to receive the window returned by the controller's
        start_window, and a single add_job call scheduling the matching end
        job at the window's end date.
        """
        pending = self._copy_window('pending window', 'pending')
        started = self._copy_window('pending window', 'running')

        self.db_controller.start_window.return_value = started

        start_job = MaintenanceStart(self.scheduler, pending.id)
        start_job()

        self.maintenance_deployer.start_mw.assert_called_once_with(started)
        self.task_scheduler.add_job.assert_called_once_with(
            MaintenanceEnd(self.scheduler, pending.id),
            'date',
            id='pending window-end',
            run_date=pending.end,
        )

    def test_maintenance_end(self):
        """Check that calling a MaintenanceEnd ends the window.

        Expects end_mw to receive the window returned by the controller's
        end_window.
        """
        running = self._copy_window('running window', 'running')
        ended = self._copy_window('running window', 'finished')

        self.db_controller.end_window.return_value = ended

        end_job = MaintenanceEnd(self.scheduler, running.id)
        end_job()

        self.maintenance_deployer.end_mw.assert_called_once_with(ended)
184
185