1 | """TopoController.""" |
||
2 | |||
3 | # pylint: disable=invalid-name |
||
4 | 1 | import os |
|
5 | 1 | import re |
|
6 | 1 | from datetime import datetime |
|
7 | 1 | from threading import Lock |
|
8 | 1 | from typing import List, Optional, Tuple |
|
9 | |||
10 | 1 | import pymongo |
|
11 | 1 | from pymongo.collection import ReturnDocument |
|
12 | 1 | from pymongo.errors import ConnectionFailure, ExecutionTimeout |
|
13 | 1 | from pymongo.operations import UpdateOne |
|
14 | 1 | from tenacity import retry_if_exception_type, stop_after_attempt, wait_random |
|
15 | |||
16 | 1 | from kytos.core import log |
|
17 | 1 | from kytos.core.db import Mongo |
|
18 | 1 | from kytos.core.retry import before_sleep, for_all_methods, retries |
|
19 | 1 | from napps.kytos.topology.db.models import (InterfaceDetailDoc, LinkDoc, |
|
20 | SwitchDoc) |
||
21 | |||
22 | |||
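# Every method is retried on transient MongoDB errors (ConnectionFailure,
# ExecutionTimeout); the number of attempts and the random wait window are
# configurable through the MONGO_AUTO_RETRY_* environment variables.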
@for_all_methods(
    retries,
    stop=stop_after_attempt(
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
    ),
    wait=wait_random(
        min=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
        max=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
    ),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((ConnectionFailure, ExecutionTimeout)),
)
class TopoController:
    """TopoController."""

    def __init__(self, get_mongo=lambda: Mongo()) -> None:
39 | """Constructor of TopoController.""" |
||
40 | 1 | self.mongo = get_mongo() |
|
41 | 1 | self.db_client = self.mongo.client |
|
42 | 1 | self.db = self.db_client[self.mongo.db_name] |
|
43 | |||
    def bootstrap_indexes(self) -> None:
        """Bootstrap all topology related indexes."""
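        # Secondary indexes on embedded documents: switch documents embed
        # their interfaces and link documents embed their endpoints.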
        index_tuples = [
            ("switches", [("interfaces.id", pymongo.ASCENDING)]),
            ("links", [("endpoints.id", pymongo.ASCENDING)]),
        ]
        for collection, keys in index_tuples:
            if self.mongo.bootstrap_index(collection, keys):
                log.info(
                    f"Created DB index {keys}, collection: {collection}"
                )

    def get_topology(self) -> dict:
        """Get topology from DB."""
        switches = self.get_switches()
        links = self.get_links()
        return {"topology": {**links, **switches}}

    def get_switches(self) -> dict:
        """Get switches from DB."""
        switches = self.db.switches.aggregate(
            [
                {"$sort": {"_id": 1}},
                {"$project": SwitchDoc.projection()},
            ]
        )
        return {"switches": {value["id"]: value for value in switches}}

    def get_links(self) -> dict:
        """Get links from DB."""
        links = self.db.links.aggregate(
            [
                {"$sort": {"_id": 1}},
                {"$project": LinkDoc.projection()},
            ]
        )
        return {"links": {value["id"]: value for value in links}}

    def get_interfaces(self) -> dict:
        """Get interfaces from DB."""
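        # Interfaces are embedded in switch documents; unwind them and promote
        # each one to a root document before keying by id.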
        interfaces = self.db.switches.aggregate(
            [
                {"$sort": {"_id": 1}},
                {"$project": {"interfaces": 1, "_id": 0}},
                {"$unwind": "$interfaces"},
                {"$replaceRoot": {"newRoot": "$interfaces"}},
            ]
        )
        return {"interfaces": {value["id"]: value for value in interfaces}}

    @staticmethod
    def _set_updated_at(update_expr: dict) -> None:
        """Set updated_at on $set expression."""
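        # Merge "updated_at" into an existing "$set" stage, or create one.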
97 | 1 | if "$set" in update_expr: |
|
98 | 1 | update_expr["$set"].update({"updated_at": datetime.utcnow()}) |
|
99 | else: |
||
100 | 1 | update_expr.update({"$set": {"updated_at": datetime.utcnow()}}) |
|
101 | |||
    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
        """Try to find one switch and update it given an update expression."""
        self._set_updated_at(update_expr)
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)

    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
        """Update or insert switch."""
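        # "$setOnInsert" in the upsert below only applies when a new document
        # is inserted, so "inserted_at" is preserved across later updates.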
        utc_now = datetime.utcnow()
        model = SwitchDoc(
            **{**switch_dict, **{"_id": dpid, "updated_at": utc_now}}
        )
        updated = self.db.switches.find_one_and_update(
            {"_id": dpid},
            {
                "$set": model.model_dump(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        return updated

    def enable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and enable it."""
        return self._update_switch(dpid, {"$set": {"enabled": True}})

    def disable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and disable it."""
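        # "interfaces.$[]" (all-positional operator) also disables every
        # interface embedded in the switch document.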
        return self._update_switch(
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
        )

    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
        """Try to find a switch and add to its metadata."""
        update_expr = {
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
        }
        return self._update_switch(dpid, update_expr)

    def delete_switch_metadata_key(
        self, dpid: str, key: str
    ) -> Optional[dict]:
        """Try to find a switch and delete a metadata key."""
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})

    def enable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to enable one interface embedded in its switch document."""
        return self._update_interface(
            interface_id, {"$set": {"enabled": True}}
        )

    def disable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to disable one interface embedded in its switch document."""
        return self._update_interface(
            interface_id, {"$set": {"enabled": False}}
        )

    def add_interface_metadata(
        self, interface_id: str, metadata: dict
    ) -> Optional[dict]:
        """Try to find an interface and add to its metadata."""
        update_expr = {
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
        }
        return self._update_interface(interface_id, update_expr)

    def delete_interface_metadata_key(
        self, interface_id: str, key: str
    ) -> Optional[dict]:
        """Try to find an interface and delete a metadata key."""
        return self._update_interface(
            interface_id, {"$unset": {f"metadata.{key}": ""}}
        )

    def _update_interface(
        self, interface_id: str, update_expr: dict
    ) -> Optional[dict]:
        """Try to update one interface embedded in its switch document."""
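        # The expression is rewritten with the positional "$" operator so that
        # only the embedded interface matched by "interfaces.id" is updated.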
        self._set_updated_at(update_expr)
        interfaces_expression = {}
        for operator, values in update_expr.items():
            interfaces_expression[operator] = {
                f"interfaces.$.{k}": v for k, v in values.items()
            }
        return self.db.switches.find_one_and_update(
            {"interfaces.id": interface_id},
            interfaces_expression,
            return_document=ReturnDocument.AFTER,
        )

    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
        """Update or insert a Link."""
        utc_now = datetime.utcnow()

        endpoint_a = link_dict.get("endpoint_a")
        endpoint_b = link_dict.get("endpoint_b")
        model = LinkDoc(
            **{
                **link_dict,
                **{
                    "updated_at": utc_now,
                    "_id": link_id,
                    "endpoints": [endpoint_a, endpoint_b],
                },
            }
        )
        updated = self.db.links.find_one_and_update(
            {"_id": link_id},
            {
                "$set": model.model_dump(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
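        # Keep the embedded interface objects in sync: record on each endpoint
        # which link it belongs to and on which side.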
        self.db.switches.find_one_and_update(
            {"interfaces.id": endpoint_a},
            {
                "$set": {
                    "interfaces.$.link_id": link_id,
                    "interfaces.$.link_side": "endpoint_a",
                    "updated_at": utc_now,
                }
            },
        )
        self.db.switches.find_one_and_update(
            {"interfaces.id": endpoint_b},
            {
                "$set": {
                    "interfaces.$.link_id": link_id,
                    "interfaces.$.link_side": "endpoint_b",
                    "updated_at": utc_now,
                }
            },
        )
        return updated

    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
        """Try to find one link and update it given an update expression."""
        self._set_updated_at(update_expr)
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)

    def enable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and enable it."""
        return self._update_link(link_id, {"$set": {"enabled": True}})

    def disable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and disable it."""
        return self._update_link(link_id, {"$set": {"enabled": False}})

    def bulk_disable_links(self, link_ids: set[str]) -> int:
        """Bulk update that disables found links."""
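        # Build one UpdateOne per link id and send them in a single
        # bulk_write round trip; modified_count reports how many were changed.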
        if not link_ids:
            return 0
        ops = []
        for _id in link_ids:
            ops.append(
                UpdateOne(
                    {"_id": _id},
                    {
                        "$set": {
                            "updated_at": datetime.utcnow(),
                            "enabled": False,
                        }
                    },
                )
            )
        return self.db.links.bulk_write(ops).modified_count

    def add_link_metadata(
        self, link_id: str, metadata: dict
    ) -> Optional[dict]:
        """Try to find a link and add to its metadata."""
        update_expr = {
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
        }
        return self._update_link(link_id, update_expr)

    def delete_link_metadata_key(
        self, link_id: str, key: str
    ) -> Optional[dict]:
        """Try to find a link and delete a metadata key."""
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})

    def bulk_delete_link_metadata_key(
        self, link_ids: List[str], key: str
    ) -> Optional[dict]:
        """Bulk delete a link metadata key."""
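        # update_many returns a pymongo UpdateResult rather than the updated
        # documents.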
        update_expr = {"$unset": {f"metadata.{key}": 1}}
        self._set_updated_at(update_expr)
        return self.db.links.update_many({"_id": {"$in": link_ids}},
                                         update_expr)

    # pylint: disable=too-many-arguments
    def upsert_interface_details(
        self,
        id_: str,
        available_tags: dict[str, list[list[int]]],
        tag_ranges: dict[str, list[list[int]]],
        special_available_tags: dict[str, list[str]],
        special_tags: dict[str, list[str]]
    ) -> Optional[dict]:
        """Update or insert interfaces details."""
        utc_now = datetime.utcnow()
        model = InterfaceDetailDoc(**{
            "_id": id_,
            "available_tags": available_tags,
            "tag_ranges": tag_ranges,
            "special_available_tags": special_available_tags,
            "special_tags": special_tags,
            "updated_at": utc_now
        }).model_dump(exclude={"inserted_at"})
        updated = self.db.interface_details.find_one_and_update(
            {"_id": id_},
            {
                "$set": model,
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        return updated

    def get_interfaces_details(
        self, interface_ids: List[str]
    ) -> Optional[dict]:
        """Try to get interfaces details given a list of interface ids."""
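        # Returns a pymongo cursor over the matched interface details documents.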
        return self.db.interface_details.aggregate(
            [
                {"$match": {"_id": {"$in": interface_ids}}},
            ]
        )

    def delete_link(self, link_id: str) -> Optional[dict]:
        """Delete a link by its id."""
        return self.db.links.find_one_and_delete(
            {"_id": link_id}
        )

    def delete_switch_data(
        self,
        switch_dpid: str
    ) -> tuple[Optional[dict], int]:
        """Delete switch-related data from the database."""
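        # Interface details ids follow the "<dpid>:<port_number>" format, so a
        # dpid-anchored regex matches every detail document of this switch.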
        regex = re.compile(rf'^{switch_dpid}:\d+$')
        det_result = self.db.interface_details.delete_many(
            {"_id": {"$regex": regex}}
        )
        swt_result = self.db.switches.find_one_and_delete(
            {"_id": switch_dpid}
        )
        return (swt_result, det_result.deleted_count)

    def delete_interface_from_details(self, intf_id: str) -> Optional[dict]:
        """Delete interface from interface_details."""
        return self.db.interface_details.find_one_and_delete(
            {"_id": intf_id}
        )