Test Failed
Pull Request — master (#64)
by
unknown
02:34
created

build.models.MaintenanceWindow.update()   B

Complexity

Conditions 8

Size

Total Lines 23
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 8.1624

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 23
ccs 19
cts 22
cp 0.8636
rs 7.3333
c 0
b 0
f 0
cc 8
nop 2
crap 8.1624
1
"""Models used by the maintenance NApp.
2
3
This module defines models for the maintenance window itself and the
4
scheduler.
5
"""
6 1
from collections import Counter
7 1
from dataclasses import dataclass
8 1
from datetime import datetime
9
from enum import Enum
10 1
from typing import NewType, Optional
11 1
from uuid import uuid4
12 1
13
import pytz
14 1
from apscheduler.jobstores.base import JobLookupError
15 1
from apscheduler.schedulers.background import BackgroundScheduler
16 1
from apscheduler.schedulers.base import BaseScheduler
17
from pydantic import BaseModel, Field, root_validator, validator
18 1
19
from kytos.core import KytosEvent, log
20
from kytos.core.common import EntityStatus
21 1
from kytos.core.controller import Controller
22
23
# strptime/strftime pattern used to parse string timestamps of maintenance
# windows and to encode datetimes to JSON; "%z" carries the UTC offset.
TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"
24 1
25 1
26 1
class Status(str, Enum):
    """Maintenance windows status.

    Members also inherit from ``str``, so each one compares equal to its
    plain string value (e.g. ``Status.PENDING == 'pending'``).
    """

    PENDING = 'pending'
    RUNNING = 'running'
    FINISHED = 'finished'
32 1
33
34
# Distinct static type for maintenance window identifiers; at runtime a
# MaintenanceID is just a str.
MaintenanceID = NewType('MaintenanceID', str)
35
36
37
class MaintenanceWindow(BaseModel):
    """Class for structure of maintenance windows.

    A window runs from ``start`` to ``end`` and must reference at least one
    switch, interface or link. ``id`` defaults to a fresh UUID4 hex string
    and ``status`` defaults to ``Status.PENDING``.
    """

    start: datetime
    end: datetime
    switches: list[str] = Field(default_factory=list)
    interfaces: list[str] = Field(default_factory=list)
    links: list[str] = Field(default_factory=list)
    id: MaintenanceID = Field(
        default_factory=lambda: MaintenanceID(uuid4().hex)
    )
    description: str = Field(default='')
    status: Status = Field(default=Status.PENDING)
    inserted_at: Optional[datetime] = Field(default=None)
    updated_at: Optional[datetime] = Field(default=None)

    @validator('start', 'end', pre=True)
    def convert_time(cls, time):
        """Convert time strings using TIME_FMT.

        Non-string values are returned untouched so the regular field
        validation can deal with them.
        """
        if isinstance(time, str):
            return datetime.strptime(time, TIME_FMT)
        return time

    @validator('start')
    def check_start_in_past(cls, start_time):
        """Check if the start is set to occur before now.

        Raises:
            ValueError: if ``start_time`` is earlier than the current UTC
                time.
        """
        now = datetime.now(pytz.utc)
        if start_time < now:
            raise ValueError('Start in the past not allowed')
        return start_time

    @validator('end')
    def check_end_before_start(cls, end_time, values):
        """Check if the end is set to occur before the start.

        The comparison is skipped when ``start`` is not available in
        ``values``.

        Raises:
            ValueError: if ``end_time`` is not strictly after ``start``.
        """
        if 'start' in values and end_time <= values['start']:
            raise ValueError('End before start not allowed')
        return end_time

    @root_validator
    def check_items_empty(cls, values):
        """Check if no items are in the maintenance window.

        Raises:
            ValueError: if none of ``switches``, ``links`` and
                ``interfaces`` holds at least one entry.
        """
        # A key missing from ``values`` counts the same as an empty list.
        has_items = any(
            values.get(key) for key in ('switches', 'links', 'interfaces')
        )
        if not has_items:
            raise ValueError('At least one item must be provided')
        return values

    def __str__(self) -> str:
        """Return the window id followed by its start/end interval."""
        return f"'{self.id}'<{self.start} to {self.end}>"

    class Config:
        # Encode datetimes with TIME_FMT when the model is dumped to JSON.
        json_encoders = {
            datetime: lambda v: v.strftime(TIME_FMT),
        }
94
95
96
class MaintenanceWindows(BaseModel):
    """List of Maintenance Windows for json conversion.

    The windows live in the custom root field ``__root__``; iteration,
    indexing and ``len()`` are forwarded to that list.
    """

    __root__: list[MaintenanceWindow]

    def __iter__(self):
        """Iterate over the wrapped maintenance windows."""
        return iter(self.__root__)

    def __getitem__(self, item):
        """Return the window(s) selected by ``item`` from the wrapped list."""
        return self.__root__[item]

    def __len__(self):
        """Return how many windows are wrapped."""
        return len(self.__root__)

    class Config:
        # Encode datetimes with TIME_FMT when the model is dumped to JSON.
        json_encoders = {
            datetime: lambda v: v.strftime(TIME_FMT),
        }
113 1
114 1
115 1
@dataclass
class MaintenanceStart:
    """
    Callable used for starting maintenance windows.

    Calling an instance forwards ``mw_id`` to
    ``maintenance_scheduler.start_maintenance``.
    """
    maintenance_scheduler: 'Scheduler'
    mw_id: MaintenanceID

    def __call__(self):
        """Start the maintenance window identified by ``mw_id``."""
        self.maintenance_scheduler.start_maintenance(self.mw_id)
125 1
126 1
127 1
@dataclass
class MaintenanceEnd:
    """
    Callable used for ending maintenance windows.

    Calling an instance forwards ``mw_id`` to
    ``maintenance_scheduler.end_maintenance``.
    """
    maintenance_scheduler: 'Scheduler'
    mw_id: MaintenanceID

    def __call__(self):
        """End the maintenance window identified by ``mw_id``."""
        self.maintenance_scheduler.end_maintenance(self.mw_id)
137
138
139
class OverlapError(Exception):
    """
    Exception for when a Maintenance Window's execution
    period overlaps with one or more existing windows.
    """
    # Window that was being added when the overlap was detected.
    new_window: MaintenanceWindow
    # Windows whose execution period overlaps with ``new_window``.
    interfering: MaintenanceWindows

    def __init__(
                self,
                new_window: MaintenanceWindow,
                interfering: MaintenanceWindows
            ):
        """Store the rejected window and the windows it overlaps with."""
        # Initialise the base exception explicitly so ``args`` is populated
        # regardless of how the arguments were passed.
        super().__init__(new_window, interfering)
        self.new_window = new_window
        self.interfering = interfering

    def __str__(self):
        """Describe the new window and list every interfering window."""
        listed = ', '.join([
            f"{window}"
            for window in self.interfering
        ])
        return (
            f"Maintenance Window {self.new_window} "
            f"interferes with the following windows: [{listed}]"
        )
164
165
166
@dataclass
class MaintenanceDeployer:
    """Class for deploying maintenances.

    Keeps a per-device count of the maintenance windows currently applied
    and publishes start/end events on the controller's app buffer.
    """
    # Controller whose ``buffers.app`` queue receives the events.
    controller: Controller
    # Device id -> number of started, not yet ended, windows covering it.
    maintenance_devices: Counter

    @classmethod
    def new_deployer(cls, controller: Controller):
        """Create a deployer for ``controller`` with an empty counter."""
        return cls(controller, Counter())

    def _maintenance_event(self, window: MaintenanceWindow, operation: str):
        """Create events to start/end a maintenance.

        One event is put on ``controller.buffers.app`` for each non-empty
        item list of ``window``, in the order switches, interfaces, links.

        Args:
            window: window whose items are announced.
            operation: prefix used in the event name (callers in this
                class pass ``'start'`` or ``'end'``).
        """
        # (event name suffix, content key, affected ids) in emission order.
        payloads = (
            ('switch', 'switches', window.switches),
            ('interface', 'unis', window.interfaces),
            ('link', 'links', window.links),
        )
        for suffix, content_key, ids in payloads:
            if not ids:
                continue
            event = KytosEvent(
                name=f'kytos/maintenance.{operation}_{suffix}',
                content={content_key: ids}
            )
            self.controller.buffers.app.put(event)

    def start_mw(self, window: MaintenanceWindow):
        """Actions taken when a maintenance window starts.

        Increments the counter of every item in the window and emits the
        ``start`` events.
        """
        items = [*window.switches, *window.links, *window.interfaces]
        self.maintenance_devices.update(items)
        self._maintenance_event(window, 'start')

    def end_mw(self, window: MaintenanceWindow):
        """Actions taken when a maintenance window finishes.

        Decrements the counter of every item in the window and emits the
        ``end`` events.
        """
        items = [*window.switches, *window.links, *window.interfaces]
        self.maintenance_devices.subtract(items)
        # Counter.subtract keeps zero and negative counts. Drop them so an
        # unmatched end cannot offset a later start of the same device.
        for item in items:
            if self.maintenance_devices[item] <= 0:
                del self.maintenance_devices[item]
        self._maintenance_event(window, 'end')

    def dev_in_maintenance(self, dev):
        """Report the maintenance status of a device.

        Args:
            dev: object whose ``id`` attribute is looked up in the counter.

        Returns:
            ``EntityStatus.DOWN`` when at least one started window still
            covers ``dev.id``, ``EntityStatus.UP`` otherwise.
        """
        # Compare explicitly with zero: a negative count is truthy but
        # does not mean the device is under maintenance.
        if self.maintenance_devices[dev.id] > 0:
            return EntityStatus.DOWN
        return EntityStatus.UP
214 1
215
@dataclass
class Scheduler:
    """Class for scheduling maintenance windows.

    Combines the deployer (applies windows), the DB controller (persists
    windows) and an APScheduler scheduler (fires start/end jobs).
    """
    deployer: MaintenanceDeployer
    db: 'MaintenanceController'
    scheduler: BaseScheduler

    @classmethod
    def new_scheduler(cls, deployer: MaintenanceDeployer):
        """
        Creates a new scheduler around the given deployer.

        A UTC ``BackgroundScheduler`` and a ``MaintenanceController`` with
        bootstrapped indexes are created for the new instance.
        """
        scheduler = BackgroundScheduler(timezone=pytz.utc)
        # Imported locally rather than at module level, presumably to
        # avoid a circular import -- TODO confirm.
        from napps.kytos.maintenance.controllers import MaintenanceController
        db = MaintenanceController()
        db.bootstrap_indexes()
        return cls(deployer, db, scheduler)

    def start(self):
        """
        Begin running the scheduler.
        """
        self.db.prepare_start()

        # Populate the scheduler with all pending tasks; windows already
        # running are first re-applied through the deployer.
        for window in self.db.get_windows():
            if window.status == Status.RUNNING:
                self.deployer.start_mw(window)
            self._schedule(window)

        # Start the scheduler
        self.scheduler.start()

    def shutdown(self):
        """
        Stop running the scheduler.
        """
        # Depopulate the scheduler
        for window in self.db.get_windows():
            self._unschedule(window)

        self.scheduler.remove_all_jobs()
        self.scheduler.shutdown()

    def start_maintenance(self, mw_id: MaintenanceID):
        """Begins executing the maintenance window
        """
        # Get Maintenance from DB and Update
        window = self.db.start_window(mw_id)

        # Activate Running
        self.deployer.start_mw(window)

        # Schedule next task
        self._schedule(window)

    def end_maintenance(self, mw_id: MaintenanceID):
        """Ends execution of the maintenance window
        """
        # Get Maintenance from DB
        window = self.db.end_window(mw_id)

        # Set to Ending
        self.deployer.end_mw(window)

    def end_maintenance_early(self, mw_id: MaintenanceID):
        """Ends execution of the maintenance window early
        """
        # Get Maintenance from DB
        window = self.db.end_window(mw_id)

        # Unschedule tasks
        self._unschedule(window)

    def add(self, window: MaintenanceWindow, force=False):
        """Add jobs to start and end a maintenance window.

        Args:
            window: window to store and schedule.
            force: when truthy, skip the overlap check.

        Raises:
            OverlapError: if ``force`` is falsy and the DB reports
                overlapping windows.
        """
        # Use truthiness instead of ``is False`` so values such as None or
        # 0 do not silently bypass the overlap check.
        if not force:
            overlapping_windows = self.db.check_overlap(window)
            if overlapping_windows:
                raise OverlapError(window, overlapping_windows)

        # Add window to DB
        self.db.insert_window(window)

        # Schedule next task
        self._schedule(window)

    def update(self, window: MaintenanceWindow):
        """Update an existing Maintenance Window."""
        # Update window
        self.db.update_window(window)

        # Reschedule any pending tasks
        self._reschedule(window)

    def remove(self, mw_id: MaintenanceID):
        """Remove jobs that start and end a maintenance window.

        The window is also removed from the DB afterwards.
        """
        # Get Maintenance from DB
        window = self.db.get_window(mw_id)

        # Remove from schedule
        self._unschedule(window)

        # Remove from DB
        self.db.remove_window(mw_id)

    def _add_start_job(self, window: MaintenanceWindow):
        """Register the one-off job that starts ``window`` at its start."""
        self.scheduler.add_job(
            MaintenanceStart(self, window.id),
            'date',
            id=f'{window.id}-start',
            run_date=window.start
        )

    def _add_end_job(self, window: MaintenanceWindow):
        """Register the one-off job that ends ``window`` at its end."""
        self.scheduler.add_job(
            MaintenanceEnd(self, window.id),
            'date',
            id=f'{window.id}-end',
            run_date=window.end
        )

    def _schedule(self, window: MaintenanceWindow):
        """Schedule the next job of ``window`` according to its status.

        A pending window gets a start job, a running window gets an end
        job, and any other status schedules nothing.
        """
        log.info(f'Scheduling "{window.id}"')
        if window.status == Status.PENDING:
            self._add_start_job(window)
            log.info(f'Scheduled "{window.id}" start at {window.start}')
        if window.status == Status.RUNNING:
            self._add_end_job(window)
            log.info(f'Scheduled "{window.id}" end at {window.end}')

    def _reschedule(self, window: MaintenanceWindow):
        """Replace the existing jobs of ``window`` with its current times.

        A job is only re-created when one with the same id was found and
        removed; otherwise the miss is logged and nothing is added.
        """
        log.info(f'Rescheduling "{window.id}"')
        try:
            self.scheduler.remove_job(
                f'{window.id}-start',
            )
        except JobLookupError:
            log.info(f'Could not reschedule "{window.id}" start, no start job')
        else:
            self._add_start_job(window)
            log.info(f'Rescheduled "{window.id}" start to {window.start}')
        try:
            self.scheduler.remove_job(
                f'{window.id}-end',
            )
        except JobLookupError:
            log.info(f'Could not reschedule "{window.id}" end, no end job')
        else:
            self._add_end_job(window)
            log.info(f'Rescheduled "{window.id}" end to {window.end}')

    def _unschedule(self, window: MaintenanceWindow):
        """Remove maintenance events from scheduler.
        Does not update DB, due to being
        primarily for shutdown startup cases.

        When the start job was already gone but the end job was still
        scheduled, the window is ended through the deployer.
        """
        start_job_missing = False
        end_job_missing = False
        try:
            self.scheduler.remove_job(f'{window.id}-start')
        except JobLookupError:
            start_job_missing = True
            log.info(f'Job to start "{window.id}" already removed.')
        try:
            self.scheduler.remove_job(f'{window.id}-end')
        except JobLookupError:
            end_job_missing = True
            log.info(f'Job to end "{window.id}" already removed.')
        if start_job_missing and not end_job_missing:
            self.deployer.end_mw(window)

    def get_maintenance(self, mw_id: MaintenanceID) -> MaintenanceWindow:
        """Get a single maintenance by id"""
        return self.db.get_window(mw_id)

    def list_maintenances(self) -> MaintenanceWindows:
        """Returns a list of all maintenances"""
        return self.db.get_windows()
402