Passed
Pull Request — master (#166)
by Vinicius
06:28 queued 03:15
created

build.controllers   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 307
Duplicated Lines 0 %

Test Coverage

Coverage 97.17%

Importance

Changes 0
Metric Value
eloc 199
dl 0
loc 307
ccs 103
cts 106
cp 0.9717
rs 9.84
c 0
b 0
f 0
wmc 32

27 Methods

Rating   Name   Duplication   Size   Complexity  
A TopoController.add_interface_metadata() 0 8 1
A TopoController.add_link_metadata() 0 8 1
A TopoController.delete_link_metadata_key() 0 5 1
A TopoController._update_interface() 0 14 2
A TopoController.enable_link() 0 3 1
A TopoController._update_link() 0 4 1
A TopoController.disable_link() 0 3 1
A TopoController.delete_interface_metadata_key() 0 6 1
A TopoController.upsert_link() 0 46 1
A TopoController.disable_interface() 0 4 1
A TopoController.bootstrap_indexes() 0 10 3
A TopoController.upsert_interface_details() 0 24 1
A TopoController.disable_switch() 0 4 1
A TopoController.enable_interface() 0 4 1
A TopoController.__init__() 0 5 2
A TopoController.bulk_delete_link_metadata_key() 0 8 1
A TopoController.enable_switch() 0 3 1
A TopoController.get_topology() 0 5 1
A TopoController.get_links() 0 9 1
A TopoController.add_switch_metadata() 0 6 1
A TopoController._update_switch() 0 4 1
A TopoController.delete_switch_metadata_key() 0 5 1
A TopoController.get_switches() 0 9 1
A TopoController._set_updated_at() 0 7 2
A TopoController.upsert_switch() 0 16 1
A TopoController.get_interfaces() 0 11 1
A TopoController.get_interfaces_details() 0 7 1
1
"""TopoController."""
2
3
# pylint: disable=invalid-name
4 1
import os
5 1
from datetime import datetime
6 1
from threading import Lock
7 1
from typing import List, Optional, Tuple
8
9 1
import pymongo
10 1
from pymongo.collection import ReturnDocument
11 1
from pymongo.errors import AutoReconnect
12 1
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
13
14 1
from kytos.core import log
15 1
from kytos.core.db import Mongo
16 1
from kytos.core.interface import Interface
17 1
from kytos.core.retry import before_sleep, for_all_methods, retries
18 1
from napps.kytos.topology.db.models import (InterfaceDetailDoc, LinkDoc,
19
                                            SwitchDoc)
20
21
22 1
@for_all_methods(
23
    retries,
24
    stop=stop_after_attempt(
25
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
26
    ),
27
    wait=wait_random(
28
        min=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
29
        max=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
30
    ),
31
    before_sleep=before_sleep,
32
    retry=retry_if_exception_type((AutoReconnect,)),
33
)
34 1
class TopoController:
35
    """TopoController."""
36
37 1
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable Mongo does not seem to be defined.
Loading history...
38
        """Constructor of TopoController."""
39 1
        self.mongo = get_mongo()
40 1
        self.db_client = self.mongo.client
41 1
        self.db = self.db_client[self.mongo.db_name]
42
43 1
    def bootstrap_indexes(self) -> None:
44
        """Bootstrap all topology related indexes."""
45 1
        index_tuples = [
46
            ("switches", [("interfaces.id", pymongo.ASCENDING)]),
47
            ("links", [("endpoints.id", pymongo.ASCENDING)]),
48
        ]
49 1
        for collection, keys in index_tuples:
50 1
            if self.mongo.bootstrap_index(collection, keys):
51 1
                log.info(
52
                    f"Created DB index {keys}, collection: {collection})"
53
                )
54
55 1
    def get_topology(self) -> dict:
56
        """Get topology from DB."""
57 1
        switches = self.get_switches()
58 1
        links = self.get_links()
59 1
        return {"topology": {**links, **switches}}
60
61 1
    def get_switches(self) -> dict:
62
        """Get switches from DB."""
63 1
        switches = self.db.switches.aggregate(
64
            [
65
                {"$sort": {"_id": 1}},
66
                {"$project": SwitchDoc.projection()},
67
            ]
68
        )
69 1
        return {"switches": {value["id"]: value for value in switches}}
70
71 1
    def get_links(self) -> dict:
72
        """Get links from DB."""
73 1
        links = self.db.links.aggregate(
74
            [
75
                {"$sort": {"_id": 1}},
76
                {"$project": LinkDoc.projection()},
77
            ]
78
        )
79 1
        return {"links": {value["id"]: value for value in links}}
80
81 1
    def get_interfaces(self) -> dict:
82
        """Get interfaces from DB."""
83 1
        interfaces = self.db.switches.aggregate(
84
            [
85
                {"$sort": {"_id": 1}},
86
                {"$project": {"interfaces": 1, "_id": 0}},
87
                {"$unwind": "$interfaces"},
88
                {"$replaceRoot": {"newRoot": "$interfaces"}},
89
            ]
90
        )
91 1
        return {"interfaces": {value["id"]: value for value in interfaces}}
92
93 1
    @staticmethod
94 1
    def _set_updated_at(update_expr: dict) -> None:
95
        """Set updated_at on $set expression."""
96 1
        if "$set" in update_expr:
97 1
            update_expr["$set"].update({"updated_at": datetime.utcnow()})
98
        else:
99 1
            update_expr.update({"$set": {"updated_at": datetime.utcnow()}})
100
101 1
    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
102
        """Try to find one switch and update it given an update expression."""
103 1
        self._set_updated_at(update_expr)
104 1
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)
105
106 1
    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
