Issues (26)

controllers/__init__.py (1 issue)

1
"""TopoController."""
2
3
# pylint: disable=invalid-name
4 1
import os
5 1
import re
6 1
from datetime import datetime
7 1
from threading import Lock
8 1
from typing import List, Optional, Tuple
9
10 1
import pymongo
11 1
from pymongo.collection import ReturnDocument
12 1
from pymongo.errors import ConnectionFailure, ExecutionTimeout
13 1
from pymongo.operations import UpdateOne
14 1
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
15
16 1
from kytos.core import log
17 1
from kytos.core.db import Mongo
18 1
from kytos.core.retry import before_sleep, for_all_methods, retries
19 1
from napps.kytos.topology.db.models import (InterfaceDetailDoc, LinkDoc,
20
                                            SwitchDoc)
21
22
23 1
@for_all_methods(
    retries,
    stop=stop_after_attempt(
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
    ),
    # The random wait bounds may be fractional, so they are parsed as floats:
    # int() would truncate the 0.1 default to 0 and raise ValueError on a
    # fractional value such as "0.5" supplied through the environment.
    wait=wait_random(
        min=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
        max=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
    ),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((ConnectionFailure, ExecutionTimeout)),
)
class TopoController:
    """Controller for the topology collections stored in MongoDB.

    The class is decorated with ``for_all_methods(retries, ...)`` configured
    for ``ConnectionFailure`` and ``ExecutionTimeout``; the attempt limit and
    the random wait bounds are read from ``MONGO_AUTO_RETRY_*`` environment
    variables when the module is imported.
    """
37
38 1
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable Mongo does not seem to be defined.
Loading history...
39
        """Constructor of TopoController."""
40 1
        self.mongo = get_mongo()
41 1
        self.db_client = self.mongo.client
42 1
        self.db = self.db_client[self.mongo.db_name]
43
44 1
    def bootstrap_indexes(self) -> None:
        """Bootstrap all topology related indexes.

        ``self.mongo.bootstrap_index`` is called for each (collection, keys)
        pair below; a truthy result is logged as a created index.
        """
        index_tuples = [
            ("switches", [("interfaces.id", pymongo.ASCENDING)]),
            ("links", [("endpoints.id", pymongo.ASCENDING)]),
        ]
        for collection, keys in index_tuples:
            if self.mongo.bootstrap_index(collection, keys):
                # The message used to end with a stray, unbalanced ")".
                log.info(
                    f"Created DB index {keys}, collection: {collection}"
                )
55
56 1
    def get_topology(self) -> dict:
57
        """Get topology from DB."""
58 1
        switches = self.get_switches()
59 1
        links = self.get_links()
60 1
        return {"topology": {**links, **switches}}
61
62 1
    def get_switches(self) -> dict:
        """Return the stored switches keyed by their "id" field.

        Documents are sorted by ``_id`` and projected with
        ``SwitchDoc.projection()`` before being indexed.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": SwitchDoc.projection()},
        ]
        by_id = {}
        for doc in self.db.switches.aggregate(pipeline):
            by_id[doc["id"]] = doc
        return {"switches": by_id}
71
72 1
    def get_links(self) -> dict:
        """Return the stored links keyed by their "id" field.

        Documents are sorted by ``_id`` and projected with
        ``LinkDoc.projection()`` before being indexed.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": LinkDoc.projection()},
        ]
        by_id = {}
        for doc in self.db.links.aggregate(pipeline):
            by_id[doc["id"]] = doc
        return {"links": by_id}
81
82 1
    def get_interfaces(self) -> dict:
83
        """Get interfaces from DB."""
84 1
        interfaces = self.db.switches.aggregate(
85
            [
86
                {"$sort": {"_id": 1}},
87
                {"$project": {"interfaces": 1, "_id": 0}},
88
                {"$unwind": "$interfaces"},
89
                {"$replaceRoot": {"newRoot": "$interfaces"}},
90
            ]
91
        )
92 1
        return {"interfaces": {value["id"]: value for value in interfaces}}
93
94 1
    @staticmethod
95 1
    def _set_updated_at(update_expr: dict) -> None:
96
        """Set updated_at on $set expression."""
97 1
        if "$set" in update_expr:
98 1
            update_expr["$set"].update({"updated_at": datetime.utcnow()})
99
        else:
100 1
            update_expr.update({"$set": {"updated_at": datetime.utcnow()}})
101
102 1
    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
103
        """Try to find one switch and update it given an update expression."""
104 1
        self._set_updated_at(update_expr)
