Test Failed
Pull Request — master (#78)
by
unknown
05:59
created

MaintenanceDeployer.link_status_reason_func()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2.5

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 2
dl 0
loc 5
ccs 2
cts 4
cp 0.5
crap 2.5
rs 10
c 0
b 0
f 0
1
"""Models used by the maintenance NApp.
2
3
This module define models for the maintenance window itself and the
4
scheduler.
5
"""
6 1
from collections import Counter
7 1
from dataclasses import dataclass
8 1
from datetime import datetime
9 1
from enum import Enum
10 1
from itertools import chain
11 1
from typing import NewType, Optional
12
from uuid import uuid4
13 1
14 1
import pytz
15 1
from apscheduler.jobstores.base import JobLookupError
16 1
from apscheduler.schedulers.background import BackgroundScheduler
17
from apscheduler.schedulers.base import BaseScheduler
18 1
# pylint: disable=no-name-in-module
19
from pydantic import BaseModel, Field, root_validator, validator
20 1
21 1
# pylint: enable=no-name-in-module
22 1
from kytos.core import KytosEvent, log
23
from kytos.core.common import EntityStatus
24
from kytos.core.controller import Controller
25
from kytos.core.interface import Interface
26
from kytos.core.link import Link
27 1
from kytos.core.switch import Switch
28
29
TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"
30 1
31
32
class Status(str, Enum):
33 1
    """Maintenance windows status."""
34 1
35 1
    PENDING = 'pending'
36
    RUNNING = 'running'
37
    FINISHED = 'finished'
38 1
39
40
MaintenanceID = NewType('MaintenanceID', str)
41 1
42
43
class MaintenanceWindow(BaseModel):
44 1
    """Class for structure of maintenance windows.
45 1
    """
46 1
    start: datetime
47 1
    end: datetime
48 1
    switches: list[str] = Field(default_factory=list)
49 1
    interfaces: list[str] = Field(default_factory=list)
50
    links: list[str] = Field(default_factory=list)
51
    id: MaintenanceID = Field(
52 1
        default_factory=lambda: MaintenanceID(uuid4().hex)
53 1
    )
54 1
    description: str = Field(default='')
55 1
    status: Status = Field(default=Status.PENDING)
56
    inserted_at: Optional[datetime] = Field(default=None)
57
    updated_at: Optional[datetime] = Field(default=None)
58
59 1
    # pylint: disable=no-self-argument
60 1
61
    @validator('start', 'end', pre=True)
62 1
    def convert_time(cls, time):
63 1
        """Convert time strings using TIME_FMT"""
64 1
        if isinstance(time, str):
65
            time = datetime.strptime(time, TIME_FMT)
66 1
        return time
67 1
68
    @validator('start')
69 1
    def check_start_in_past(cls, start_time):
70 1
        """Check if the start is set to occur before now."""
71 1
        if start_time < datetime.now(pytz.utc):
72
            raise ValueError('Start in the past not allowed')
73 1
        return start_time
74 1
75
    @validator('end')
76 1
    def check_end_before_start(cls, end_time, values):
77 1
        """Check if the end is set to occur before the start."""
78 1
        if 'start' in values and end_time <= values['start']:
79
            raise ValueError('End before start not allowed')
80 1
        return end_time
81 1
82
    @root_validator
83 1
    def check_items_empty(cls, values):
84
        """Check if no items are in the maintenance window."""
85
        no_items = all(
86
            map(
87
                lambda key: key not in values or len(values[key]) == 0,
88
                ['switches', 'links', 'interfaces']
89 1
            )
90 1
        )
91 1
        if no_items:
92
            raise ValueError('At least one item must be provided')
93
        return values
94
95 1
    # pylint: enable=no-self-argument
96
97
    def __str__(self) -> str:
98 1
        return f"'{self.id}'<{self.start} to {self.end}>"
99
100 1
    class Config:
101
        """Config for encoding MaintenanceWindow class"""
102
        json_encoders = {
103
            datetime: lambda v: v.strftime(TIME_FMT),
104
        }
105 1
106
107 1
class MaintenanceWindows(BaseModel):
108
    """List of Maintenance Windows for json conversion."""
109 1
    __root__: list[MaintenanceWindow]
110
111
    def __iter__(self):
112 1
        return iter(self.__root__)
113
114
    def __getitem__(self, item):
115 1
        return self.__root__[item]
116
117
    def __len__(self):
118 1
        return len(self.__root__)
119
120 1
    class Config:
121
        """Config for encoding MaintenanceWindows class"""
122
        json_encoders = {
123
            datetime: lambda v: v.strftime(TIME_FMT),
124
        }
125 1
126 1
127
@dataclass
128
class MaintenanceStart:
129
    """
130 1
    Callable used for starting maintenance windows
131 1
    """
132
    maintenance_scheduler: 'Scheduler'
133 1
    mw_id: MaintenanceID
134 1
135
    def __call__(self):
136
        self.maintenance_scheduler.start_maintenance(self.mw_id)
137 1
138 1
139
@dataclass
140
class MaintenanceEnd:
141
    """
142 1
    Callable used for ending maintenance windows
143 1
    """
144
    maintenance_scheduler: 'Scheduler'
145 1
    mw_id: MaintenanceID
146 1
147
    def __call__(self):
148
        self.maintenance_scheduler.end_maintenance(self.mw_id)
149 1
150
151
class OverlapError(Exception):
152
    """
153
    Exception for when a Maintenance Windows execution
154 1
    period overlaps with one or more windows.
155 1
    """
156
    new_window: MaintenanceWindow
157 1
    interfering: MaintenanceWindows
158
159
    def __init__(
160
                self,
161
                new_window: MaintenanceWindow,
162
                interfering: MaintenanceWindows
163
            ):
164
        self.new_window = new_window
165 1
        self.interfering = interfering
166
167
    def __str__(self):
168
        return f"Maintenance Window {self.new_window} " +\
169
            "interferes with the following windows: " +\
170
            '[' +\
171
            ', '.join([
172
                f"{window}"
173
                for window in self.interfering
174
            ]) +\
175
            ']'
176 1
177 1
178
@dataclass
179 1
class MaintenanceDeployer:
180 1
    """Class for deploying maintenances"""
181
    controller: Controller
182 1
    maintenance_switches: Counter
183 1
    maintenance_interfaces: Counter
184
    maintenance_links: Counter
185
186
    @classmethod
187 1
    def new_deployer(cls, controller: Controller):
188 1
        """
189
        Creates a new MaintenanceDeployer from the given Kytos Controller
190 1
        """
191
        instance = cls(controller, Counter(), Counter(), Counter())
192 1
        Switch.register_status_func(
193 1
            'maintenance_status',
194
            instance.switch_status_func
195
        )
196
        Switch.register_status_reason_func(
197 1
            'maintenance_status',
198 1
            instance.switch_status_reason_func
199 1
        )
200
        Interface.register_status_func(
201
            'maintenance_status',
202
            instance.interface_status_func
203 1
        )
204 1
        Interface.register_status_reason_func(
205 1
            'maintenance_status',
206
            instance.interface_status_reason_func
207
        )
208
        Link.register_status_func(
209 1
            'maintenance_status',
210
            instance.link_status_func
211 1
        )
212
        Link.register_status_reason_func(
213 1
            'maintenance_status',
214 1
            instance.link_status_reason_func
215 1
        )
216
217 1
        return instance
218
219 1
    def _maintenance_event(self, window_devices: dict, operation: str):
220 1
        """Create events to start/end a maintenance."""
221 1
        event = KytosEvent(
222
            f'topology.interruption.{operation}',
223 1
            content={
224
                'type': 'maintenance',
225
                **window_devices
226
            }
227
        )
228
        self.controller.buffers.app.put(event)
229
230 1
    def _get_affected_ids(
231 1
        self,
232
        window: MaintenanceWindow
233 1
    ) -> dict[str, list[str]]:
234 1
        explicit_switches = filter(
235 1
            lambda switch: switch is not None,
236
            map(
237 1
                self.controller.switches.get,
238 1
                window.switches
239
            )
240
        )
241
242
        tot_switches = list(explicit_switches)
243
244
        implicit_interfaces = chain.from_iterable(
245
            map(
246
                lambda switch: switch.interfaces.values(),
247
                tot_switches
248
            )
249
        )
250
251
        explicit_interfaces = filter(
252 1
            lambda interface: interface is not None,
253
            map(
254
                self.controller.get_interface_by_id,
255
                window.interfaces
256 1
            )
257
        )
258
259 1
        tot_interfaces = list(chain(implicit_interfaces, explicit_interfaces))
260 1
261 1
        implicit_links = filter(
262 1
            lambda link: link is not None,
263 1
            map(
264
                lambda interface: interface.link,
265
                tot_interfaces
266 1
            )
267
        )
268 1
269
        explicit_links = filter(
270
            lambda link: link is not None,
271
            map(
272 1
                self.controller.napps[('kytos', 'topology')].links.get,
273
                window.links
274
            )
275 1
        )
276 1
277
        tot_links = list(chain(implicit_links, explicit_links))
278 1
279 1
        affected_switch_ids = list(set(map(
280
            lambda switch: switch.id,
281 1
            filter(
282
                self.switch_not_in_maintenance,
283
                tot_switches
284
            )
285 1
        )))
286
287
        affected_interface_ids = list(set(map(
288 1
            lambda interface: interface.id,
289
            filter(
290
                self.interface_not_in_maintenance,
291 1
                tot_interfaces
292
            )
293 1
        )))
294
295
        affected_link_ids = list(set(map(
296
            lambda link: link.id,
297 1
            filter(
298
                self.link_not_in_maintenance,
299
                tot_links
300 1
            )
301
        )))
302 1
303
        return {
304
            'switches': affected_switch_ids,
305
            'interfaces': affected_interface_ids,
306
            'links': affected_link_ids,
307
        }
308
309
    def start_mw(self, window: MaintenanceWindow):
310
        """Actions taken when a maintenance window starts."""
311 1
        affected_ids = self._get_affected_ids(window)
312
313
        self.maintenance_switches.update(window.switches)
314
        self.maintenance_interfaces.update(window.interfaces)
315
        self.maintenance_links.update(window.links)
316
317
        self._maintenance_event(
318
            affected_ids,
319
            'start'
320
        )
321
322
    def end_mw(self, window: MaintenanceWindow):
323
        """Actions taken when a maintenance window finishes."""
324
325 1
        self.maintenance_switches.subtract(window.switches)
326
        self.maintenance_interfaces.subtract(window.interfaces)
327
        self.maintenance_links.subtract(window.links)
328
329 1
        affected_ids = self._get_affected_ids(window)
330
331
        self._maintenance_event(
332 1
            affected_ids,
333
            'end'
334 1
        )
335
336
    def switch_not_in_maintenance(self, dev: Switch):
337
        """Checks if a switch is not undergoing maintenance"""
338
        return not self.maintenance_switches[dev.id]
339
340
    def interface_not_in_maintenance(self, dev: Interface):
341
        """Checks if a interface is not undergoing maintenance"""
342
        return (
343
            not self.maintenance_interfaces[dev.id] and
344
            self.switch_not_in_maintenance(dev.switch)
345 1
        )
346 1
347 1
    def link_not_in_maintenance(self, dev: Link):
348 1
        """Checks if a link is not undergoing maintenance"""
349
        return (
350
            not self.maintenance_links[dev.id] and
351
            all(
352
                map(
353
                    self.interface_not_in_maintenance,
354 1
                    (dev.endpoint_a, dev.endpoint_b)
355 1
                )
356 1
            )
357
        )
358
359
    def switch_status_func(self, dev: Switch):
360
        """Checks if a given device is undergoing maintenance"""
361
        if self.switch_not_in_maintenance(dev):
362 1
            return EntityStatus.UP
363
        return EntityStatus.DOWN
364 1
365 1
    def switch_status_reason_func(self, dev: Switch):
366 1
        """Checks if a given device is undergoing maintenance"""
367 1
        if self.switch_not_in_maintenance(dev):
368
            return frozenset()
369
        return frozenset({'maintenance'})
370 1
371
    def interface_status_func(self, dev: Interface):
372
        """Checks if a given device is undergoing maintenance"""
373
        if self.interface_not_in_maintenance(dev):
374
            return EntityStatus.UP
375
        return EntityStatus.DOWN
376 1
377
    def interface_status_reason_func(self, dev: Interface):
378
        """Checks if a given device is undergoing maintenance"""
379 1
        if self.interface_not_in_maintenance(dev):
380 1
            return frozenset()
381
        return frozenset({'maintenance'})
382
383 1
    def link_status_func(self, dev: Link):
384
        """Checks if a given device is undergoing maintenance"""
385
        if self.link_not_in_maintenance(dev):
386
            return EntityStatus.UP
387
        return EntityStatus.DOWN
388
389 1
    def link_status_reason_func(self, dev: Link):
390
        """Checks if a given device is undergoing maintenance"""
391
        if self.link_not_in_maintenance(dev):
392
            return frozenset()
393 1
        return frozenset({'maintenance'})
394
395
396
@dataclass
397
class Scheduler:
398 1
    """Class for scheduling maintenance windows."""
399 1
    deployer: MaintenanceDeployer
400 1
    db_controller: 'MaintenanceController'
401 1
    scheduler: BaseScheduler
402 1
403 1
    @classmethod
404 1
    def new_scheduler(cls, deployer: MaintenanceDeployer):
405 1
        """
406 1
        Creates a new scheduler from the given MaintenanceDeployer
407 1
        """
408 1
        scheduler = BackgroundScheduler(timezone=pytz.utc)
409 1
        # pylint: disable=import-outside-toplevel
410 1
        from napps.kytos.maintenance.controllers import MaintenanceController
411 1
412
        # pylint: enable=import-outside-toplevel
413 1
        db_controller = MaintenanceController()
414
        db_controller.bootstrap_indexes()
415
        instance = cls(deployer, db_controller, scheduler)
416
        return instance
417 1
418
    def start(self):
419
        """
420
        Begin running the scheduler.
421
        """
422
        self.db_controller.prepare_start()
423
424
        # Populate the scheduler with all pending tasks
425
        windows = self.db_controller.get_windows()
426
        for window in windows:
427
            if window.status == Status.RUNNING:
428
                self.deployer.start_mw(window)
429
            self._schedule(window)
430
431
        # Start the scheduler
432
        self.scheduler.start()
433
434
    def shutdown(self):
435
        """
436
        Stop running the scheduler.
437
        """
438
        windows = self.db_controller.get_windows()
439
440
        # Depopulate the scheduler
441
        for window in windows:
442
            self._unschedule(window)
443
444
        self.scheduler.remove_all_jobs()
445
        self.scheduler.shutdown()
446
447
    def start_maintenance(self, mw_id: MaintenanceID):
448
        """Begins executing the maintenance window
449
        """
450
        # Get Maintenance from DB and Update
451
        window = self.db_controller.start_window(mw_id)
452
453
        # Activate Running
454
        self.deployer.start_mw(window)
455
456
        # Schedule next task
457
        self._schedule(window)
458
459
    def end_maintenance(self, mw_id: MaintenanceID):
460
        """Ends execution of the maintenance window
461
        """
462
        # Get Maintenance from DB
463
        window = self.db_controller.end_window(mw_id)
464
465
        # Set to Ending
466
        self.deployer.end_mw(window)
467
468
    def end_maintenance_early(self, mw_id: MaintenanceID):
469
        """Ends execution of the maintenance window early
470
        """
471
        # Get Maintenance from DB
472
        window = self.db_controller.end_window(mw_id)
473
474
        # Unschedule tasks
475
        self._unschedule(window)
476
477
    def add(self, window: MaintenanceWindow, force=False):
478
        """Add jobs to start and end a maintenance window."""
479
480
        if force is False:
481
            overlapping_windows = self.db_controller.check_overlap(window)
482
            if overlapping_windows:
483
                raise OverlapError(window, overlapping_windows)
484
485
        # Add window to DB
486
        self.db_controller.insert_window(window)
487
488
        # Schedule next task
489
        self._schedule(window)
490
491
    def update(self, window: MaintenanceWindow):
492
        """Update an existing Maintenance Window."""
493
494
        # Update window
495
        self.db_controller.update_window(window)
496
497
        # Reschedule any pending tasks
498
        self._reschedule(window)
499
500
    def remove(self, mw_id: MaintenanceID):
501
        """Remove jobs that start and end a maintenance window."""
502
        # Get Maintenance from DB
503
        window = self.db_controller.get_window(mw_id)
504
505
        # Remove from schedule
506
        self._unschedule(window)
507
508
        # Remove from DB
509
        self.db_controller.remove_window(mw_id)
510
511
    def _schedule(self, window: MaintenanceWindow):
512
        log.info(f'Scheduling "{window.id}"')
513
        if window.status == Status.PENDING:
514
            self.scheduler.add_job(
515
                MaintenanceStart(self, window.id),
516
                'date',
517
                id=f'{window.id}-start',
518
                run_date=window.start
519
            )
520
            log.info(f'Scheduled "{window.id}" start at {window.start}')
521
        if window.status == Status.RUNNING:
522
            self.scheduler.add_job(
523
                MaintenanceEnd(self, window.id),
524
                'date',
525
                id=f'{window.id}-end',
526
                run_date=window.end
527
            )
528
            log.info(f'Scheduled "{window.id}" end at {window.end}')
529
530
    def _reschedule(self, window: MaintenanceWindow):
531
        log.info(f'Rescheduling "{window.id}"')
532
        try:
533
            self.scheduler.remove_job(
534
                f'{window.id}-start',
535
            )
536
            self.scheduler.add_job(
537
                MaintenanceStart(self, window.id),
538
                'date',
539
                id=f'{window.id}-start',
540
                run_date=window.start
541
            )
542
            log.info(f'Rescheduled "{window.id}" start to {window.start}')
543
        except JobLookupError:
544
            log.info(f'Could not reschedule "{window.id}" start, no start job')
545
        try:
546
            self.scheduler.remove_job(
547
                f'{window.id}-end',
548
            )
549
            self.scheduler.add_job(
550
                MaintenanceEnd(self, window.id),
551
                'date',
552
                id=f'{window.id}-end',
553
                run_date=window.end
554
            )
555
            log.info(f'Rescheduled "{window.id}" end to {window.end}')
556
        except JobLookupError:
557
            log.info(f'Could not reschedule "{window.id}" end, no end job')
558
559
    def _unschedule(self, window: MaintenanceWindow):
560
        """Remove maintenance events from scheduler.
561
        Does not update DB, due to being
562
        primarily for shutdown startup cases.
563
        """
564
        started = False
565
        ended = False
566
        try:
567
            self.scheduler.remove_job(f'{window.id}-start')
568
        except JobLookupError:
569
            started = True
570
            log.info(f'Job to start "{window.id}" already removed.')
571
        try:
572
            self.scheduler.remove_job(f'{window.id}-end')
573
        except JobLookupError:
574
            ended = True
575
            log.info(f'Job to end "{window.id}" already removed.')
576
        if started and not ended:
577
            self.deployer.end_mw(window)
578
579
    def get_maintenance(self, mw_id: MaintenanceID) -> MaintenanceWindow:
580
        """Get a single maintenance by id"""
581
        return self.db_controller.get_window(mw_id)
582
583
    def list_maintenances(self) -> MaintenanceWindows:
584
        """Returns a list of all maintenances"""
585
        return self.db_controller.get_windows()
586