107
        """Update or insert switch."""
108 1
        utc_now = datetime.utcnow()
109 1
        model = SwitchDoc(
110
            **{**switch_dict, **{"_id": dpid, "updated_at": utc_now}}
111
        )
112 1
        updated = self.db.switches.find_one_and_update(
113
            {"_id": dpid},
114
            {
115
                "$set": model.dict(exclude={"inserted_at"}),
116
                "$setOnInsert": {"inserted_at": utc_now},
117
            },
118
            return_document=ReturnDocument.AFTER,
119
            upsert=True,
120
        )
121 1
        return updated
122
123 1
    def enable_switch(self, dpid: str) -> Optional[dict]:
124
        """Try to find one switch and enable it."""
125 1
        return self._update_switch(dpid, {"$set": {"enabled": True}})
126
127 1
    def disable_switch(self, dpid: str) -> Optional[dict]:
128
        """Try to find one switch and disable it."""
129 1
        return self._update_switch(
130
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
131
        )
132
133 1
    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
134
        """Try to find a switch and add to its metadata."""
135 1
        update_expr = {
136
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
137
        }
138 1
        return self._update_switch(dpid, update_expr)
139
140 1
    def delete_switch_metadata_key(
141
        self, dpid: str, key: str
142
    ) -> Optional[dict]:
143
        """Try to find a switch and delete a metadata key."""
144 1
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})
145
146 1
    def enable_interface(self, interface_id: str) -> Optional[dict]:
147
        """Try to enable one interface and its embedded object on links."""
148 1
        return self._update_interface(
149
            interface_id, {"$set": {"enabled": True}}
150
        )
151
152 1
    def disable_interface(self, interface_id: str) -> Optional[dict]:
153
        """Try to disable one interface and its embedded object on links."""
154 1
        return self._update_interface(
155
            interface_id, {"$set": {"enabled": False}}
156
        )
157
158 1
    def add_interface_metadata(
159
        self, interface_id: str, metadata: dict
160
    ) -> Optional[dict]:
161
        """Try to find an interface and add to its metadata."""
162 1
        update_expr = {
163
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
164
        }
165 1
        return self._update_interface(interface_id, update_expr)
166
167 1
    def delete_interface_metadata_key(
168
        self, interface_id: str, key: str
169
    ) -> Optional[dict]:
170
        """Try to find an interface and delete a metadata key."""
171 1
        return self._update_interface(
172
            interface_id, {"$unset": {f"metadata.{key}": ""}}
173
        )
174
175 1
    def _update_interface(
176
        self, interface_id: str, update_expr: dict
177
    ) -> Optional[dict]:
178
        """Try to update one interface and its embedded object on links."""
179 1
        self._set_updated_at(update_expr)
180 1
        interfaces_expression = {}
181 1
        for operator, values in update_expr.items():
182 1
            interfaces_expression[operator] = {
183
                f"interfaces.$.{k}": v for k, v in values.items()
184
            }
185 1
        return self.db.switches.find_one_and_update(
186
            {"interfaces.id": interface_id},
187
            interfaces_expression,
188
            return_document=ReturnDocument.AFTER,
189
        )
