Passed
Push — master ( 70402e...0aafab )
by Vinicius
08:31 queued 06:24
created

build.controllers.TopoController.delete_link()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""TopoController."""
2
3
# pylint: disable=invalid-name
4 1
import os
5 1
from datetime import datetime
6 1
from threading import Lock
7 1
from typing import List, Optional, Tuple
8
9 1
import pymongo
10 1
from pymongo.collection import ReturnDocument
11 1
from pymongo.errors import AutoReconnect
12 1
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
13
14 1
from kytos.core import log
15 1
from kytos.core.db import Mongo
16 1
from kytos.core.retry import before_sleep, for_all_methods, retries
17 1
from napps.kytos.topology.db.models import (InterfaceDetailDoc, LinkDoc,
18
                                            SwitchDoc)
19
20
21 1
@for_all_methods(
    retries,
    stop=stop_after_attempt(
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
    ),
    wait=wait_random(
        # float(), not int(): int() would truncate the 0.1 default to 0 and
        # raise ValueError for a fractional env value such as "0.5".
        min=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
        max=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
    ),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((AutoReconnect,)),
)
class TopoController:
    """Persist and query topology documents stored in MongoDB.

    The collections used are ``switches`` (with embedded ``interfaces``),
    ``links`` and ``interface_details``. Every method is wrapped by the
    class decorator so that it is retried on pymongo ``AutoReconnect``;
    attempts and random wait bounds are read from the
    ``MONGO_AUTO_RETRY_*`` environment variables.
    """

    def __init__(self, get_mongo=lambda: Mongo()) -> None:
        """Build the controller.

        Args:
            get_mongo: zero-argument callable returning an object exposing
                ``client`` and ``db_name``; the default builds a ``Mongo``
                lazily, at call time.
        """
        self.mongo = get_mongo()
        self.db_client = self.mongo.client
        self.db = self.db_client[self.mongo.db_name]

    def bootstrap_indexes(self) -> None:
        """Bootstrap all topology related indexes.

        An info message is logged for each index for which
        ``self.mongo.bootstrap_index`` returns a truthy value
        (presumably meaning the index was just created).
        """
        index_tuples = [
            ("switches", [("interfaces.id", pymongo.ASCENDING)]),
            ("links", [("endpoints.id", pymongo.ASCENDING)]),
        ]
        for collection, keys in index_tuples:
            if self.mongo.bootstrap_index(collection, keys):
                log.info(
                    f"Created DB index {keys}, collection: {collection}"
                )

    def get_topology(self) -> dict:
        """Get topology from DB.

        Returns:
            ``{"topology": {"links": {...}, "switches": {...}}}``.
        """
        switches = self.get_switches()
        links = self.get_links()
        return {"topology": {**links, **switches}}

    def get_switches(self) -> dict:
        """Get switches from DB.

        Returns:
            ``{"switches": {<id>: <document>}}``, read in ``_id`` order and
            projected with ``SwitchDoc.projection()``.
        """
        switches = self.db.switches.aggregate(
            [
                {"$sort": {"_id": 1}},
                {"$project": SwitchDoc.projection()},
            ]
        )
        return {"switches": {value["id"]: value for value in switches}}

    def get_links(self) -> dict:
        """Get links from DB.

        Returns:
            ``{"links": {<id>: <document>}}``, read in ``_id`` order and
            projected with ``LinkDoc.projection()``.
        """
        links = self.db.links.aggregate(
            [
                {"$sort": {"_id": 1}},
                {"$project": LinkDoc.projection()},
            ]
        )
        return {"links": {value["id"]: value for value in links}}

    def get_interfaces(self) -> dict:
        """Get interfaces from DB.

        The interfaces embedded in each switch document are unwound and
        promoted to root documents.

        Returns:
            ``{"interfaces": {<id>: <interface document>}}``.
        """
        interfaces = self.db.switches.aggregate(
            [
                {"$sort": {"_id": 1}},
                {"$project": {"interfaces": 1, "_id": 0}},
                {"$unwind": "$interfaces"},
                {"$replaceRoot": {"newRoot": "$interfaces"}},
            ]
        )
        return {"interfaces": {value["id"]: value for value in interfaces}}

    @staticmethod
    def _set_updated_at(update_expr: dict) -> None:
        """Set ``updated_at`` (naive UTC now) on the ``$set`` expression.

        ``update_expr`` is mutated in place; a ``$set`` operator is added
        when the expression does not have one yet.
        """
        if "$set" in update_expr:
            update_expr["$set"].update({"updated_at": datetime.utcnow()})
        else:
            update_expr.update({"$set": {"updated_at": datetime.utcnow()}})

    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
        """Try to find one switch and update it given an update expression.

        ``update_expr`` is mutated to also refresh ``updated_at``.

        Returns:
            Whatever ``find_one_and_update`` returns. No
            ``return_document`` is passed, so pymongo's default applies
            (the document as found before the update), or None when no
            switch has ``_id == dpid``.
        """
        self._set_updated_at(update_expr)
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)

    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
        """Update or insert switch.

        ``switch_dict`` is validated through ``SwitchDoc``; ``inserted_at``
        is only written when the document is first inserted.

        Returns:
            The switch document after the upsert.
        """
        utc_now = datetime.utcnow()
        model = SwitchDoc(
            **{**switch_dict, **{"_id": dpid, "updated_at": utc_now}}
        )
        updated = self.db.switches.find_one_and_update(
            {"_id": dpid},
            {
                "$set": model.dict(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        return updated

    def enable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and enable it."""
        return self._update_switch(dpid, {"$set": {"enabled": True}})

    def disable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and disable it and all its interfaces."""
        return self._update_switch(
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
        )

    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
        """Try to find a switch and add to its metadata.

        Each key of ``metadata`` is set individually under ``metadata.<key>``
        so existing keys that are not mentioned are left alone.
        """
        update_expr = {
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
        }
        return self._update_switch(dpid, update_expr)

    def delete_switch_metadata_key(
        self, dpid: str, key: str
    ) -> Optional[dict]:
        """Try to find a switch and delete a metadata key."""
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})

    def enable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to enable one interface embedded in its switch document."""
        return self._update_interface(
            interface_id, {"$set": {"enabled": True}}
        )

    def disable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to disable one interface embedded in its switch document."""
        return self._update_interface(
            interface_id, {"$set": {"enabled": False}}
        )

    def add_interface_metadata(
        self, interface_id: str, metadata: dict
    ) -> Optional[dict]:
        """Try to find an interface and add to its metadata."""
        update_expr = {
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
        }
        return self._update_interface(interface_id, update_expr)

    def delete_interface_metadata_key(
        self, interface_id: str, key: str
    ) -> Optional[dict]:
        """Try to find an interface and delete a metadata key."""
        return self._update_interface(
            interface_id, {"$unset": {f"metadata.{key}": ""}}
        )

    def _update_interface(
        self, interface_id: str, update_expr: dict
    ) -> Optional[dict]:
        """Try to update one interface embedded in a switch document.

        Every field of every operator in ``update_expr`` is re-targeted to
        the matched array element (``interfaces.$.<field>``), so the
        refreshed ``updated_at`` lands on the interface as well.
        ``update_expr`` itself is mutated to carry ``updated_at``.

        Returns:
            The switch document after the update, or None when no switch
            holds an interface with this id.
        """
        self._set_updated_at(update_expr)
        interfaces_expression = {
            operator: {f"interfaces.$.{k}": v for k, v in values.items()}
            for operator, values in update_expr.items()
        }
        return self.db.switches.find_one_and_update(
            {"interfaces.id": interface_id},
            interfaces_expression,
            return_document=ReturnDocument.AFTER,
        )

    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
        """Update or insert a Link.

        Besides upserting the link document, the switch interfaces matched
        by each endpoint are tagged with ``link_id`` and their
        ``link_side``.

        Returns:
            The link document after the upsert.
        """
        utc_now = datetime.utcnow()

        endpoint_a = link_dict.get("endpoint_a")
        endpoint_b = link_dict.get("endpoint_b")
        model = LinkDoc(
            **{
                **link_dict,
                **{
                    "updated_at": utc_now,
                    "_id": link_id,
                    "endpoints": [endpoint_a, endpoint_b],
                },
            }
        )
        updated = self.db.links.find_one_and_update(
            {"_id": link_id},
            {
                "$set": model.dict(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        # Same update for both sides; endpoint_a is applied first, as before.
        for link_side, endpoint in (
            ("endpoint_a", endpoint_a),
            ("endpoint_b", endpoint_b),
        ):
            self.db.switches.find_one_and_update(
                {"interfaces.id": endpoint},
                {
                    "$set": {
                        "interfaces.$.link_id": link_id,
                        "interfaces.$.link_side": link_side,
                        "updated_at": utc_now,
                    }
                },
            )
        return updated

    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
        """Try to find one link and update it given an update expression.

        ``update_expr`` is mutated to also refresh ``updated_at``.

        Returns:
            Whatever ``find_one_and_update`` returns. No
            ``return_document`` is passed, so pymongo's default applies
            (the document as found before the update), or None when no
            link has ``_id == link_id``.
        """
        self._set_updated_at(update_expr)
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)

    def enable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and enable it."""
        return self._update_link(link_id, {"$set": {"enabled": True}})

    def disable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and disable it."""
        return self._update_link(link_id, {"$set": {"enabled": False}})

    def add_link_metadata(
        self, link_id: str, metadata: dict
    ) -> Optional[dict]:
        """Try to find link and add to its metadata."""
        update_expr = {
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
        }
        return self._update_link(link_id, update_expr)

    def delete_link_metadata_key(
        self, link_id: str, key: str
    ) -> Optional[dict]:
        """Try to find a link and delete a metadata key."""
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})

    def bulk_delete_link_metadata_key(
        self, link_ids: List[str], key: str
    ) -> "pymongo.results.UpdateResult":
        """Bulk delete a metadata key from every link in ``link_ids``.

        Returns:
            The result object of ``update_many`` (not a link document).
        """
        update_expr = {"$unset": {f"metadata.{key}": 1}}
        self._set_updated_at(update_expr)
        return self.db.links.update_many({"_id": {"$in": link_ids}},
                                         update_expr)

    # pylint: disable=too-many-arguments
    def upsert_interface_details(
        self,
        id_: str,
        available_tags: dict[str, list[list[int]]],
        tag_ranges: dict[str, list[list[int]]],
        special_available_tags: dict[str, list[str]],
        special_tags: dict[str, list[str]]
    ) -> Optional[dict]:
        """Update or insert interfaces details.

        The values are validated through ``InterfaceDetailDoc``;
        ``inserted_at`` is only written when the document is first inserted.

        Returns:
            The ``interface_details`` document after the upsert.
        """
        utc_now = datetime.utcnow()
        model = InterfaceDetailDoc(**{
                "_id": id_,
                "available_tags": available_tags,
                "tag_ranges": tag_ranges,
                "special_available_tags": special_available_tags,
                "special_tags": special_tags,
                "updated_at": utc_now
        }).dict(exclude={"inserted_at"})
        updated = self.db.interface_details.find_one_and_update(
            {"_id": id_},
            {
                "$set": model,
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        return updated

    def get_interfaces_details(
        self, interface_ids: List[str]
    ) -> "pymongo.command_cursor.CommandCursor":
        """Try to get interfaces details given a list of interface ids.

        Returns:
            The cursor produced by ``aggregate`` over the matching
            ``interface_details`` documents; callers iterate it.
        """
        return self.db.interface_details.aggregate(
            [
                {"$match": {"_id": {"$in": interface_ids}}},
            ]
        )

    def delete_link(self, link_id: str) -> Optional[dict]:
        """Delete a link by its id.

        Returns:
            The result of ``find_one_and_delete`` (the deleted link
            document, or None when no link has this id).
        """
        return self.db.links.find_one_and_delete(
            {"_id": link_id}
        )
319