build.main.Main.validate_item_existence()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 29
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 8.1426

Importance

Changes 0
Metric Value
cc 7
eloc 20
nop 2
dl 0
loc 29
ccs 5
cts 7
cp 0.7143
crap 8.1426
rs 8
c 0
b 0
f 0
1
"""Main module of kytos/maintenance Kytos Network Application.
2
3
This NApp creates maintenance windows, allowing the maintenance of network
4
devices (switch, link, and interface) without receiving alerts.
5
"""
6
7 1
from datetime import timedelta
8
9 1
from napps.kytos.maintenance.managers import MaintenanceDeployer as Deployer
10 1
from napps.kytos.maintenance.managers import MaintenanceScheduler as Scheduler
11 1
from napps.kytos.maintenance.models import MaintenanceID
12 1
from napps.kytos.maintenance.models import MaintenanceWindow as MW
13 1
from napps.kytos.maintenance.models import OverlapError, Status
14 1
from pydantic import ValidationError
15 1
from pymongo.errors import DuplicateKeyError
16
17 1
from kytos.core import KytosNApp, rest
18 1
from kytos.core.rest_api import (
19
    HTTPException,
20
    JSONResponse,
21
    Request,
22
    Response,
23
    error_msg,
24
    get_json_or_400,
25
)
26
27
28 1
class Main(KytosNApp):
29
    """Main class of kytos/maintenance NApp.
30
31
    This class is the entry point for this napp.
32
    """
33
34 1
    def setup(self):
35
        """Replace the '__init__' method for the KytosNApp subclass.
36
37
        The setup method is automatically called by the controller when your
38
        application is loaded.
39
40
        So, if you have any setup routine, insert it here.
41
        """
42 1
        self.maintenance_deployer = Deployer.new_deployer(self.controller)
43 1
        self.scheduler = Scheduler.new_scheduler(self.maintenance_deployer)
44 1
        self.scheduler.start()
45
46 1
    def execute(self):
47
        """Run after the setup method execution.
48
49
        You can also use this method in loop mode if you add to the above setup
50
        method a line like the following example:
51
52
            self.execute_as_loop(30)  # 30-second interval.
53
        """
54
55 1
    def shutdown(self):
56
        """Run when your napp is unloaded.
57
58
        If you have some cleanup procedure, insert it here.
59
        """
60
        self.scheduler.shutdown()
61
62 1
    @rest("/v1", methods=["GET"])
63 1
    def get_all_mw(self, _request: Request) -> Response:
64
        """Return all maintenance windows."""
65 1
        maintenances = self.scheduler.list_maintenances()
66 1
        return Response(
67
            f"{maintenances.json()}\n",
68
            status_code=200,
69
            media_type="application/json",
70
        )
71
72 1
    @rest("/v1/{mw_id}", methods=["GET"])
73 1
    def get_mw(self, request: Request) -> Response:
74
        """Return one maintenance window."""
75 1
        mw_id: MaintenanceID = request.path_params["mw_id"]
76 1
        window = self.scheduler.get_maintenance(mw_id)
77 1
        if window:
78 1
            return Response(
79
                f"{window.json()}\n",
80
                status_code=200,
81
                media_type="application/json",
82
            )
83 1
        raise HTTPException(404, f"Maintenance with id {mw_id} not found")
84
85 1
    @rest("/v1", methods=["POST"])
86 1
    def create_mw(self, request: Response) -> JSONResponse:
87
        """Create a new maintenance window."""
88 1
        data = get_json_or_400(request, self.controller.loop)
89 1
        if not isinstance(data, dict) or not data:
90
            raise HTTPException(400, detail=f"Invalid json body value: {data}")
91
92 1
        if "status" in data:
93 1
            raise HTTPException(
94
                400, detail="Setting a maintenance status is not allowed"
95
            )
96
        # if 'id' in data:
97
        #     raise HTTPException(
98
        #         400, detail='Setting a maintenance id is not allowed'
99
        #     )
100 1
        try:
101 1
            maintenance = MW.parse_obj(data)
102 1
            force = data.get("force", False)
103 1
            ignore_no_exists = data.get("ignore_no_exists")
104 1
            if not ignore_no_exists:
105 1
                self.validate_item_existence(maintenance)
106 1
            self.scheduler.add(maintenance, force=force)
107 1
        except ValidationError as err:
108 1
            msg = error_msg(err.errors())
109 1
            raise HTTPException(400, detail=msg) from err
110
        except DuplicateKeyError as err:
111
            raise HTTPException(
112
                409, detail=f"Window with id: {maintenance.id} already exists"
113
            ) from err
114
        except OverlapError as err:
115
            raise HTTPException(400, detail=f"{err}") from err
116
        except ValueError as err:
117
            raise HTTPException(400, detail=f"{err}") from err
118 1
        return JSONResponse({"mw_id": maintenance.id}, status_code=201)
119
120 1
    @rest("/v1/{mw_id}", methods=["PATCH"])
121 1
    def update_mw(self, request: Request) -> JSONResponse:
122
        """Update a maintenance window."""
123 1
        data = get_json_or_400(request, self.controller.loop)
124 1
        if not isinstance(data, dict) or not data:
125
            raise HTTPException(400, detail=f"Invalid json body value: {data}")
126
127 1
        mw_id: MaintenanceID = request.path_params["mw_id"]
128 1
        old_maintenance = self.scheduler.get_maintenance(mw_id)
