Test Failed
Pull Request — master (#64)
by
unknown
28:59 queued 26:30
created

build.models.MaintenanceWindow.maintenance_event()   A

Complexity

Conditions 4

Size

Total Lines 20
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 16
nop 3
dl 0
loc 20
rs 9.6
c 0
b 0
f 0
ccs 17
cts 17
cp 1
crap 4
1
"""Models used by the maintenance NApp.
2
3
This module defines models for the maintenance window itself and the
4
scheduler.
5
"""
6 1
from dataclasses import dataclass
7 1
from datetime import datetime
8 1
from enum import Enum
9
from typing import NewType, Optional
10 1
from uuid import uuid4
11 1
12 1
import pytz
13
from apscheduler.jobstores.base import JobLookupError
14 1
from apscheduler.schedulers.background import BackgroundScheduler
15 1
from apscheduler.schedulers.base import BaseScheduler
16 1
from pydantic import BaseModel, Field, validator, root_validator
17
18 1
from kytos.core import KytosEvent, log
19
from kytos.core.controller import Controller
20
21 1
# strptime/strftime pattern used to parse string timestamps on input and to
# render datetimes when the models are serialized to JSON.
TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"
22
23
class Status(str, Enum):
    """Status values a maintenance window can have.

    Members also subclass ``str``, so each one compares equal to its
    string value.
    """

    PENDING = "pending"
    RUNNING = "running"
    FINISHED = "finished"
29 1
30
31
# Distinct static type for maintenance window identifiers; at runtime a
# MaintenanceID is a plain str.
MaintenanceID = NewType('MaintenanceID', str)
32 1
33
34
class MaintenanceWindow(BaseModel):
    """Structure of a single maintenance window.

    A window has a ``start`` and ``end`` time and lists the switches,
    interfaces and links it covers. At least one of those three lists
    must be non-empty. ``id`` defaults to a fresh ``uuid4`` hex string
    and ``status`` defaults to ``Status.PENDING``.
    """
    start: datetime
    end: datetime
    switches: list[str] = Field(default_factory=list)
    interfaces: list[str] = Field(default_factory=list)
    links: list[str] = Field(default_factory=list)
    id: MaintenanceID = Field(
        default_factory=lambda: MaintenanceID(uuid4().hex)
    )
    description: str = Field(default='')
    status: Status = Field(default=Status.PENDING)
    inserted_at: Optional[datetime] = Field(default=None)
    updated_at: Optional[datetime] = Field(default=None)

    @validator('start', 'end', pre=True)
    def convert_time(cls, time):
        """Parse string timestamps with ``TIME_FMT``.

        Runs before pydantic's own conversion (``pre=True``). Values that
        are not strings are returned unchanged.
        """
        if isinstance(time, str):
            time = datetime.strptime(time, TIME_FMT)
        return time

    @validator('start')
    def check_start_in_past(cls, start_time):
        """Reject a start time earlier than the current UTC time."""
        if start_time < datetime.now(pytz.utc):
            raise ValueError('Start in the past not allowed')
        return start_time

    @validator('end')
    def check_end_before_start(cls, end_time, values):
        """Reject an end time that is not strictly after the start time.

        The comparison is skipped when ``start`` itself failed validation
        and is therefore missing from ``values``.
        """
        if 'start' in values and end_time <= values['start']:
            raise ValueError('End before start not allowed')
        return end_time

    @root_validator
    def check_items_empty(cls, values):
        """Require at least one switch, link or interface.

        Raises:
            ValueError: if all three item lists are empty or missing.
        """
        # A field that failed its own validation is absent from ``values``.
        # Indexing it directly would raise KeyError, which pydantic does
        # not convert into a validation error, so look it up with ``.get``.
        item_fields = ('switches', 'links', 'interfaces')
        if not any(values.get(key) for key in item_fields):
            raise ValueError('At least one item must be provided')
        return values

    def maintenance_event(self, operation, controller: Controller):
        """Create events to start/end a maintenance.

        One ``kytos/maintenance.<operation>_<kind>`` event is put on
        ``controller.buffers.app`` for every non-empty item list, in the
        order switch, interface, link.

        Args:
            operation: Prefix used in the event name; this class passes
                ``'start'`` or ``'end'``.
            controller: Controller whose app buffer receives the events.
        """
        # (event name suffix, key used in the event content, items)
        targets = (
            ('switch', 'switches', self.switches),
            ('interface', 'unis', self.interfaces),
            ('link', 'links', self.links),
        )
        for suffix, content_key, items in targets:
            if not items:
                continue
            event = KytosEvent(
                name=f'kytos/maintenance.{operation}_{suffix}',
                content={content_key: items}
            )
            controller.buffers.app.put(event)

    def start_mw(self, controller: Controller):
        """Actions taken when a maintenance window starts."""
        self.maintenance_event('start', controller)

    def end_mw(self, controller: Controller):
        """Actions taken when a maintenance window finishes."""
        self.maintenance_event('end', controller)

    class Config:
        """Serialize datetimes with ``TIME_FMT`` when encoding to JSON."""
        json_encoders = {
            datetime: lambda v: v.strftime(TIME_FMT),
        }
107
108
class MaintenanceWindows(BaseModel):
    """A list of maintenance windows wrapped as a custom-root model."""
    __root__: list[MaintenanceWindow]

    def __iter__(self):
        """Iterate over the wrapped windows."""
        windows = self.__root__
        return iter(windows)

    def __getitem__(self, item):
        """Return the window(s) selected by ``item`` from the wrapped list."""
        windows = self.__root__
        return windows[item]

    class Config:
        """Serialize datetimes with ``TIME_FMT`` when encoding to JSON."""
        json_encoders = {
            datetime: lambda moment: moment.strftime(TIME_FMT),
        }
121 1
122
@dataclass
class MaintenanceStart:
    """Scheduler job that starts one maintenance window.

    Calling the instance asks ``maintenance_scheduler`` to start the
    window identified by ``mw_id``.
    """
    maintenance_scheduler: 'Scheduler'
    mw_id: MaintenanceID

    def __call__(self):
        owner = self.maintenance_scheduler
        owner.start_maintenance(self.mw_id)
132
133
134 1
@dataclass
class MaintenanceEnd:
    """Scheduler job that ends one maintenance window.

    Calling the instance asks ``maintenance_scheduler`` to end the
    window identified by ``mw_id``.
    """
    maintenance_scheduler: 'Scheduler'
    mw_id: MaintenanceID

    def __call__(self):
        owner = self.maintenance_scheduler
        owner.end_maintenance(self.mw_id)
144
145
146
@dataclass
class Scheduler:
    """Scheduler for maintenance windows.

    Ties together the kytos ``controller`` (receives maintenance events),
    the ``db`` controller (stores the windows) and the APScheduler
    ``scheduler`` (runs the start/end jobs). Jobs are identified as
    ``<window id>-start`` and ``<window id>-end``.
    """
    controller: Controller
    db: 'MaintenanceController'
    scheduler: BaseScheduler

    @classmethod
    def new_scheduler(cls, controller: Controller):
        """Create a new scheduler from the given kytos controller.

        Builds a UTC ``BackgroundScheduler`` and a ``MaintenanceController``
        whose indexes are bootstrapped before use.
        """
        scheduler = BackgroundScheduler(timezone=pytz.utc)
        # NOTE(review): presumably imported here rather than at module
        # level to avoid a circular import -- confirm.
        from napps.kytos.maintenance.controllers import MaintenanceController
        db = MaintenanceController()
        db.bootstrap_indexes()
        instance = cls(controller, db, scheduler)
        return instance

    def start(self):
        """Begin running the scheduler."""
        # Populate the scheduler with all pending tasks
        windows = self.db.get_windows()
        for window in windows:
            self._schedule(window)

        # Start the scheduler
        self.scheduler.start()

    def shutdown(self):
        """Stop running the scheduler.

        Windows are unscheduled before the job store is cleared:
        ``_unschedule`` tells a running window apart by which of its jobs
        still exist, so clearing the jobs first would hide every running
        window and no end event would be emitted for it.
        """
        windows = self.db.get_windows()

        # Depopulate the scheduler
        for window in windows:
            self._unschedule(window)

        self.scheduler.remove_all_jobs()
        self.scheduler.shutdown()

    def start_maintenance(self, mw_id: MaintenanceID):
        """Begin executing the maintenance window."""
        # Get Maintenance from DB and Update
        window = self.db.start_window(mw_id)

        # Activate Running
        window.start_mw(self.controller)

        # Schedule next task
        # NOTE(review): if start_window returns the window as RUNNING,
        # _schedule calls start_mw again and the start events are emitted
        # twice -- confirm against the DB controller.
        self._schedule(window)

    def end_maintenance(self, mw_id: MaintenanceID):
        """End execution of the maintenance window."""
        # Get Maintenance from DB
        window = self.db.end_window(mw_id)

        # Set to Ending
        window.end_mw(self.controller)

    def end_maintenance_early(self, mw_id: MaintenanceID):
        """End execution of the maintenance window early."""
        # Get Maintenance from DB
        window = self.db.end_window(mw_id)

        # Unschedule tasks; this also emits the end events when the
        # window had started and its end job was still pending.
        self._unschedule(window)

    def add(self, window: MaintenanceWindow):
        """Add jobs to start and end a maintenance window."""
        # Add window to DB
        self.db.insert_window(window)

        # Schedule next task
        self._schedule(window)

    def update(self, window: MaintenanceWindow):
        """Update an existing Maintenance Window."""
        # Update window
        self.db.update_window(window)

        # Reschedule any pending tasks
        self._reschedule(window)

    def remove(self, mw_id: MaintenanceID):
        """Remove jobs that start and end a maintenance window."""
        # Get Maintenance from DB
        window = self.db.get_window(mw_id)

        # Remove from schedule
        self._unschedule(window)

        # Remove from DB
        self.db.remove_window(mw_id)

    def _schedule(self, window: MaintenanceWindow):
        """Schedule the next job for ``window`` based on its status.

        A pending window gets a start job at ``window.start``. A running
        window has its start events emitted and gets an end job at
        ``window.end``. Any other status schedules nothing.
        """
        # ``==`` rather than ``is``: Status is a str enum, so this also
        # matches a status held as a plain string value.
        if window.status == Status.PENDING:
            self.scheduler.add_job(
                MaintenanceStart(self, window.id),
                'date',
                id=f'{window.id}-start',
                run_date=window.start
            )
        if window.status == Status.RUNNING:
            window.start_mw(self.controller)
            self.scheduler.add_job(
                MaintenanceEnd(self, window.id),
                'date',
                id=f'{window.id}-end',
                run_date=window.end
            )

    def _reschedule(self, window: MaintenanceWindow):
        """Move the existing start/end jobs to the window's current times.

        A job that no longer exists is skipped with an info log.
        """
        # run_date is an argument of the 'date' trigger, not a Job
        # attribute, so the trigger has to be replaced via reschedule_job;
        # modify_job rejects it.
        try:
            self.scheduler.reschedule_job(
                f'{window.id}-start',
                trigger='date',
                run_date=window.start,
            )
        except JobLookupError:
            log.info('Could not reschedule start, already started')
        try:
            self.scheduler.reschedule_job(
                f'{window.id}-end',
                trigger='date',
                run_date=window.end,
            )
        except JobLookupError:
            log.info('Could not reschedule end, already ended')

    def _unschedule(self, window: MaintenanceWindow):
        """Remove maintenance events from scheduler.

        Does not update DB, due to being
        primarily for shutdown startup cases.

        A missing start job is taken to mean the window already started,
        and a missing end job that it already ended. A window that
        started but has not ended gets its end events emitted.
        """
        started = False
        ended = False
        try:
            self.scheduler.remove_job(f'{window.id}-start')
        except JobLookupError:
            started = True
            log.info(f'Job to start {window.id} already removed.')
        try:
            self.scheduler.remove_job(f'{window.id}-end')
        except JobLookupError:
            ended = True
            log.info(f'Job to end {window.id} already removed.')
        if started and not ended:
            window.end_mw(self.controller)

    def get_maintenance(self, mw_id: MaintenanceID) -> MaintenanceWindow:
        """Get a single maintenance by id."""
        return self.db.get_window(mw_id)

    def list_maintenances(self) -> MaintenanceWindows:
        """Return a list of all maintenances."""
        return self.db.get_windows()
308