Passed
Pull Request — master (#64)
by
unknown
02:38
created

MaintenanceWindow.check_end_before_start()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 5
nop 3
dl 0
loc 6
ccs 5
cts 5
cp 1
crap 3
rs 10
c 0
b 0
f 0
1
"""Models used by the maintenance NApp.
2
3
This module define models for the maintenance window itself and the
4
scheduler.
5
"""
6 1
from collections import Counter
7 1
from dataclasses import dataclass
8 1
from datetime import datetime
9 1
from enum import Enum
10 1
from typing import NewType, Optional
11 1
from uuid import uuid4
12
13 1
import pytz
14 1
from apscheduler.jobstores.base import JobLookupError
15 1
from apscheduler.schedulers.background import BackgroundScheduler
16 1
from apscheduler.schedulers.base import BaseScheduler
17
# pylint: disable=no-name-in-module
18 1
from pydantic import BaseModel, Field, root_validator, validator
19
20 1
from kytos.core import KytosEvent, log
21 1
from kytos.core.common import EntityStatus
22 1
from kytos.core.controller import Controller
23
24
# pylint: enable=no-name-in-module
25
26
27 1
TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"
28
29
30 1
class Status(str, Enum):
31
    """Maintenance windows status."""
32
33 1
    PENDING = 'pending'
34 1
    RUNNING = 'running'
35 1
    FINISHED = 'finished'
36
37
38 1
MaintenanceID = NewType('MaintenanceID', str)
39
40
41 1
class MaintenanceWindow(BaseModel):
42
    """Class for structure of maintenance windows.
43
    """
44 1
    start: datetime
45 1
    end: datetime
46 1
    switches: list[str] = Field(default_factory=list)
47 1
    interfaces: list[str] = Field(default_factory=list)
48 1
    links: list[str] = Field(default_factory=list)
49 1
    id: MaintenanceID = Field(
50
        default_factory=lambda: MaintenanceID(uuid4().hex)
51
    )
52 1
    description: str = Field(default='')
53 1
    status: Status = Field(default=Status.PENDING)
54 1
    inserted_at: Optional[datetime] = Field(default=None)
55 1
    updated_at: Optional[datetime] = Field(default=None)
56
57
    # pylint: disable=no-self-argument
58
59 1
    @validator('start', 'end', pre=True)
60 1
    def convert_time(cls, time):
61
        """Convert time strings using TIME_FMT"""
62 1
        if isinstance(time, str):
63 1
            time = datetime.strptime(time, TIME_FMT)
64 1
        return time
65
66 1
    @validator('start')
67 1
    def check_start_in_past(cls, start_time):
68
        """Check if the start is set to occur before now."""
69 1
        if start_time < datetime.now(pytz.utc):
70 1
            raise ValueError('Start in the past not allowed')
71 1
        return start_time
72
73 1
    @validator('end')
74 1
    def check_end_before_start(cls, end_time, values):
75
        """Check if the end is set to occur before the start."""
76 1
        if 'start' in values and end_time <= values['start']:
77 1
            raise ValueError('End before start not allowed')
78 1
        return end_time
79
80 1
    @root_validator
81 1
    def check_items_empty(cls, values):
82
        """Check if no items are in the maintenance window."""
83 1
        no_items = all(
84
            map(
85
                lambda key: key not in values or len(values[key]) == 0,
86
                ['switches', 'links', 'interfaces']
87
            )
88
        )
89 1
        if no_items:
90 1
            raise ValueError('At least one item must be provided')
91 1
        return values
92
93
    # pylint: enable=no-self-argument
94
95 1
    def __str__(self) -> str:
96
        return f"'{self.id}'<{self.start} to {self.end}>"
97
98 1
    class Config:
99
        """Config for encoding MaintenanceWindow class"""
100 1
        json_encoders = {
101
            datetime: lambda v: v.strftime(TIME_FMT),
102
        }
103
104
105 1
class MaintenanceWindows(BaseModel):
106
    """List of Maintenance Windows for json conversion."""
107 1
    __root__: list[MaintenanceWindow]
108
109 1
    def __iter__(self):
110
        return iter(self.__root__)
111
112 1
    def __getitem__(self, item):
113
        return self.__root__[item]
114
115 1
    def __len__(self):
116
        return len(self.__root__)
117
118 1
    class Config:
119
        """Config for encoding MaintenanceWindows class"""
120 1
        json_encoders = {
121
            datetime: lambda v: v.strftime(TIME_FMT),
122
        }
123
124
125 1
@dataclass
126 1
class MaintenanceStart:
127
    """
128
    Callable used for starting maintenance windows
129
    """
130 1
    maintenance_scheduler: 'Scheduler'
131 1
    mw_id: MaintenanceID
132
133 1
    def __call__(self):
134 1
        self.maintenance_scheduler.start_maintenance(self.mw_id)
135
136
137 1
@dataclass
138 1
class MaintenanceEnd:
139
    """
140
    Callable used for ending maintenance windows
141
    """
142 1
    maintenance_scheduler: 'Scheduler'
143 1
    mw_id: MaintenanceID
144
145 1
    def __call__(self):
146 1
        self.maintenance_scheduler.end_maintenance(self.mw_id)
147
148
149 1
class OverlapError(Exception):
150
    """
151
    Exception for when a Maintenance Windows execution
152
    period overlaps with one or more windows.
153
    """
154 1
    new_window: MaintenanceWindow
155 1
    interfering: MaintenanceWindows
156
157 1
    def __init__(
158
                self,
159
                new_window: MaintenanceWindow,
160
                interfering: MaintenanceWindows
161
            ):
162
        self.new_window = new_window
163
        self.interfering = interfering
164
165 1
    def __str__(self):
166
        return f"Maintenance Window {self.new_window} " +\
167
            "interferes with the following windows: " +\
168
            '[' +\
169
            ', '.join([
170
                f"{window}"
171
                for window in self.interfering
172
            ]) +\
173
            ']'
174
175
176 1
@dataclass
177 1
class MaintenanceDeployer:
178
    """Class for deploying maintenances"""
179 1
    controller: Controller
180 1
    maintenance_devices: Counter
181
182 1
    @classmethod
183 1
    def new_deployer(cls, controller: Controller):
184
        """
185
        Creates a new MaintenanceDeployer from the given Kytos Controller
186
        """
187 1
        instance = cls(controller, Counter())
188 1
        return instance
189
190 1
    def _maintenance_event(self, window: MaintenanceWindow, operation: str):
191
        """Create events to start/end a maintenance."""
192 1
        if window.switches:
193 1
            event = KytosEvent(
194
                name=f'kytos/maintenance.{operation}_switch',
195
                content={'switches': window.switches}
196
            )
197 1
            self.controller.buffers.app.put(event)
198 1
        if window.interfaces:
199 1
            event = KytosEvent(
200
                name=f'kytos/maintenance.{operation}_interface',
201
                content={'unis': window.interfaces}
202
            )
203 1
            self.controller.buffers.app.put(event)
204 1
        if window.links:
205 1
            event = KytosEvent(
206
                name=f'kytos/maintenance.{operation}_link',
207
                content={'links': window.links}
208
            )
209 1
            self.controller.buffers.app.put(event)
210
211 1
    def start_mw(self, window: MaintenanceWindow):
212
        """Actions taken when a maintenance window starts."""
213 1
        items = [*window.switches, *window.links, *window.interfaces]
214 1
        self.maintenance_devices.update(items)
215 1
        self._maintenance_event(window, 'start')
216
217 1
    def end_mw(self, window: MaintenanceWindow):
218
        """Actions taken when a maintenance window finishes."""
219 1
        items = [*window.switches, *window.links, *window.interfaces]
220 1
        self.maintenance_devices.subtract(items)
221 1
        self._maintenance_event(window, 'end')
222
223 1
    def dev_in_maintenance(self, dev):
224
        """Checks if a given device is undergoing maintenance"""
225
        if self.maintenance_devices[dev.id]:
226
            return EntityStatus.DOWN
227
        return EntityStatus.UP
228
229
230 1
@dataclass
231 1
class Scheduler:
232
    """Class for scheduling maintenance windows."""
233 1
    deployer: MaintenanceDeployer
234 1
    db_controller: 'MaintenanceController'
235 1
    scheduler: BaseScheduler
236
237 1
    @classmethod
238 1
    def new_scheduler(cls, deployer: MaintenanceDeployer):
239
        """
240
        Creates a new scheduler from the given MaintenanceDeployer
241
        """
242
        scheduler = BackgroundScheduler(timezone=pytz.utc)
243
        # pylint: disable=import-outside-toplevel
244
        from napps.kytos.maintenance.controllers import MaintenanceController
245
246
        # pylint: enable=import-outside-toplevel
247
        db_controller = MaintenanceController()
248
        db_controller.bootstrap_indexes()
249
        instance = cls(deployer, db_controller, scheduler)
250
        return instance
251
252 1
    def start(self):
253
        """
254
        Begin running the scheduler.
255
        """
256 1
        self.db_controller.prepare_start()
257
258
        # Populate the scheduler with all pending tasks
259 1
        windows = self.db_controller.get_windows()
260 1
        for window in windows:
261 1
            if window.status == Status.RUNNING:
262 1
                self.deployer.start_mw(window)
263 1
            self._schedule(window)
264
265
        # Start the scheduler
266 1
        self.scheduler.start()
267
268 1
    def shutdown(self):
269
        """
270
        Stop running the scheduler.
271
        """
272 1
        windows = self.db_controller.get_windows()
273
274
        # Depopulate the scheduler
275 1
        for window in windows:
276 1
            self._unschedule(window)
277
278 1
        self.scheduler.remove_all_jobs()
279 1
        self.scheduler.shutdown()
280
281 1
    def start_maintenance(self, mw_id: MaintenanceID):
282
        """Begins executing the maintenance window
283
        """
284
        # Get Maintenance from DB and Update
285 1
        window = self.db_controller.start_window(mw_id)
286
287
        # Activate Running
288 1
        self.deployer.start_mw(window)
289
290
        # Schedule next task
291 1
        self._schedule(window)
292
293 1
    def end_maintenance(self, mw_id: MaintenanceID):
294
        """Ends execution of the maintenance window
295
        """
296
        # Get Maintenance from DB
297 1
        window = self.db_controller.end_window(mw_id)
298
299
        # Set to Ending
300 1
        self.deployer.end_mw(window)
301
302 1
    def end_maintenance_early(self, mw_id: MaintenanceID):
303
        """Ends execution of the maintenance window early
304
        """
305
        # Get Maintenance from DB
306
        window = self.db_controller.end_window(mw_id)
307
308
        # Unschedule tasks
309
        self._unschedule(window)
310
311 1
    def add(self, window: MaintenanceWindow, force=False):
312
        """Add jobs to start and end a maintenance window."""
313
314
        if force is False:
315
            overlapping_windows = self.db_controller.check_overlap(window)
316
            if overlapping_windows:
317
                raise OverlapError(window, overlapping_windows)
318
319
        # Add window to DB
320
        self.db_controller.insert_window(window)
321
322
        # Schedule next task
323
        self._schedule(window)
324
325 1
    def update(self, window: MaintenanceWindow):
326
        """Update an existing Maintenance Window."""
327
328
        # Update window
329 1
        self.db_controller.update_window(window)
330
331
        # Reschedule any pending tasks
332 1
        self._reschedule(window)
333
334 1
    def remove(self, mw_id: MaintenanceID):
335
        """Remove jobs that start and end a maintenance window."""
336
        # Get Maintenance from DB
337
        window = self.db_controller.get_window(mw_id)
338
339
        # Remove from schedule
340
        self._unschedule(window)
341
342
        # Remove from DB
343
        self.db_controller.remove_window(mw_id)
344
345 1
    def _schedule(self, window: MaintenanceWindow):
346 1
        log.info(f'Scheduling "{window.id}"')
347 1
        if window.status == Status.PENDING:
348 1
            self.scheduler.add_job(
349
                MaintenanceStart(self, window.id),
350
                'date',
351
                id=f'{window.id}-start',
352
                run_date=window.start
353
            )
354 1
            log.info(f'Scheduled "{window.id}" start at {window.start}')
355 1
        if window.status == Status.RUNNING:
356 1
            self.scheduler.add_job(
357
                MaintenanceEnd(self, window.id),
358
                'date',
359
                id=f'{window.id}-end',
360
                run_date=window.end
361
            )
362 1
            log.info(f'Scheduled "{window.id}" end at {window.end}')
363
364 1
    def _reschedule(self, window: MaintenanceWindow):
365 1
        log.info(f'Rescheduling "{window.id}"')
366 1
        try:
367 1
            self.scheduler.remove_job(
368
                f'{window.id}-start',
369
            )
370 1
            self.scheduler.add_job(
371
                MaintenanceStart(self, window.id),
372
                'date',
373
                id=f'{window.id}-start',
374
                run_date=window.start
375
            )
376 1
            log.info(f'Rescheduled "{window.id}" start to {window.start}')
377
        except JobLookupError:
378
            log.info(f'Could not reschedule "{window.id}" start, no start job')
379 1
        try:
380 1
            self.scheduler.remove_job(
381
                f'{window.id}-end',
382
            )
383 1
            self.scheduler.add_job(
384
                MaintenanceEnd(self, window.id),
385
                'date',
386
                id=f'{window.id}-end',
387
                run_date=window.end
388
            )
389 1
            log.info(f'Rescheduled "{window.id}" end to {window.end}')
390
        except JobLookupError:
391
            log.info(f'Could not reschedule "{window.id}" end, no end job')
392
393 1
    def _unschedule(self, window: MaintenanceWindow):
394
        """Remove maintenance events from scheduler.
395
        Does not update DB, due to being
396
        primarily for shutdown startup cases.
397
        """
398 1
        started = False
399 1
        ended = False
400 1
        try:
401 1
            self.scheduler.remove_job(f'{window.id}-start')
402 1
        except JobLookupError:
403 1
            started = True
404 1
            log.info(f'Job to start "{window.id}" already removed.')
405 1
        try:
406 1
            self.scheduler.remove_job(f'{window.id}-end')
407 1
        except JobLookupError:
408 1
            ended = True
409 1
            log.info(f'Job to end "{window.id}" already removed.')
410 1
        if started and not ended:
411 1
            self.deployer.end_mw(window)
412
413 1
    def get_maintenance(self, mw_id: MaintenanceID) -> MaintenanceWindow:
414
        """Get a single maintenance by id"""
415
        return self.db_controller.get_window(mw_id)
416
417 1
    def list_maintenances(self) -> MaintenanceWindows:
418
        """Returns a list of all maintenances"""
419
        return self.db_controller.get_windows()
420