129 1
        if old_maintenance is None:
130 1
            raise HTTPException(404, detail=f"Maintenance with id {mw_id} not found")
131 1
        if old_maintenance.status == Status.RUNNING:
132
            raise HTTPException(
133
                400, detail="Updating a running maintenance is not allowed"
134
            )
135 1
        if "status" in data:
136 1
            raise HTTPException(
137
                400, detail="Updating a maintenance status is not allowed"
138
            )
139 1
        try:
140 1
            new_maintenance = MW.parse_obj({**old_maintenance.model_dump(), **data})
141 1
        except ValidationError as err:
142 1
            msg = error_msg(err.errors())
143 1
            raise HTTPException(400, detail=msg) from err
144 1
        if new_maintenance.id != old_maintenance.id:
145
            raise HTTPException(400, detail="Updated id must match old id")
146 1
        self.scheduler.update(new_maintenance)
147 1
        return JSONResponse({"response": f"Maintenance {mw_id} updated"})
148
149 1
    @rest("/v1/{mw_id}", methods=["DELETE"])
150 1
    def remove_mw(self, request: Request) -> JSONResponse:
151
        """Delete a maintenance window."""
152 1
        mw_id: MaintenanceID = request.path_params["mw_id"]
153 1
        maintenance = self.scheduler.get_maintenance(mw_id)
154 1
        if maintenance is None:
155 1
            raise HTTPException(404, detail=f"Maintenance with id {mw_id} not found")
156 1
        if maintenance.status == Status.RUNNING:
157 1
            raise HTTPException(
158
                400, detail="Deleting a running maintenance is not allowed"
159
            )
160 1
        self.scheduler.remove(mw_id)
161 1
        return JSONResponse(
162
            {"response": f"Maintenance with id {mw_id} successfully removed"}
163
        )
164
165 1
    @rest("/v1/{mw_id}/end", methods=["PATCH"])
166 1
    def end_mw(self, request: Request) -> JSONResponse:
167
        """Finish a maintenance window right now."""
168 1
        mw_id: MaintenanceID = request.path_params["mw_id"]
169 1
        maintenance = self.scheduler.get_maintenance(mw_id)
170 1
        if maintenance is None:
171 1
            raise HTTPException(404, detail=f"Maintenance with id {mw_id} not found")
172 1
        if maintenance.status == Status.PENDING:
173 1
            raise HTTPException(
174
                400, detail=f"Maintenance window {mw_id} has not yet started"
175
            )
176 1
        if maintenance.status == Status.FINISHED:
177 1
            raise HTTPException(
178
                400, detail=f"Maintenance window {mw_id} has already finished"
179
            )
180 1
        self.scheduler.end_maintenance_early(mw_id)
181 1
        return JSONResponse({"response": f"Maintenance window {mw_id} " f"finished"})
182
183 1
    @rest("/v1/{mw_id}/extend", methods=["PATCH"])
184 1
    def extend_mw(self, request: Request) -> JSONResponse:
185
        """Extend a running maintenance window."""
186 1
        mw_id: MaintenanceID = request.path_params["mw_id"]
187 1
        data = get_json_or_400(request, self.controller.loop)
188 1
        if not isinstance(data, dict):
189
            raise HTTPException(400, detail=f"Invalid json body value: {data}")
190
191 1
        maintenance = self.scheduler.get_maintenance(mw_id)
192 1
        if maintenance is None:
193 1
            raise HTTPException(404, detail=f"Maintenance with id {mw_id} not found")
194 1
        if "minutes" not in data:
195 1
            raise HTTPException(400, detail="Minutes of extension must be sent")
196 1
        if maintenance.status == Status.PENDING:
197 1
            raise HTTPException(
198
                400, detail=f"Maintenance window {mw_id} has not yet started"
199
            )
200 1
        if maintenance.status == Status.FINISHED:
201 1
            raise HTTPException(
202
                400, detail=f"Maintenance window {mw_id} has already finished"
203
            )
204 1
        try:
205 1
            maintenance_end = maintenance.end + timedelta(minutes=data["minutes"])
206 1
            new_maintenance = maintenance.copy(update={"end": maintenance_end})
207 1
        except TypeError as exc:
208 1
            raise HTTPException(
209
                400, detail="Minutes of extension must be integer"
210
            ) from exc
211
212 1
        self.scheduler.update(new_maintenance)
213 1
        return JSONResponse({"response": f"Maintenance {mw_id} extended"})
214
215 1
    def validate_item_existence(self, window: MW):
216
        """Validate that all items in a maintenance window exist."""
217 1
        non_existant_switches = list(
218
            filter(
219
                lambda switch_id: self.controller.switches.get(switch_id) is None,
220
                window.switches,
221
            )
222
        )
223 1
        non_existant_interfaces = list(
224
            filter(
225
                lambda interface_id: self.controller.get_interface_by_id(interface_id)
226
                is None,
227
                window.interfaces,
228
            )
229
        )
230 1
        non_existant_links = list(
231
            filter(
232
                lambda link_id: self.controller.links.get(link_id) is None,
233
                window.links,
234
            )
235
        )
236
237 1
        if non_existant_switches or non_existant_interfaces or non_existant_links:
238
            items = {
239
                "switches": non_existant_switches,
240
                "interfaces": non_existant_interfaces,
241
                "links": non_existant_links,
242
            }
243
            raise HTTPException(400, f"Window contains non-existant items: {items}")
244