build.tests.unit.managers.test_scheduler   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 183
Duplicated Lines 0 %

Test Coverage

Coverage 92.75%

Importance

Changes 0
Metric Value
eloc 124
dl 0
loc 183
ccs 64
cts 69
cp 0.9275
rs 10
c 0
b 0
f 0
wmc 8

6 Methods

Rating   Name   Duplication   Size   Complexity  
A TestScheduler.test_update() 0 33 2
A TestScheduler.test_maintenance_end() 0 13 1
A TestScheduler.test_maintenance_start() 0 19 1
A TestScheduler.test_shutdown() 0 38 2
A TestScheduler.setup_method() 0 21 1
A TestScheduler.test_start() 0 32 1
1
"""Tests for the scheduler module."""
2
3 1
from unittest.mock import  MagicMock, call
4
5 1
from apscheduler.jobstores.base import JobLookupError
6 1
from apscheduler.triggers.date import DateTrigger
7 1
from datetime import datetime, timedelta
8 1
import pytz
9
10
11 1
from napps.kytos.maintenance.models import MaintenanceWindow as MW
12 1
from napps.kytos.maintenance.managers.scheduler import (
13
    MaintenanceScheduler as Scheduler,
14
    MaintenanceStart,
15
    MaintenanceEnd,
16
)
17
18 1
class TestScheduler:
    """Unit tests for MaintenanceScheduler and its start/end job callables.

    Every collaborator of the scheduler (deployer, database controller and
    task scheduler) is replaced with a ``MagicMock`` so that only the
    scheduler's own orchestration is exercised.
    """

    def setup_method(self) -> None:
        """Build fresh mocks, a template window and the scheduler under test."""
        self.maintenance_deployer = MagicMock()
        self.db_controller = MagicMock()
        self.task_scheduler = MagicMock()

        self.now = datetime.now(pytz.utc)

        # Template window: starts in one hour and lasts one hour. Individual
        # tests derive their own windows from it through _make_window().
        self.window = MW.model_construct(
            id='Test Window',
            description='',
            start=self.now + timedelta(hours=1),
            end=self.now + timedelta(hours=2),
            status='pending',
            switches=[],
            interfaces=[],
            links=[],
            updated_at=self.now - timedelta(days=1),
            inserted_at=self.now - timedelta(days=1),
        )

        self.scheduler = Scheduler(
            self.maintenance_deployer,
            self.db_controller,
            self.task_scheduler,
        )

    def _make_window(self, window_id, status):
        """Return a copy of the template window with a new id and status.

        ``model_copy`` is used instead of ``copy`` because the template is
        created with the pydantic v2 API (``model_construct``), where
        ``BaseModel.copy`` is deprecated.
        """
        return self.window.model_copy(
            update={'id': window_id, 'status': status}
        )

    def test_start(self):
        """start() schedules pending/running windows and deploys the running.

        Expects exactly two ``add_job`` calls (a start job for the pending
        window, an end job for the running window) and a single ``start_mw``
        call for the running window.
        """
        pending_window = self._make_window('pending window', 'pending')
        running_window = self._make_window('running window', 'running')
        finished_window = self._make_window('finished window', 'finished')

        expected_schedule_calls = [
            call(
                MaintenanceStart(self.scheduler, 'pending window'),
                'date',
                id='pending window-start',
                run_date=pending_window.start,
            ),
            call(
                MaintenanceEnd(self.scheduler, 'running window'),
                'date',
                id='running window-end',
                run_date=running_window.end,
            ),
        ]

        self.db_controller.get_unfinished_windows.return_value = [
            pending_window,
            running_window,
            finished_window,
        ]
        self.scheduler.start()

        resultant_schedule_calls = self.task_scheduler.add_job.call_args_list
        assert resultant_schedule_calls == expected_schedule_calls

        self.maintenance_deployer.start_mw.assert_called_once_with(
            running_window
        )

    def test_shutdown(self):
        """shutdown() ends the running window exactly once.

        The mocked ``remove_job`` raises ``JobLookupError`` for every job id
        flagged ``True`` below and returns ``None`` for the others.
        """
        pending_window = self._make_window('pending window', 'pending')
        running_window = self._make_window('running window', 'running')
        finished_window = self._make_window('finished window', 'finished')

        self.db_controller.get_windows.return_value = [
            pending_window,
            running_window,
            finished_window,
        ]

        # job id -> whether remove_job should raise JobLookupError for it
        remove_job_effects = {
            'pending window-start': False,
            'pending window-end': True,
            'running window-start': True,
            'running window-end': False,
            'finished window-start': True,
            'finished window-end': True,
        }

        def side_effect(job_id):
            if remove_job_effects[job_id]:
                raise JobLookupError(job_id)

        self.task_scheduler.remove_job.side_effect = side_effect

        self.scheduler.shutdown()

        self.maintenance_deployer.end_mw.assert_called_once_with(
            running_window
        )

    def test_update(self):
        """update() is called once for a pending, running and finished window.

        Whenever ``modify_job`` is invoked, the callback checks that the
        trigger carries the run date matching the job id and then raises
        ``JobLookupError`` for the ids flagged ``True``.
        """
        pending_window = self._make_window('pending window', 'pending')
        running_window = self._make_window('running window', 'running')
        finished_window = self._make_window('finished window', 'finished')

        # job id -> (raise JobLookupError?, trigger with the expected date)
        modify_job_effects = {
            'pending window-start':
                (False, DateTrigger(pending_window.start)),
            'pending window-end':
                (True, DateTrigger(pending_window.end)),
            'running window-start':
                (True, DateTrigger(running_window.start)),
            'running window-end':
                (False, DateTrigger(running_window.end)),
            'finished window-start':
                (True, DateTrigger(finished_window.start)),
            'finished window-end':
                (True, DateTrigger(finished_window.end)),
        }

        # NOTE(review): the accompanying coverage report records no hits
        # inside this callback, so its assertion may never run -- confirm
        # that Scheduler.update() actually calls task_scheduler.modify_job.
        def side_effect(job_id, trigger):
            throw, expected_trigger = modify_job_effects[job_id]
            assert trigger.run_date == expected_trigger.run_date
            if throw:
                raise JobLookupError(job_id)

        self.task_scheduler.modify_job.side_effect = side_effect

        self.scheduler.update(pending_window)
        self.scheduler.update(running_window)
        self.scheduler.update(finished_window)

    def test_maintenance_start(self):
        """Calling a MaintenanceStart deploys the window and schedules its end.

        The window handed to ``start_mw`` must be the one returned by
        ``db_controller.start_window``, and one end job must be added for it.
        """
        pending_window = self._make_window('pending window', 'pending')
        # Same id, already switched to 'running': what the DB mock returns.
        next_window = self._make_window('pending window', 'running')

        self.db_controller.start_window.return_value = next_window
        start = MaintenanceStart(self.scheduler, pending_window.id)
        start()
        self.maintenance_deployer.start_mw.assert_called_once_with(next_window)

        self.task_scheduler.add_job.assert_called_once_with(
            MaintenanceEnd(self.scheduler, pending_window.id),
            'date',
            id='pending window-end',
            run_date=pending_window.end,
        )

    def test_maintenance_end(self):
        """Calling a MaintenanceEnd ends the window returned by the database.

        The window handed to ``end_mw`` must be the one returned by
        ``db_controller.end_window``.
        """
        running_window = self._make_window('running window', 'running')
        # Same id, already switched to 'finished': what the DB mock returns.
        next_window = self._make_window('running window', 'finished')

        self.db_controller.end_window.return_value = next_window
        end = MaintenanceEnd(self.scheduler, running_window.id)
        end()
        self.maintenance_deployer.end_mw.assert_called_once_with(next_window)
183