105 1
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)
106
107 1
    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
        """Insert the switch, or overwrite its fields if it already exists.

        ``switch_dict`` is validated through ``SwitchDoc`` with ``_id`` and
        ``updated_at`` forced; ``inserted_at`` is only written on insert. The
        document as it is after the update is returned.
        """
        now = datetime.utcnow()
        fields = {**switch_dict, "_id": dpid, "updated_at": now}
        document = SwitchDoc(**fields).model_dump(exclude={"inserted_at"})
        return self.db.switches.find_one_and_update(
            {"_id": dpid},
            {"$set": document, "$setOnInsert": {"inserted_at": now}},
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
123
124 1
    def enable_switch(self, dpid: str) -> Optional[dict]:
125
        """Try to find one switch and enable it."""
126 1
        return self._update_switch(dpid, {"$set": {"enabled": True}})
127
128 1
    def disable_switch(self, dpid: str) -> Optional[dict]:
129
        """Try to find one switch and disable it."""
130 1
        return self._update_switch(
131
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
132
        )
133
134 1
    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
135
        """Try to find a switch and add to its metadata."""
136 1
        update_expr = {
137
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
138
        }
139 1
        return self._update_switch(dpid, update_expr)
140
141 1
    def delete_switch_metadata_key(
142
        self, dpid: str, key: str
143
    ) -> Optional[dict]:
144
        """Try to find a switch and delete a metadata key."""
145 1
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})
146
147 1
    def enable_interface(self, interface_id: str) -> Optional[dict]:
148
        """Try to enable one interface and its embedded object on links."""
149 1
        return self._update_interface(
150
            interface_id, {"$set": {"enabled": True}}
151
        )
152
153 1
    def disable_interface(self, interface_id: str) -> Optional[dict]:
154
        """Try to disable one interface and its embedded object on links."""
155 1
        return self._update_interface(
156
            interface_id, {"$set": {"enabled": False}}
157
        )
158
159 1
    def add_interface_metadata(
160
        self, interface_id: str, metadata: dict
161
    ) -> Optional[dict]:
162
        """Try to find an interface and add to its metadata."""
163 1
        update_expr = {
164
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
165
        }
166 1
        return self._update_interface(interface_id, update_expr)
167
168 1
    def delete_interface_metadata_key(
169
        self, interface_id: str, key: str
170
    ) -> Optional[dict]:
171
        """Try to find an interface and delete a metadata key."""
172 1
        return self._update_interface(
173
            interface_id, {"$unset": {f"metadata.{key}": ""}}
174
        )
175
176 1
    def _update_interface(
        self, interface_id: str, update_expr: dict
    ) -> Optional[dict]:
        """Apply ``update_expr`` to one interface embedded in a switch.

        Every field of every operator is rewritten as ``interfaces.$.<field>``
        so it targets the array element matched by ``interfaces.id``. The
        ``updated_at`` stamp is added beforehand and is rewritten the same
        way. The switch document as it is after the update is returned.
        """
        self._set_updated_at(update_expr)
        positional_expr = {
            operator: {
                f"interfaces.$.{field}": value
                for field, value in fields.items()
            }
            for operator, fields in update_expr.items()
        }
        return self.db.switches.find_one_and_update(
            {"interfaces.id": interface_id},
            positional_expr,
            return_document=ReturnDocument.AFTER,
        )
191
192 1
    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
        """Insert or overwrite a link and tag both endpoint interfaces.

        ``link_dict`` is validated through ``LinkDoc`` with ``_id``,
        ``updated_at`` and ``endpoints`` forced; ``inserted_at`` is only
        written on insert. Afterwards the switch interface matched by each
        endpoint value gets ``link_id`` and ``link_side`` set. The link
        document as it is after the update is returned.
        """
        now = datetime.utcnow()
        endpoint_a = link_dict.get("endpoint_a")
        endpoint_b = link_dict.get("endpoint_b")
        fields = {
            **link_dict,
            "updated_at": now,
            "_id": link_id,
            "endpoints": [endpoint_a, endpoint_b],
        }
        document = LinkDoc(**fields).model_dump(exclude={"inserted_at"})
        updated = self.db.links.find_one_and_update(
            {"_id": link_id},
            {"$set": document, "$setOnInsert": {"inserted_at": now}},
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        # Same update for both sides, endpoint_a first and then endpoint_b.
        sides = (("endpoint_a", endpoint_a), ("endpoint_b", endpoint_b))
        for side, endpoint in sides:
            self.db.switches.find_one_and_update(
                {"interfaces.id": endpoint},
                {
                    "$set": {
                        "interfaces.$.link_id": link_id,
                        "interfaces.$.link_side": side,
                        "updated_at": now,
                    }
                },
            )
        return updated
238
239 1
    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
240
        """Try to find one link and update it given an update expression."""
241 1
        self._set_updated_at(update_expr)
242 1
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)
243
244 1
    def enable_link(self, link_id: str) -> Optional[dict]:
