Passed
Pull Request — master (#136)
by Aldo
03:42
created

build.main.Main.remove_mw()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 12
nop 2
dl 0
loc 14
rs 9.8
c 0
b 0
f 0
ccs 10
cts 10
cp 1
crap 3
1
"""Main module of kytos/maintenance Kytos Network Application.

This NApp creates maintenance windows, allowing the maintenance of network
devices (switch, link, and interface) without receiving alerts.
"""
6
7 1
import pathlib
8 1
from datetime import timedelta
9
10 1
from napps.kytos.maintenance.managers import MaintenanceDeployer as Deployer
11 1
from napps.kytos.maintenance.managers import MaintenanceScheduler as Scheduler
12 1
from napps.kytos.maintenance.models import MaintenanceID
13 1
from napps.kytos.maintenance.models import MaintenanceWindow as MW
14 1
from napps.kytos.maintenance.models import OverlapError, Status
15 1
from pydantic import ValidationError
16 1
from pymongo.errors import DuplicateKeyError
17
18 1
from kytos.core import KytosNApp, rest
19 1
from kytos.core.helpers import load_spec, validate_openapi
20 1
from kytos.core.rest_api import (
21
    HTTPException,
22
    JSONResponse,
23
    Request,
24
    Response,
25
    error_msg,
26
    get_json_or_400,
27
)
28
29
30 1
class Main(KytosNApp):
    """Main class of kytos/maintenance NApp.

    This class is the entry point for this napp. ``setup`` builds a
    maintenance deployer and a scheduler on top of it; the REST handlers
    below delegate listing, creation, update, removal, early termination
    and extension of maintenance windows to that scheduler.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        Here it creates the deployer from the controller, creates the
        scheduler from the deployer, and starts the scheduler.
        """
        self.maintenance_deployer = Deployer.new_deployer(self.controller)
        self.scheduler = Scheduler.new_scheduler(self.maintenance_deployer)
        self.scheduler.start()

    def execute(self):
        """Run after the setup method execution.

        You can also use this method in loop mode if you add to the above setup
        method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """Run when your napp is unloaded.

        Shuts down the scheduler started in ``setup``.
        """
        self.scheduler.shutdown()

    def _get_maintenance_or_404(self, mw_id: MaintenanceID) -> MW:
        """Return the maintenance window ``mw_id`` or raise a 404.

        Raises:
            HTTPException: 404 when the scheduler returns ``None``.
        """
        maintenance = self.scheduler.get_maintenance(mw_id)
        if maintenance is None:
            raise HTTPException(
                404, detail=f"Maintenance with id {mw_id} not found"
            )
        return maintenance

    @staticmethod
    def _require_in_progress(maintenance: MW, mw_id: MaintenanceID) -> None:
        """Reject windows that have not started yet or already finished.

        Raises:
            HTTPException: 400 when the status is PENDING or FINISHED.
        """
        if maintenance.status == Status.PENDING:
            raise HTTPException(
                400, detail=f"Maintenance window {mw_id} has not yet started"
            )
        if maintenance.status == Status.FINISHED:
            raise HTTPException(
                400, detail=f"Maintenance window {mw_id} has already finished"
            )

    @rest("/v1", methods=["GET"])
    def get_all_mw(self, _request: Request) -> Response:
        """Return all maintenance windows."""
        maintenances = self.scheduler.list_maintenances()
        # NOTE(review): `.json()` is kept as-is because the type returned by
        # `list_maintenances()` is not visible from this module.
        return Response(
            f"{maintenances.json()}\n",
            status_code=200,
            media_type="application/json",
        )

    @rest("/v1/{mw_id}", methods=["GET"])
    def get_mw(self, request: Request) -> Response:
        """Return one maintenance window.

        Raises:
            HTTPException: 404 when no window with the given id exists.
        """
        mw_id: MaintenanceID = request.path_params["mw_id"]
        window = self._get_maintenance_or_404(mw_id)
        return Response(
            f"{window.json()}\n",
            status_code=200,
            media_type="application/json",
        )

    @rest("/v1", methods=["POST"])
    def create_mw(self, request: Request) -> JSONResponse:
        """Create a new maintenance window.

        The JSON body must be a non-empty object without a "status" key.
        Optional body keys read here: "force" (forwarded to the scheduler)
        and "ignore_no_exists" (skips the item existence check when truthy).

        Returns:
            A 201 response carrying the id of the created window.

        Raises:
            HTTPException: 400 on an invalid body, a validation failure,
                non-existent items, an overlap or any other ``ValueError``;
                409 when a window with the same id already exists.
        """
        data = get_json_or_400(request, self.controller.loop)
        if not isinstance(data, dict) or not data:
            raise HTTPException(400, detail=f"Invalid json body value: {data}")

        if "status" in data:
            raise HTTPException(
                400, detail="Setting a maintenance status is not allowed"
            )
        # A client-supplied 'id' is deliberately accepted (its rejection was
        # disabled); a clash with an existing id is reported as 409 below.
        try:
            maintenance = MW.model_validate(data)
            force = data.get("force", False)
            ignore_no_exists = data.get("ignore_no_exists")
            if not ignore_no_exists:
                self.validate_item_existence(maintenance)
            self.scheduler.add(maintenance, force=force)
        except ValidationError as err:
            # Must precede the generic ValueError handler below.
            msg = error_msg(err.errors())
            raise HTTPException(400, detail=msg) from err
        except DuplicateKeyError as err:
            raise HTTPException(
                409, detail=f"Window with id: {maintenance.id} already exists"
            ) from err
        except OverlapError as err:
            raise HTTPException(400, detail=f"{err}") from err
        except ValueError as err:
            raise HTTPException(400, detail=f"{err}") from err
        return JSONResponse({"mw_id": maintenance.id}, status_code=201)

    @rest("/v1/{mw_id}", methods=["PATCH"])
    def update_mw(self, request: Request) -> JSONResponse:
        """Update a maintenance window.

        The body is merged over the stored window and re-validated; the
        status cannot be changed and the id must stay the same.

        Raises:
            HTTPException: 400 on an invalid body, when the window is
                running, when "status" is present, on a validation failure
                or on an id change; 404 when the window does not exist.
        """
        data = get_json_or_400(request, self.controller.loop)
        if not isinstance(data, dict) or not data:
            raise HTTPException(400, detail=f"Invalid json body value: {data}")

        mw_id: MaintenanceID = request.path_params["mw_id"]
        old_maintenance = self._get_maintenance_or_404(mw_id)
        if old_maintenance.status == Status.RUNNING:
            raise HTTPException(
                400, detail="Updating a running maintenance is not allowed"
            )
        if "status" in data:
            raise HTTPException(
                400, detail="Updating a maintenance status is not allowed"
            )
        try:
            # Body values override the stored ones field by field.
            new_maintenance = MW.model_validate(
                {**old_maintenance.model_dump(), **data}
            )
        except ValidationError as err:
            msg = error_msg(err.errors())
            raise HTTPException(400, detail=msg) from err
        if new_maintenance.id != old_maintenance.id:
            raise HTTPException(400, detail="Updated id must match old id")
        self.scheduler.update(new_maintenance)
        return JSONResponse({"response": f"Maintenance {mw_id} updated"})

    @rest("/v1/{mw_id}", methods=["DELETE"])
    def remove_mw(self, request: Request) -> JSONResponse:
        """Delete a maintenance window.

        Raises:
            HTTPException: 404 when the window does not exist; 400 when it
                is currently running.
        """
        mw_id: MaintenanceID = request.path_params["mw_id"]
        maintenance = self._get_maintenance_or_404(mw_id)
        if maintenance.status == Status.RUNNING:
            raise HTTPException(
                400, detail="Deleting a running maintenance is not allowed"
            )
        self.scheduler.remove(mw_id)
        return JSONResponse(
            {"response": f"Maintenance with id {mw_id} successfully removed"}
        )

    @rest("/v1/{mw_id}/end", methods=["PATCH"])
    def end_mw(self, request: Request) -> JSONResponse:
        """Finish a maintenance window right now.

        Raises:
            HTTPException: 404 when the window does not exist; 400 when it
                has not started yet or has already finished.
        """
        mw_id: MaintenanceID = request.path_params["mw_id"]
        maintenance = self._get_maintenance_or_404(mw_id)
        self._require_in_progress(maintenance, mw_id)
        self.scheduler.end_maintenance_early(mw_id)
        return JSONResponse({"response": f"Maintenance window {mw_id} finished"})

    @rest("/v1/{mw_id}/extend", methods=["PATCH"])
    @validate_openapi(spec)
    def extend_mw(self, request: Request) -> JSONResponse:
        """Extend a running maintenance window.

        The JSON body is passed as keyword arguments to ``timedelta`` and
        the result is added to the window's current end.

        Raises:
            HTTPException: 400 when the body is not an object or cannot be
                turned into a valid offset, or when the window has not
                started yet or has already finished; 404 when the window
                does not exist.
        """
        mw_id: MaintenanceID = request.path_params["mw_id"]
        data = get_json_or_400(request, self.controller.loop)
        if not isinstance(data, dict):
            raise HTTPException(400, detail=f"Invalid json body value: {data}")

        maintenance = self._get_maintenance_or_404(mw_id)
        self._require_in_progress(maintenance, mw_id)
        try:
            maintenance_end = maintenance.end + timedelta(**data)
        except (TypeError, OverflowError) as err:
            # Unknown keys, non-numeric values or out-of-range offsets in the
            # client body are a bad request, not a server error.
            raise HTTPException(
                400, detail=f"Invalid json body value: {data}"
            ) from err
        new_maintenance = maintenance.model_copy(update={"end": maintenance_end})

        self.scheduler.update(new_maintenance)
        return JSONResponse({"response": f"Maintenance {mw_id} extended"})

    def validate_item_existence(self, window: MW) -> None:
        """Validate that all items in a maintenance window exist.

        Switches and links are looked up in the controller's ``switches``
        and ``links`` mappings, interfaces through
        ``controller.get_interface_by_id``.

        Raises:
            HTTPException: 400 listing every id that could not be found.
        """
        missing_switches = [
            switch_id
            for switch_id in window.switches
            if self.controller.switches.get(switch_id) is None
        ]
        missing_interfaces = [
            interface_id
            for interface_id in window.interfaces
            if self.controller.get_interface_by_id(interface_id) is None
        ]
        missing_links = [
            link_id
            for link_id in window.links
            if self.controller.links.get(link_id) is None
        ]

        if missing_switches or missing_interfaces or missing_links:
            items = {
                "switches": missing_switches,
                "interfaces": missing_interfaces,
                "links": missing_links,
            }
            raise HTTPException(400, f"Window contains non-existant items: {items}")
244