190
191 1
    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
192
        """Update or insert a Link."""
193 1
        utc_now = datetime.utcnow()
194
195 1
        endpoint_a = link_dict.get("endpoint_a")
196 1
        endpoint_b = link_dict.get("endpoint_b")
197 1
        model = LinkDoc(
198
            **{
199
                **link_dict,
200
                **{
201
                    "updated_at": utc_now,
202
                    "_id": link_id,
203
                    "endpoints": [endpoint_a, endpoint_b],
204
                },
205
            }
206
        )
207 1
        updated = self.db.links.find_one_and_update(
208
            {"_id": link_id},
209
            {
210
                "$set": model.dict(exclude={"inserted_at"}),
211
                "$setOnInsert": {"inserted_at": utc_now},
212
            },
213
            return_document=ReturnDocument.AFTER,
214
            upsert=True,
215
        )
216 1
        self.db.switches.find_one_and_update(
217
            {"interfaces.id": endpoint_a},
218
            {
219
                "$set": {
220
                    "interfaces.$.link_id": link_id,
221
                    "interfaces.$.link_side": "endpoint_a",
222
                    "updated_at": utc_now,
223
                }
224
            },
225
        )
226 1
        self.db.switches.find_one_and_update(
227
            {"interfaces.id": endpoint_b},
228
            {
229
                "$set": {
230
                    "interfaces.$.link_id": link_id,
231
                    "interfaces.$.link_side": "endpoint_b",
232
                    "updated_at": utc_now,
233
                }
234
            },
235
        )
236 1
        return updated
237
238 1
    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
239
        """Try to find one link and update it given an update expression."""
240 1
        self._set_updated_at(update_expr)
241 1
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)
242
243 1
    def enable_link(self, link_id: str) -> Optional[dict]:
244
        """Try to find one link and enable it."""
245 1
        return self._update_link(link_id, {"$set": {"enabled": True}})
246
247 1
    def disable_link(self, link_id: str) -> Optional[dict]:
248
        """Try to find one link and disable it."""
249 1
        return self._update_link(link_id, {"$set": {"enabled": False}})
250
251 1
    def add_link_metadata(
252
        self, link_id: str, metadata: dict
253
    ) -> Optional[dict]:
254
        """Try to find link and add to its metadata."""
255 1
        update_expr = {
256
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
257
        }
258 1
        return self._update_link(link_id, update_expr)
259
260 1
    def delete_link_metadata_key(
261
        self, link_id: str, key: str
262
    ) -> Optional[dict]:
263
        """Try to find a link and delete a metadata key."""
264 1
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})
265
266 1
    def bulk_delete_link_metadata_key(
267
        self, link_ids: List[str], key: str
268
    ) -> Optional[dict]:
269
        """Bulk delelete link metadata key."""
270
        update_expr = {"$unset": {f"metadata.{key}": 1}}
271
        self._set_updated_at(update_expr)
272
        return self.db.links.update_many({"_id": {"$in": link_ids}},
273
                                         update_expr)
274
275 1
    def upsert_interface_details(
276
        self,
277
        id_: str,
278
        available_tags: dict[str, list[list[int]]],
279
        tag_ranges: dict[str, list[list[int]]]
280
    ) -> Optional[dict]:
281
        """Update or insert interfaces details."""
282 1
        utc_now = datetime.utcnow()
283 1
        model = InterfaceDetailDoc(**{
284
                "_id": id_,
285
                "available_tags": available_tags,
286
                "tag_ranges": tag_ranges,
287
                "updated_at": utc_now
288
        }).dict(exclude={"inserted_at"})
289 1
        updated = self.db.interface_details.find_one_and_update(
290
            {"_id": id_},
291
            {
292
                "$set": model,
293
                "$setOnInsert": {"inserted_at": utc_now},
294
            },
295
            return_document=ReturnDocument.AFTER,
296
            upsert=True,
297
        )
298 1
        return updated
299
300 1
    def get_interfaces_details(
301
        self, interface_ids: List[str]
302
    ) -> Optional[dict]:
303
        """Try to get interfaces details given a list of interface ids."""
304 1
        return self.db.interface_details.aggregate(
305
            [
306
                {"$match": {"_id": {"$in": interface_ids}}},
307
            ]
308
        )
309