Test Failed
Pull Request — master (#64)
by
unknown
02:39
created

build.models.Scheduler.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nop 1
crap 1
1
"""Models used by the maintenance NApp.
2
3
This module define models for the maintenance window itself and the
4
scheduler.
5
"""
6 1
from dataclasses import dataclass
7 1
from datetime import datetime
8 1
from enum import Enum
9
from typing import NewType, Optional
10 1
from uuid import uuid4
11 1
12 1
import pytz
13
from apscheduler.jobstores.base import JobLookupError
14 1
from apscheduler.schedulers.background import BackgroundScheduler
15 1
from apscheduler.schedulers.base import BaseScheduler
16 1
from pydantic import BaseModel, Field, root_validator, validator
17
18 1
from kytos.core import KytosEvent, log
19
from kytos.core.controller import Controller
20
21 1
TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"
22
23
24 1
class Status(str, Enum):
25 1
    """Maintenance windows status."""
26 1
27
    PENDING = 'pending'
28
    RUNNING = 'running'
29 1
    FINISHED = 'finished'
30
31
32 1
MaintenanceID = NewType('MaintenanceID', str)
33
34
35
class MaintenanceWindow(BaseModel):
36
    """Class for structure of maintenance windows.
37
    """
38
    start: datetime
39
    end: datetime
40
    switches: list[str] = Field(default_factory=list)
41
    interfaces: list[str] = Field(default_factory=list)
42 1
    links: list[str] = Field(default_factory=list)
43 1
    id: MaintenanceID = Field(
44 1
        default_factory=lambda: MaintenanceID(uuid4().hex)
45
    )
46 1
    description: str = Field(default='')
47 1
    status: Status = Field(default=Status.PENDING)
48 1
    inserted_at: Optional[datetime] = Field(default=None)
49 1
    updated_at: Optional[datetime] = Field(default=None)
50 1
51 1
    @validator('start', 'end', pre=True)
52 1
    def convert_time(cls, time):
53 1
        """Convert time strings using TIME_FMT"""
54 1
        if isinstance(time, str):
55 1
            time = datetime.strptime(time, TIME_FMT)
56
        return time
57 1
58 1
    @validator('start')
59
    def check_start_in_past(cls, start_time):
60 1
        """Check if the start is set to occur before now."""
61
        if start_time < datetime.now(pytz.utc):
62 1
            raise ValueError('Start in the past not allowed')
63 1
        return start_time
64
65 1
    @validator('end')
66 1
    def check_end_before_start(cls, end_time, values):
67 1
        """Check if the end is set to occur before the start."""
68 1
        if 'start' in values and end_time <= values['start']:
69 1
            raise ValueError('End before start not allowed')
70
        return end_time
71 1
72
    @root_validator
73
    def check_items_empty(cls, values):
74 1
        """Check if no items are in the maintenance window."""
75
        no_items = all(
76 1
            map(
77
                lambda key: key not in values or len(values[key]) == 0,
78 1
                ['switches', 'links', 'interfaces']
79 1
            )
80 1
        )
81 1
        if no_items:
82 1
            raise ValueError('At least one item must be provided')
83 1
        return values
84 1
85 1
    def maintenance_event(self, operation, controller: Controller):
86 1
        """Create events to start/end a maintenance."""
87 1
        if self.switches:
88 1
            event = KytosEvent(
89 1
                name=f'kytos/maintenance.{operation}_switch',
90 1
                content={'switches': self.switches}
91
            )
92 1
            controller.buffers.app.put(event)
93 1
        if self.interfaces:
94
            event = KytosEvent(
95
                name=f'kytos/maintenance.{operation}_interface',
96
                content={'unis': self.interfaces}
97
            )
98
            controller.buffers.app.put(event)
99
        if self.links:
100
            event = KytosEvent(
101
                name=f'kytos/maintenance.{operation}_link',
102
                content={'links': self.links}
103
            )
104
            controller.buffers.app.put(event)
105
106
    def start_mw(self, controller: Controller):
107
        """Actions taken when a maintenance window starts."""
108
        self.maintenance_event('start', controller)
109
110 1
    def end_mw(self, controller: Controller):
111
        """Actions taken when a maintenance window finishes."""
112 1
        self.maintenance_event('end', controller)
113 1
114 1
    def __str__(self) -> str:
115 1
        return f"'{self.id}'<{self.start} to {self.end}>"
116 1
117 1
    class Config:
118 1
        json_encoders = {
119 1
            datetime: lambda v: v.strftime(TIME_FMT),
120 1
        }
121 1
122
123 1
class MaintenanceWindows(BaseModel):
124
    """List of Maintenance Windows for json conversion."""
125 1
    __root__: list[MaintenanceWindow]
126 1
127 1
    def __iter__(self):
128 1
        return iter(self.__root__)
129 1
130 1
    def __getitem__(self, item):
131 1
        return self.__root__[item]
132
133
    def __len__(self):
134 1
        return len(self.__root__)
135 1
136
    class Config:
137
        json_encoders = {
138
            datetime: lambda v: v.strftime(TIME_FMT),
139
        }
140 1
141 1
142
@dataclass
143
class MaintenanceStart:
144
    """
145
    Callable used for starting maintenance windows
146
    """
147
    maintenance_scheduler: 'Scheduler'
148
    mw_id: MaintenanceID
149
150 1
    def __call__(self):
151 1
        self.maintenance_scheduler.start_maintenance(self.mw_id)
152
153
154
@dataclass
155
class MaintenanceEnd:
156
    """
157
    Callable used for ending maintenance windows
158
    """
159
    maintenance_scheduler: 'Scheduler'
160
    mw_id: MaintenanceID
161
162
    def __call__(self):
163
        self.maintenance_scheduler.end_maintenance(self.mw_id)
164
165
166
class OverlapError(Exception):
167 1
    """
168 1
    Exception for when a Maintenance Windows execution
169
    period overlaps with one or more windows.
170 1
    """
171 1
    new_window: MaintenanceWindow
172
    interfering: MaintenanceWindows
173 1
174
    def __init__(
175 1
                self,
176 1
                new_window: MaintenanceWindow,
177 1
                interfering: MaintenanceWindows
178 1
            ):
179 1
        self.new_window = new_window
180 1
        self.interfering = interfering
181 1
182
    def __str__(self):
183 1
        return f"Maintenance Window {self.new_window} " +\
184 1
            "interferes with the following windows: " +\
185 1
            '[' +\
186
            ', '.join([
187 1
                f"{window}"
188 1
                for window in self.interfering
189 1
            ]) +\
190
            ']'
191 1
192
193 1
@dataclass
194
class Scheduler:
195 1
    """Scheduler for a maintenance window."""
196 1
    controller: Controller
197
    db: 'MaintenanceController'
198 1
    scheduler: BaseScheduler
199
200 1
    @classmethod
201 1
    def new_scheduler(cls, controller: Controller):
202
        """
203
        Creates a new scheduler from the given kytos controller
204 1
        """
205
        scheduler = BackgroundScheduler(timezone=pytz.utc)
206
        from napps.kytos.maintenance.controllers import MaintenanceController
207 1
        db = MaintenanceController()
208
        db.bootstrap_indexes()
209 1
        instance = cls(controller, db, scheduler)
210 1
        return instance
211
212 1
    def start(self):
213
        """
214 1
        Begin running the scheduler.
215
        """
216
        self.db.prepare_start()
217 1
218
        # Populate the scheduler with all pending tasks
219
        windows = self.db.get_windows()
220
        for window in windows:
221 1
            if window.status == Status.PENDING:
222
                window.start_mw(self.controller)
223 1
            self._schedule(window)
224 1
225 1
        # Start the scheduler
226 1
        self.scheduler.start()
227 1
228 1
    def shutdown(self):
229 1
        """
230 1
        Stop running the scheduler.
231
        """
232
        windows = self.db.get_windows()
233
234
        # Depopulate the scheduler
235
        for window in windows:
236
            self._unschedule(window)
237
238
        self.scheduler.remove_all_jobs()
239
        self.scheduler.shutdown()
240
241
    def start_maintenance(self, mw_id: MaintenanceID):
242
        """Begins executing the maintenance window
243
        """
244
        # Get Maintenance from DB and Update
245
        window = self.db.start_window(mw_id)
246
247
        # Activate Running
248
        window.start_mw(self.controller)
249
250
        # Schedule next task
251
        self._schedule(window)
252
253
    def end_maintenance(self, mw_id: MaintenanceID):
254
        """Ends execution of the maintenance window
255
        """
256
        # Get Maintenance from DB
257
        window = self.db.end_window(mw_id)
258
259
        # Set to Ending
260
        window.end_mw(self.controller)
261
262
    def end_maintenance_early(self, mw_id: MaintenanceID):
263
        """Ends execution of the maintenance window early
264
        """
265
        # Get Maintenance from DB
266
        window = self.db.end_window(mw_id)
267
268
        # Unschedule tasks
269
        self._unschedule(window)
270
271
    def add(self, window: MaintenanceWindow, force=False):
272
        """Add jobs to start and end a maintenance window."""
273
274
        if force is False:
275
            overlapping_windows = self.db.check_overlap(window)
276
            if overlapping_windows:
277
                raise OverlapError(window, overlapping_windows)
278
279
        # Add window to DB
280
        self.db.insert_window(window)
281
282
        # Schedule next task
283
        self._schedule(window)
284
285
    def update(self, window: MaintenanceWindow):
286
        """Update an existing Maintenance Window."""
287
288
        # Update window
289
        self.db.update_window(window)
290
291
        # Reschedule any pending tasks
292
        self._reschedule(window)
293
294
    def remove(self, mw_id: MaintenanceID):
295
        """Remove jobs that start and end a maintenance window."""
296
        # Get Maintenance from DB
297
        window = self.db.get_window(mw_id)
298
299
        # Remove from schedule
300
        self._unschedule(window)
301
302
        # Remove from DB
303
        self.db.remove_window(mw_id)
304
305
    def _schedule(self, window: MaintenanceWindow):
306
        log.info(f'Scheduling "{window.id}"')
307
        if window.status == Status.PENDING:
308
            self.scheduler.add_job(
309
                MaintenanceStart(self, window.id),
310
                'date',
311
                id=f'{window.id}-start',
312
                run_date=window.start
313
            )
314
            log.info(f'Scheduled "{window.id}" start at {window.start}')
315
        if window.status == Status.RUNNING:
316
            self.scheduler.add_job(
317
                MaintenanceEnd(self, window.id),
318
                'date',
319
                id=f'{window.id}-end',
320
                run_date=window.end
321
            )
322
            log.info(f'Scheduled "{window.id}" end at {window.end}')
323
324
    def _reschedule(self, window: MaintenanceWindow):
325
        log.info(f'Rescheduling "{window.id}"')
326
        try:
327
            self.scheduler.remove_job(
328
                f'{window.id}-start',
329
            )
330
            self.scheduler.add_job(
331
                MaintenanceStart(self, window.id),
332
                'date',
333
                id=f'{window.id}-start',
334
                run_date=window.start
335
            )
336
            log.info(f'Rescheduled "{window.id}" start to {window.start}')
337
        except JobLookupError:
338
            log.info(f'Could not reschedule "{window.id}" start, no start job')
339
        try:
340
            self.scheduler.remove_job(
341
                f'{window.id}-end',
342
            )
343
            self.scheduler.add_job(
344
                MaintenanceEnd(self, window.id),
345
                'date',
346
                id=f'{window.id}-end',
347
                run_date=window.end
348
            )
349
            log.info(f'Rescheduled "{window.id}" end to {window.end}')
350
        except JobLookupError:
351
            log.info(f'Could not reschedule "{window.id}" end, no end job')
352
353
    def _unschedule(self, window: MaintenanceWindow):
354
        """Remove maintenance events from scheduler.
355
        Does not update DB, due to being
356
        primarily for shutdown startup cases.
357
        """
358
        started = False
359
        ended = False
360
        try:
361
            self.scheduler.remove_job(f'{window.id}-start')
362
        except JobLookupError:
363
            started = True
364
            log.info(f'Job to start "{window.id}" already removed.')
365
        try:
366
            self.scheduler.remove_job(f'{window.id}-end')
367
        except JobLookupError:
368
            ended = True
369
            log.info(f'Job to end "{window.id}" already removed.')
370
        if started and not ended:
371
            window.end_mw(self.controller)
372
373
    def get_maintenance(self, mw_id: MaintenanceID) -> MaintenanceWindow:
374
        """Get a single maintenance by id"""
375
        return self.db.get_window(mw_id)
376
377
    def list_maintenances(self) -> MaintenanceWindows:
378
        """Returns a list of all maintenances"""
379
        return self.db.get_windows()
380