Passed
Pull Request — master (#78)
by
unknown
05:53
created

build.models   F

Complexity

Total Complexity 74

Size/Duplication

Total Lines 586
Duplicated Lines 0 %

Test Coverage

Coverage 81.25%

Importance

Changes 0
Metric Value
eloc 353
dl 0
loc 586
ccs 208
cts 256
cp 0.8125
rs 2.48
c 0
b 0
f 0
wmc 74

40 Methods

Rating   Name   Duplication   Size   Complexity  
A MaintenanceDeployer.new_deployer() 0 32 1
A MaintenanceWindow.check_end_before_start() 0 6 3
A MaintenanceEnd.__call__() 0 2 1
A MaintenanceStart.__call__() 0 2 1
A Scheduler.get_maintenance() 0 3 1
A MaintenanceWindows.__len__() 0 2 1
A MaintenanceDeployer.link_not_in_maintenance() 0 8 1
A Scheduler.new_scheduler() 0 14 1
A Scheduler.list_maintenances() 0 3 1
A MaintenanceDeployer.switch_status_reason_func() 0 5 2
A Scheduler.shutdown() 0 12 2
A MaintenanceWindows.__iter__() 0 2 1
A Scheduler.start() 0 15 3
A Scheduler._reschedule() 0 28 3
A Scheduler._unschedule() 0 19 5
A MaintenanceWindows.__getitem__() 0 2 1
A MaintenanceDeployer.interface_status_func() 0 5 2
A MaintenanceDeployer._maintenance_event() 0 10 1
A Scheduler.add() 0 13 3
A MaintenanceWindow.check_items_empty() 0 12 3
A OverlapError.__str__() 0 9 1
A MaintenanceDeployer.link_status_reason_func() 0 5 2
A MaintenanceDeployer.switch_status_func() 0 5 2
C MaintenanceDeployer._get_affected_ids() 0 77 10
A OverlapError.__init__() 0 7 1
A Scheduler._schedule() 0 18 3
A MaintenanceWindow.__str__() 0 2 1
A MaintenanceDeployer.interface_not_in_maintenance() 0 5 1
A MaintenanceDeployer.link_status_func() 0 5 2
A MaintenanceWindow.check_start_in_past() 0 6 2
A Scheduler.start_maintenance() 0 11 1
A Scheduler.update() 0 8 1
A MaintenanceDeployer.end_mw() 0 12 1
A MaintenanceDeployer.start_mw() 0 11 1
A Scheduler.end_maintenance_early() 0 8 1
A Scheduler.remove() 0 10 1
A MaintenanceDeployer.switch_not_in_maintenance() 0 3 1
A MaintenanceDeployer.interface_status_reason_func() 0 5 2
A Scheduler.end_maintenance() 0 8 1
A MaintenanceWindow.convert_time() 0 6 2

How to fix   Complexity   

Complexity

Complex classes like build.models often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Models used by the maintenance NApp.
2
3
This module defines models for the maintenance window itself and the
4
scheduler.
5
"""
6 1
from collections import Counter
7 1
from dataclasses import dataclass
8 1
from datetime import datetime
9 1
from enum import Enum
10 1
from itertools import chain
11 1
from typing import NewType, Optional
12 1
from uuid import uuid4
13
14 1
import pytz
15 1
from apscheduler.jobstores.base import JobLookupError
16 1
from apscheduler.schedulers.background import BackgroundScheduler
17 1
from apscheduler.schedulers.base import BaseScheduler
18
# pylint: disable=no-name-in-module
19 1
from pydantic import BaseModel, Field, root_validator, validator
20
21
# pylint: enable=no-name-in-module
22 1
from kytos.core import KytosEvent, log
23 1
from kytos.core.common import EntityStatus
24 1
from kytos.core.controller import Controller
25 1
from kytos.core.interface import Interface
26 1
from kytos.core.link import Link
27 1
from kytos.core.switch import Switch
28
29 1
# strptime/strftime pattern used by this module to parse window timestamps
# given as strings and to encode datetimes when serializing to JSON.
TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"
30
31
32 1
class Status(str, Enum):
    """Lifecycle states a maintenance window can be in.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    PENDING = "pending"
    RUNNING = "running"
    FINISHED = "finished"
38
39
40 1
# Distinct static type for maintenance window identifiers (built on str).
MaintenanceID = NewType('MaintenanceID', str)
41
42
43 1
class MaintenanceWindow(BaseModel):
    """Model describing a single maintenance window.

    A window has a ``start`` and an ``end`` instant and lists the ids of
    the switches, interfaces and links it covers. When no ``id`` is given,
    a random uuid4 hex string is generated.
    """
    start: datetime
    end: datetime
    switches: list[str] = Field(default_factory=list)
    interfaces: list[str] = Field(default_factory=list)
    links: list[str] = Field(default_factory=list)
    id: MaintenanceID = Field(
        default_factory=lambda: MaintenanceID(uuid4().hex)
    )
    description: str = Field(default='')
    status: Status = Field(default=Status.PENDING)
    inserted_at: Optional[datetime] = Field(default=None)
    updated_at: Optional[datetime] = Field(default=None)

    # pylint: disable=no-self-argument

    @validator('start', 'end', pre=True)
    def convert_time(cls, time):
        """Parse string timestamps with TIME_FMT, pass anything else on."""
        if not isinstance(time, str):
            return time
        return datetime.strptime(time, TIME_FMT)

    @validator('start')
    def check_start_in_past(cls, start_time):
        """Reject a start instant that is earlier than the current time."""
        now = datetime.now(pytz.utc)
        if start_time < now:
            raise ValueError('Start in the past not allowed')
        return start_time

    @validator('end')
    def check_end_before_start(cls, end_time, values):
        """Reject an end instant that is not strictly after the start."""
        # Only compare when a 'start' entry is present in ``values``.
        if 'start' not in values:
            return end_time
        if end_time <= values['start']:
            raise ValueError('End before start not allowed')
        return end_time

    @root_validator
    def check_items_empty(cls, values):
        """Require at least one switch, link or interface in the window."""
        for key in ('switches', 'links', 'interfaces'):
            # One non-empty collection is enough to accept the window.
            if key in values and len(values[key]) != 0:
                return values
        raise ValueError('At least one item must be provided')

    # pylint: enable=no-self-argument

    def __str__(self) -> str:
        """Return the id followed by the start/end period."""
        return f"'{self.id}'<{self.start} to {self.end}>"

    class Config:
        """Model configuration: encode datetimes using TIME_FMT."""
        json_encoders = {
            datetime: lambda moment: moment.strftime(TIME_FMT),
        }
105
106
107 1
class MaintenanceWindows(BaseModel):
    """Root-list model holding several windows for JSON conversion.

    Iteration, indexing and ``len()`` are delegated to the wrapped list.
    """
    __root__: list[MaintenanceWindow]

    def __iter__(self):
        """Iterate over the wrapped windows."""
        windows = self.__root__
        return iter(windows)

    def __getitem__(self, item):
        """Return the entry of the wrapped list selected by ``item``."""
        windows = self.__root__
        return windows[item]

    def __len__(self):
        """Return how many windows are wrapped."""
        windows = self.__root__
        return len(windows)

    class Config:
        """Model configuration: encode datetimes using TIME_FMT."""
        json_encoders = {
            datetime: lambda moment: moment.strftime(TIME_FMT),
        }
125
126
127 1
@dataclass
class MaintenanceStart:
    """Callable that starts one maintenance window when invoked."""
    maintenance_scheduler: 'Scheduler'
    mw_id: MaintenanceID

    def __call__(self):
        """Ask the scheduler to start the window identified by ``mw_id``."""
        scheduler = self.maintenance_scheduler
        scheduler.start_maintenance(self.mw_id)
137
138
139 1
@dataclass
class MaintenanceEnd:
    """Callable that ends one maintenance window when invoked."""
    maintenance_scheduler: 'Scheduler'
    mw_id: MaintenanceID

    def __call__(self):
        """Ask the scheduler to end the window identified by ``mw_id``."""
        scheduler = self.maintenance_scheduler
        scheduler.end_maintenance(self.mw_id)
149
150
151 1
class OverlapError(Exception):
    """Raised when a window's execution period overlaps other windows.

    Attributes are the window being added (``new_window``) and the
    windows it interferes with (``interfering``).
    """
    new_window: MaintenanceWindow
    interfering: MaintenanceWindows

    def __init__(
        self,
        new_window: MaintenanceWindow,
        interfering: MaintenanceWindows
    ):
        self.new_window = new_window
        self.interfering = interfering

    def __str__(self):
        """Name the new window and list every interfering window."""
        head = f"Maintenance Window {self.new_window} "
        listed = ', '.join(f"{window}" for window in self.interfering)
        return f"{head}interferes with the following windows: [{listed}]"
176
177
178 1
@dataclass
class MaintenanceDeployer:
    """Applies maintenance windows to switches, interfaces and links.

    One Counter per entity kind tracks the ids named by windows:
    ``start_mw`` adds a window's ids and ``end_mw`` subtracts them. An
    entity with a non-zero count is reported as under maintenance.
    """
    controller: Controller
    maintenance_switches: Counter
    maintenance_interfaces: Counter
    maintenance_links: Counter

    @classmethod
    def new_deployer(cls, controller: Controller):
        """Build a deployer for ``controller`` and register its callbacks.

        The counters start empty. The status and status-reason functions
        are registered on Switch, Interface and Link under the name
        'maintenance_status'.
        """
        instance = cls(controller, Counter(), Counter(), Counter())
        hooks = (
            (
                Switch,
                instance.switch_status_func,
                instance.switch_status_reason_func,
            ),
            (
                Interface,
                instance.interface_status_func,
                instance.interface_status_reason_func,
            ),
            (
                Link,
                instance.link_status_func,
                instance.link_status_reason_func,
            ),
        )
        for entity_cls, status_func, reason_func in hooks:
            entity_cls.register_status_func(
                'maintenance_status', status_func
            )
            entity_cls.register_status_reason_func(
                'maintenance_status', reason_func
            )
        return instance

    def _maintenance_event(self, window_devices: dict, operation: str):
        """Put a topology interruption event on the controller app buffer.

        ``operation`` becomes the event name suffix; ``window_devices`` is
        merged into the event content next to the 'maintenance' type.
        """
        name = f'kytos/topology.interruption.{operation}'
        content = {'type': 'maintenance', **window_devices}
        self.controller.buffers.app.put(KytosEvent(name, content=content))

    def _get_affected_ids(
        self,
        window: MaintenanceWindow
    ) -> dict[str, list[str]]:
        """Collect the ids touched by ``window`` that are not in maintenance.

        Besides the listed entities, the interfaces of the listed switches
        and the links of all collected interfaces are included. Ids for
        which the lookup returns None are skipped. The result maps
        'switches', 'interfaces' and 'links' to de-duplicated id lists.
        """
        find_switch = self.controller.switches.get
        switches = [
            switch
            for switch in map(find_switch, window.switches)
            if switch is not None
        ]

        find_interface = self.controller.get_interface_by_id
        find_link = self.controller.napps[('kytos', 'topology')].links.get

        # Interfaces owned by the listed switches come first, followed by
        # the interfaces listed explicitly in the window.
        interfaces = [
            interface
            for switch in switches
            for interface in switch.interfaces.values()
        ]
        interfaces.extend(
            interface
            for interface in map(find_interface, window.interfaces)
            if interface is not None
        )

        # Links attached to any collected interface, then explicit links.
        links = [
            interface.link
            for interface in interfaces
            if interface.link is not None
        ]
        links.extend(
            link
            for link in map(find_link, window.links)
            if link is not None
        )

        switch_ids = {
            switch.id
            for switch in switches
            if self.switch_not_in_maintenance(switch)
        }
        interface_ids = {
            interface.id
            for interface in interfaces
            if self.interface_not_in_maintenance(interface)
        }
        link_ids = {
            link.id
            for link in links
            if self.link_not_in_maintenance(link)
        }

        return {
            'switches': list(switch_ids),
            'interfaces': list(interface_ids),
            'links': list(link_ids),
        }

    def start_mw(self, window: MaintenanceWindow):
        """Count the window's entities and emit the 'start' event."""
        # Affected ids are computed before the counters change, so only
        # entities not already under maintenance are reported.
        affected_ids = self._get_affected_ids(window)

        self.maintenance_switches.update(window.switches)
        self.maintenance_interfaces.update(window.interfaces)
        self.maintenance_links.update(window.links)

        self._maintenance_event(affected_ids, 'start')

    def end_mw(self, window: MaintenanceWindow):
        """Discount the window's entities and emit the 'end' event."""
        self.maintenance_switches.subtract(window.switches)
        self.maintenance_interfaces.subtract(window.interfaces)
        self.maintenance_links.subtract(window.links)

        # Computed after the counters change, so only entities that are
        # no longer under maintenance are reported.
        affected_ids = self._get_affected_ids(window)

        self._maintenance_event(affected_ids, 'end')

    def switch_not_in_maintenance(self, dev: Switch):
        """Return True when the switch id has no maintenance count."""
        count = self.maintenance_switches[dev.id]
        return not count

    def interface_not_in_maintenance(self, dev: Interface):
        """Return True when neither the interface nor its switch is counted."""
        if self.maintenance_interfaces[dev.id]:
            return False
        return self.switch_not_in_maintenance(dev.switch)

    def link_not_in_maintenance(self, dev: Link):
        """Return True when neither the link nor its endpoints are counted."""
        if self.maintenance_links[dev.id]:
            return False
        return all(
            self.interface_not_in_maintenance(endpoint)
            for endpoint in (dev.endpoint_a, dev.endpoint_b)
        )

    def switch_status_func(self, dev: Switch):
        """Report DOWN for a switch under maintenance, UP otherwise."""
        return (
            EntityStatus.UP
            if self.switch_not_in_maintenance(dev)
            else EntityStatus.DOWN
        )

    def switch_status_reason_func(self, dev: Switch):
        """Report {'maintenance'} for a switch under maintenance."""
        return (
            frozenset()
            if self.switch_not_in_maintenance(dev)
            else frozenset({'maintenance'})
        )

    def interface_status_func(self, dev: Interface):
        """Report DOWN for an interface under maintenance, UP otherwise."""
        return (
            EntityStatus.UP
            if self.interface_not_in_maintenance(dev)
            else EntityStatus.DOWN
        )

    def interface_status_reason_func(self, dev: Interface):
        """Report {'maintenance'} for an interface under maintenance."""
        return (
            frozenset()
            if self.interface_not_in_maintenance(dev)
            else frozenset({'maintenance'})
        )

    def link_status_func(self, dev: Link):
        """Report DOWN for a link under maintenance, UP otherwise."""
        return (
            EntityStatus.UP
            if self.link_not_in_maintenance(dev)
            else EntityStatus.DOWN
        )

    def link_status_reason_func(self, dev: Link):
        """Report {'maintenance'} for a link under maintenance."""
        return (
            frozenset()
            if self.link_not_in_maintenance(dev)
            else frozenset({'maintenance'})
        )
394
395
396 1
@dataclass
class Scheduler:
    """Schedules the start and end jobs of maintenance windows.

    Windows are stored through ``db_controller``, timed jobs are kept in
    ``scheduler`` and the topology side is handled by ``deployer``.
    """
    deployer: MaintenanceDeployer
    db_controller: 'MaintenanceController'
    scheduler: BaseScheduler

    @classmethod
    def new_scheduler(cls, deployer: MaintenanceDeployer):
        """Create a Scheduler for ``deployer``.

        Uses a UTC BackgroundScheduler and a new MaintenanceController
        whose indexes are bootstrapped before the instance is returned.
        """
        scheduler = BackgroundScheduler(timezone=pytz.utc)
        # pylint: disable=import-outside-toplevel
        from napps.kytos.maintenance.controllers import MaintenanceController

        # pylint: enable=import-outside-toplevel
        db_controller = MaintenanceController()
        db_controller.bootstrap_indexes()
        return cls(deployer, db_controller, scheduler)

    def start(self):
        """Load the stored windows, schedule them and start the scheduler."""
        self.db_controller.prepare_start()

        for window in self.db_controller.get_windows():
            # Windows stored as running are started on the deployer again
            # before their next job is scheduled.
            if window.status == Status.RUNNING:
                self.deployer.start_mw(window)
            self._schedule(window)

        self.scheduler.start()

    def shutdown(self):
        """Unschedule every stored window and stop the scheduler."""
        for window in self.db_controller.get_windows():
            self._unschedule(window)

        self.scheduler.remove_all_jobs()
        self.scheduler.shutdown()

    def start_maintenance(self, mw_id: MaintenanceID):
        """Begin executing the maintenance window ``mw_id``."""
        # Mark as started in the DB, deploy it, then schedule what follows.
        window = self.db_controller.start_window(mw_id)
        self.deployer.start_mw(window)
        self._schedule(window)

    def end_maintenance(self, mw_id: MaintenanceID):
        """End the execution of the maintenance window ``mw_id``."""
        window = self.db_controller.end_window(mw_id)
        self.deployer.end_mw(window)

    def end_maintenance_early(self, mw_id: MaintenanceID):
        """End the maintenance window ``mw_id`` ahead of its end job."""
        window = self.db_controller.end_window(mw_id)
        # Removing the jobs also ends the window on the deployer when it
        # had already started (see _unschedule).
        self._unschedule(window)

    def add(self, window: MaintenanceWindow, force=False):
        """Store ``window`` and schedule its next job.

        Unless ``force`` is anything other than False, the window is first
        checked against the stored ones and OverlapError is raised when
        overlapping windows are found.
        """
        if force is False:
            clashes = self.db_controller.check_overlap(window)
            if clashes:
                raise OverlapError(window, clashes)

        self.db_controller.insert_window(window)
        self._schedule(window)

    def update(self, window: MaintenanceWindow):
        """Store the changed ``window`` and reschedule its pending jobs."""
        self.db_controller.update_window(window)
        self._reschedule(window)

    def remove(self, mw_id: MaintenanceID):
        """Unschedule the window ``mw_id`` and delete it from the DB."""
        window = self.db_controller.get_window(mw_id)
        self._unschedule(window)
        self.db_controller.remove_window(mw_id)

    def _add_start_job(self, window: MaintenanceWindow):
        """Add the date job that starts ``window`` at ``window.start``."""
        self.scheduler.add_job(
            MaintenanceStart(self, window.id),
            'date',
            id=f'{window.id}-start',
            run_date=window.start
        )

    def _add_end_job(self, window: MaintenanceWindow):
        """Add the date job that ends ``window`` at ``window.end``."""
        self.scheduler.add_job(
            MaintenanceEnd(self, window.id),
            'date',
            id=f'{window.id}-end',
            run_date=window.end
        )

    def _drop_job(self, job_id: str) -> bool:
        """Remove ``job_id``; return False when no such job exists."""
        try:
            self.scheduler.remove_job(job_id)
        except JobLookupError:
            return False
        return True

    def _schedule(self, window: MaintenanceWindow):
        """Add the job matching the window status.

        A pending window gets its start job, a running window gets its
        end job; any other status adds nothing.
        """
        log.info(f'Scheduling "{window.id}"')
        if window.status == Status.PENDING:
            self._add_start_job(window)
            log.info(f'Scheduled "{window.id}" start at {window.start}')
        if window.status == Status.RUNNING:
            self._add_end_job(window)
            log.info(f'Scheduled "{window.id}" end at {window.end}')

    def _reschedule(self, window: MaintenanceWindow):
        """Replace the existing start/end jobs with the window's new times.

        A job that does not exist is not re-created; this is only logged.
        """
        log.info(f'Rescheduling "{window.id}"')
        try:
            self.scheduler.remove_job(f'{window.id}-start')
            self._add_start_job(window)
            log.info(f'Rescheduled "{window.id}" start to {window.start}')
        except JobLookupError:
            log.info(f'Could not reschedule "{window.id}" start, no start job')
        try:
            self.scheduler.remove_job(f'{window.id}-end')
            self._add_end_job(window)
            log.info(f'Rescheduled "{window.id}" end to {window.end}')
        except JobLookupError:
            log.info(f'Could not reschedule "{window.id}" end, no end job')

    def _unschedule(self, window: MaintenanceWindow):
        """Remove maintenance events from scheduler.

        Does not update DB, due to being primarily for shutdown and
        startup cases. When the start job is already gone while the end
        job was still scheduled, the window is ended on the deployer.
        """
        started = not self._drop_job(f'{window.id}-start')
        if started:
            log.info(f'Job to start "{window.id}" already removed.')

        ended = not self._drop_job(f'{window.id}-end')
        if ended:
            log.info(f'Job to end "{window.id}" already removed.')

        if started and not ended:
            self.deployer.end_mw(window)

    def get_maintenance(self, mw_id: MaintenanceID) -> MaintenanceWindow:
        """Fetch the maintenance window ``mw_id`` from the DB controller."""
        return self.db_controller.get_window(mw_id)

    def list_maintenances(self) -> MaintenanceWindows:
        """Fetch all maintenance windows from the DB controller."""
        return self.db_controller.get_windows()
586