245
        """Try to find one link and enable it."""
246 1
        return self._update_link(link_id, {"$set": {"enabled": True}})
247
248 1
    def disable_link(self, link_id: str) -> Optional[dict]:
249
        """Try to find one link and disable it."""
250 1
        return self._update_link(link_id, {"$set": {"enabled": False}})
251
252 1
    def bulk_disable_links(self, link_ids: set[str]) -> int:
253
        """Bulk update that disables found links."""
254 1
        if not link_ids:
255 1
            return 0
256 1
        ops = []
257 1
        for _id in link_ids:
258 1
            ops.append(
259
                UpdateOne(
260
                    {"_id": _id},
261
                    {
262
                        "$set":
263
                        {
264
                            "updated_at": datetime.utcnow(),
265
                            "enabled": False,
266
                        }
267
                    }
268
                )
269
            )
270 1
        return self.db.links.bulk_write(ops).modified_count
271
272 1
    def add_link_metadata(
273
        self, link_id: str, metadata: dict
274
    ) -> Optional[dict]:
275
        """Try to find link and add to its metadata."""
276 1
        update_expr = {
277
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
278
        }
279 1
        return self._update_link(link_id, update_expr)
280
281 1
    def delete_link_metadata_key(
282
        self, link_id: str, key: str
283
    ) -> Optional[dict]:
284
        """Try to find a link and delete a metadata key."""
285 1
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})
286
287 1
    def bulk_delete_link_metadata_key(
288
        self, link_ids: List[str], key: str
289
    ) -> Optional[dict]:
290
        """Bulk delelete link metadata key."""
291
        update_expr = {"$unset": {f"metadata.{key}": 1}}
292
        self._set_updated_at(update_expr)
293
        return self.db.links.update_many({"_id": {"$in": link_ids}},
294
                                         update_expr)
295
296
    # pylint: disable=too-many-arguments
297 1
    def upsert_interface_details(
        self,
        id_: str,
        available_tags: dict[str, list[list[int]]],
        tag_ranges: dict[str, list[list[int]]],
        special_available_tags: dict[str, list[str]],
        special_tags: dict[str, list[str]]
    ) -> Optional[dict]:
        """Insert or overwrite the details document of interface ``id_``.

        The values are validated through ``InterfaceDetailDoc``;
        ``inserted_at`` is only written on insert. The document as it is
        after the update is returned.
        """
        now = datetime.utcnow()
        details = InterfaceDetailDoc(
            _id=id_,
            available_tags=available_tags,
            tag_ranges=tag_ranges,
            special_available_tags=special_available_tags,
            special_tags=special_tags,
            updated_at=now,
        )
        return self.db.interface_details.find_one_and_update(
            {"_id": id_},
            {
                "$set": details.model_dump(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
325
326 1
    def get_interfaces_details(
327
        self, interface_ids: List[str]
328
    ) -> Optional[dict]:
329
        """Try to get interfaces details given a list of interface ids."""
330 1
        return self.db.interface_details.aggregate(
331
            [
332
                {"$match": {"_id": {"$in": interface_ids}}},
333
            ]
334
        )
335
336 1
    def delete_link(self, link_id: str) -> Optional[dict]:
337
        """Delete a link by its id."""
338 1
        return self.db.links.find_one_and_delete(
339
            {"_id": link_id}
340
        )
341
342 1
    def delete_switch_data(
343
        self,
344
        switch_dpid: str
345
    ) -> tuple[Optional[dict], int]:
346
        """Delete a switch related data in database."""
347 1
        regex = re.compile(rf'^{switch_dpid}:\d+$')
348 1
        det_result = self.db.interface_details.delete_many(
349
            {"_id": {"$regex": regex}}
350
        )
351 1
        swt_result = self.db.switches.find_one_and_delete(
352
            {"_id": switch_dpid}
353
        )
354 1
        return (swt_result, det_result.deleted_count)
355
356 1
    def delete_interface_from_details(self, intf_id: str) -> Optional[dict]:
357
        """Delete interface from interface_details."""
358 1
        return self.db.interface_details.find_one_and_delete(
359
            {"_id": intf_id}
360
        )
361