Passed
Pull Request — master (#61)
by Antonio
02:26
created

build.models.MaintenanceWindow.link_from_dict()   A

Complexity

Conditions 3

Size

Total Lines 16
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 8.2077

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 16
ccs 2
cts 12
cp 0.1666
rs 9.7
c 0
b 0
f 0
cc 3
nop 2
crap 8.2077
1
"""Models used by the maintenance NApp.
2
3
This module define models for the maintenance window itself and the
4
scheduler.
5
"""
6 1
import datetime
7 1
from enum import IntEnum
8 1
from typing import Any, Optional, Union
9 1
from uuid import uuid4
10
11 1
import pytz
12 1
from apscheduler.jobstores.base import JobLookupError
13 1
from apscheduler.schedulers.background import BackgroundScheduler
14 1
from pydantic import BaseModel, Field, conlist
15
16 1
from kytos.core import KytosEvent, log
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19
20 1
TIME_FMT = "%Y-%m-%dT%H:%M:%S%z"
21
22
23 1
class Status(IntEnum):
24
    """Maintenance windows status."""
25
26 1
    PENDING = 0
27 1
    RUNNING = 1
28 1
    FINISHED = 2
29
30
31 1
class MaintenanceWindow(BaseModel):
32
    """Class to store a maintenance window."""
33
34 1
    items: conlist(Union[UNI, Link, str], min_items=1)
35 1
    id: str = Field(default_factory=lambda: uuid4().hex)
36 1
    description: Optional[str]
37 1
    start: datetime.datetime
38 1
    end: datetime.datetime
39 1
    status: Status = Status.PENDING
40 1
    controller: Any = Field(..., exclude=True)
41
42 1
    class Config:
43
        """Pydantic configuration."""
44 1
        arbitrary_types_allowed = True
45 1
        json_encoders = {
46
            datetime: lambda dt: dt.strftime(TIME_FMT)
47
        }
48
49 1
    def as_dict(self):
50
        """Return this maintenance window as a dictionary."""
51 1
        return self.dict(exclude_none=True)
52
53 1
    @classmethod
54 1
    def from_dict(cls, mw_dict, controller):
55
        """Create a maintenance window from a dictionary of attributes."""
56
        return cls(controller=controller, **mw_dict)
57
58 1
    def update(self, mw_dict):
59
        """Update a maintenance window with the data from a dictionary."""
60 1
        try:
61 1
            start = self.str_to_datetime(mw_dict['start'])
62 1
        except KeyError:
63 1
            start = self.start
64 1
        try:
65 1
            end = self.str_to_datetime(mw_dict['end'])
66 1
        except KeyError:
67 1
            end = self.end
68 1
        now = datetime.datetime.now(pytz.utc)
69 1
        if start < now:
70
            raise ValueError('Start in the past not allowed.')
71 1
        if end < start:
72
            raise ValueError('End before start not allowed.')
73 1
        if 'items' in mw_dict:
74 1
            if not mw_dict['items']:
75 1
                raise ValueError('At least one item must be provided')
76 1
            self.items = mw_dict['items']
77 1
        self.start = start
78 1
        self.end = end
79 1
        if 'description' in mw_dict:
80
            self.description = mw_dict['description']
81
82 1
    @staticmethod
83 1
    def intf_from_dict(intf_id, controller):
84
        """Get the Interface instance with intf_id."""
85
        intf = controller.get_interface_by_id(intf_id)
86
        return intf
87
88 1
    @staticmethod
89 1
    def uni_from_dict(uni_dict, controller):
90
        """Create UNI instance from a dictionary."""
91
        intf = MaintenanceWindow.intf_from_dict(uni_dict['interface_id'],
92
                                                controller)
93
        tag = TAG.from_dict(uni_dict['tag'])
94
        if intf and tag:
95
            return UNI(intf, tag)
96
        return None
97
98 1
    @staticmethod
99 1
    def link_from_dict(link_dict, controller):
100
        """Create a link instance from a dictionary."""
101
        endpoint_a = controller.get_interface_by_id(
102
            link_dict['endpoint_a']['id'])
103
        endpoint_b = controller.get_interface_by_id(
104
            link_dict['endpoint_b']['id'])
105
106
        link = Link(endpoint_a, endpoint_b)
107
        if 'metadata' in link_dict:
108
            link.extend_metadata(link_dict['metadata'])
109
        s_vlan = link.get_metadata('s_vlan')
110
        if s_vlan:
111
            tag = TAG.from_dict(s_vlan)
112
            link.update_metadata('s_vlan', tag)
113
        return link
114
115 1
    @staticmethod
116 1
    def str_to_datetime(str_date):
117
        """Convert a string representing a date and time to datetime."""
118 1
        date = datetime.datetime.strptime(str_date, TIME_FMT)
119 1
        return date.astimezone(pytz.utc)
120
121 1
    def maintenance_event(self, operation):
122
        """Create events to start/end a maintenance."""
123 1
        switches = []
124 1
        unis = []
125 1
        links = []
126 1
        for item in self.items:
127 1
            if isinstance(item, UNI):
128 1
                unis.append(UNI)
129 1
            elif isinstance(item, Link):
130 1
                links.append(item)
131
            else:
132 1
                switch = self.controller.switches.get(item, None)
133 1
                if switch:
134 1
                    switches.append(switch)
135 1
        if switches:
136 1
            event = KytosEvent(name=f'kytos/maintenance.{operation}_switch',
137
                               content={'switches': switches})
138 1
            self.controller.buffers.app.put(event)
139 1
        if unis:
140 1
            event = KytosEvent(name=f'kytos/maintenance.{operation}_uni',
141
                               content={'unis': unis})
142 1
            self.controller.buffers.app.put(event)
143 1
        if links:
144 1
            event = KytosEvent(name=f'kytos/maintenance.{operation}_link',
145
                               content={'links': links})
146 1
            self.controller.buffers.app.put(event)
147
148 1
    def start_mw(self):
149
        """Actions taken when a maintenance window starts."""
150 1
        self.status = Status.RUNNING
151 1
        self.maintenance_event('start')
152
153 1
    def end_mw(self):
154
        """Actions taken when a maintenance window finishes."""
155 1
        self.status = Status.FINISHED
156 1
        self.maintenance_event('end')
157
158
159 1
class Scheduler:
160
    """Scheduler for a maintenance window."""
161
162 1
    def __init__(self):
163
        """Initialize a new scheduler."""
164 1
        self.scheduler = BackgroundScheduler(timezone=pytz.utc)
165 1
        self.scheduler.start()
166
167 1
    def add(self, maintenance):
168
        """Add jobs to start and end a maintenance window."""
169 1
        self.scheduler.add_job(maintenance.start_mw, 'date',
170
                               id=f'{maintenance.id}-start',
171
                               run_date=maintenance.start)
172 1
        self.scheduler.add_job(maintenance.end_mw, 'date',
173
                               id=f'{maintenance.id}-end',
174
                               run_date=maintenance.end)
175
176 1
    def remove(self, maintenance):
177
        """Remove jobs that start and end a maintenance window."""
178 1
        try:
179 1
            self.scheduler.remove_job(f'{maintenance.id}-start')
180 1
        except JobLookupError:
181 1
            log.info(f'Job to start {maintenance.id} already removed.')
182 1
        try:
183 1
            self.scheduler.remove_job(f'{maintenance.id}-end')
184 1
        except JobLookupError:
185
            log.info(f'Job to end {maintenance.id} already removed.')
186