Passed
Pull Request — master (#98)
by Vinicius
03:12
created

build.controllers   A

Complexity

Total Complexity 41

Size/Duplication

Total Lines 367
Duplicated Lines 0 %

Test Coverage

Coverage 97.58%

Importance

Changes 0
Metric Value
eloc 233
dl 0
loc 367
ccs 121
cts 124
cp 0.9758
rs 9.1199
c 0
b 0
f 0
wmc 41

32 Methods

Rating   Name   Duplication   Size   Complexity  
A TopoController.add_interface_metadata() 0 8 1
A TopoController.get_links() 0 9 1
A TopoController.deactivate_link() 0 14 1
A TopoController.get_switches() 0 9 1
A TopoController.deactivate_switch() 0 3 1
A TopoController.enable_interface() 0 4 1
A TopoController.delete_interface_metadata_key() 0 6 1
A TopoController.enable_switch() 0 3 1
A TopoController.disable_interface() 0 4 1
A TopoController.bootstrap_indexes() 0 10 3
A TopoController.get_topology() 0 5 1
A TopoController.add_switch_metadata() 0 6 1
A TopoController._update_switch() 0 4 1
A TopoController.add_link_metadata() 0 8 1
A TopoController.disable_switch() 0 4 1
A TopoController.delete_link_metadata_key() 0 5 1
A TopoController.delete_switch_metadata_key() 0 5 1
A TopoController._update_interface() 0 14 2
A TopoController.activate_interface() 0 3 1
A TopoController.enable_link() 0 3 1
A TopoController._update_link() 0 4 1
A TopoController.disable_link() 0 3 1
A TopoController._set_updated_at() 0 7 2
A TopoController.__init__() 0 6 2
A TopoController.upsert_switch() 0 16 1
A TopoController.get_interfaces() 0 11 1
A TopoController.deactivate_interface() 0 4 1
A TopoController.upsert_link() 0 46 1
A TopoController.activate_link() 0 14 1
B TopoController.bulk_upsert_interface_details() 0 31 5
A TopoController.bulk_delete_link_metadata_key() 0 8 1
A TopoController.get_interfaces_details() 0 7 1

How to fix   Complexity   

Complexity

Complex classes like build.controllers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""TopoController."""
2
3
# pylint: disable=invalid-name
4 1
import os
5 1
import time
6 1
from datetime import datetime
7 1
from threading import Lock
8 1
from typing import List, Optional, Tuple
9
10 1
import pymongo
11 1
from pymongo.collection import ReturnDocument
12 1
from pymongo.errors import AutoReconnect
13 1
from pymongo.operations import UpdateOne
14 1
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
15
16 1
from kytos.core import log
17 1
from kytos.core.db import Mongo
18 1
from kytos.core.retry import before_sleep, for_all_methods, retries
19 1
from napps.kytos.topology.db.models import (InterfaceDetailDoc, LinkDoc,
20
                                            SwitchDoc)
21
22
23 1
@for_all_methods(
24
    retries,
25
    stop=stop_after_attempt(
26
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
27
    ),
28
    wait=wait_random(
29
        min=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
30
        max=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
31
    ),
32
    before_sleep=before_sleep,
33
    retry=retry_if_exception_type((AutoReconnect,)),
34
)
35 1
class TopoController:
36
    """TopoController."""
37
38 1
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable Mongo does not seem to be defined.
Loading history...
39
        """Constructor of TopoController."""
40 1
        self.mongo = get_mongo()
41 1
        self.db_client = self.mongo.client
42 1
        self.db = self.db_client[self.mongo.db_name]
43 1
        self.interface_details_lock = Lock()
44
45 1
    def bootstrap_indexes(self) -> None:
46
        """Bootstrap all topology related indexes."""
47 1
        index_tuples = [
48
            ("switches", [("interfaces.id", pymongo.ASCENDING)]),
49
            ("links", [("endpoints.id", pymongo.ASCENDING)]),
50
        ]
51 1
        for collection, keys in index_tuples:
52 1
            if self.mongo.bootstrap_index(collection, keys):
53 1
                log.info(
54
                    f"Created DB index {keys}, collection: {collection})"
55
                )
56
57 1
    def get_topology(self) -> dict:
58
        """Get topology from DB."""
59 1
        switches = self.get_switches()
60 1
        links = self.get_links()
61 1
        return {"topology": {**links, **switches}}
62
63 1
    def get_switches(self) -> dict:
64
        """Get switches from DB."""
65 1
        switches = self.db.switches.aggregate(
66
            [
67
                {"$sort": {"_id": 1}},
68
                {"$project": SwitchDoc.projection()},
69
            ]
70
        )
71 1
        return {"switches": {value["id"]: value for value in switches}}
72
73 1
    def get_links(self) -> dict:
74
        """Get links from DB."""
75 1
        links = self.db.links.aggregate(
76
            [
77
                {"$sort": {"_id": 1}},
78
                {"$project": LinkDoc.projection()},
79
            ]
80
        )
81 1
        return {"links": {value["id"]: value for value in links}}
82
83 1
    def get_interfaces(self) -> dict:
84
        """Get interfaces from DB."""
85 1
        interfaces = self.db.switches.aggregate(
86
            [
87
                {"$sort": {"_id": 1}},
88
                {"$project": {"interfaces": 1, "_id": 0}},
89
                {"$unwind": "$interfaces"},
90
                {"$replaceRoot": {"newRoot": "$interfaces"}},
91
            ]
92
        )
93 1
        return {"interfaces": {value["id"]: value for value in interfaces}}
94
95 1
    @staticmethod
96 1
    def _set_updated_at(update_expr: dict) -> None:
97
        """Set updated_at on $set expression."""
98 1
        if "$set" in update_expr:
99 1
            update_expr["$set"].update({"updated_at": datetime.utcnow()})
100
        else:
101 1
            update_expr.update({"$set": {"updated_at": datetime.utcnow()}})
102
103 1
    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
104
        """Try to find one switch and update it given an update expression."""
105 1
        self._set_updated_at(update_expr)
106 1
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)
107
108 1
    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
109
        """Update or insert switch."""
110 1
        utc_now = datetime.utcnow()
111 1
        model = SwitchDoc(
112
            **{**switch_dict, **{"_id": dpid, "updated_at": utc_now}}
113
        )
114 1
        updated = self.db.switches.find_one_and_update(
115
            {"_id": dpid},
116
            {
117
                "$set": model.dict(exclude={"inserted_at"}),
118
                "$setOnInsert": {"inserted_at": utc_now},
119
            },
120
            return_document=ReturnDocument.AFTER,
121
            upsert=True,
122
        )
123 1
        return updated
124
125 1
    def enable_switch(self, dpid: str) -> Optional[dict]:
126
        """Try to find one switch and enable it."""
127 1
        return self._update_switch(dpid, {"$set": {"enabled": True}})
128
129 1
    def deactivate_switch(self, dpid: str) -> Optional[dict]:
130
        """Try to find one switch and deactivate it."""
131 1
        return self._update_switch(dpid, {"$set": {"active": False}})
132
133 1
    def disable_switch(self, dpid: str) -> Optional[dict]:
134
        """Try to find one switch and disable it."""
135 1
        return self._update_switch(
136
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
137
        )
138
139 1
    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
140
        """Try to find a switch and add to its metadata."""
141 1
        update_expr = {
142
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
143
        }
144 1
        return self._update_switch(dpid, update_expr)
145
146 1
    def delete_switch_metadata_key(
147
        self, dpid: str, key: str
148
    ) -> Optional[dict]:
149
        """Try to find a switch and delete a metadata key."""
150 1
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})
151
152 1
    def enable_interface(self, interface_id: str) -> Optional[dict]:
153
        """Try to enable one interface and its embedded object on links."""
154 1
        return self._update_interface(
155
            interface_id, {"$set": {"enabled": True}}
156
        )
157
158 1
    def disable_interface(self, interface_id: str) -> Optional[dict]:
159
        """Try to disable one interface and its embedded object on links."""
160 1
        return self._update_interface(
161
            interface_id, {"$set": {"enabled": False}}
162
        )
163
164 1
    def activate_interface(self, interface_id: str) -> Optional[dict]:
165
        """Try to activate one interface."""
166 1
        return self._update_interface(interface_id, {"$set": {"active": True}})
167
168 1
    def deactivate_interface(self, interface_id: str) -> Optional[dict]:
169
        """Try to deactivate one interface."""
170 1
        return self._update_interface(
171
            interface_id, {"$set": {"active": False}}
172
        )
173
174 1
    def add_interface_metadata(
175
        self, interface_id: str, metadata: dict
176
    ) -> Optional[dict]:
177
        """Try to find an interface and add to its metadata."""
178 1
        update_expr = {
179
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
180
        }
181 1
        return self._update_interface(interface_id, update_expr)
182
183 1
    def delete_interface_metadata_key(
184
        self, interface_id: str, key: str
185
    ) -> Optional[dict]:
186
        """Try to find an interface and delete a metadata key."""
187 1
        return self._update_interface(
188
            interface_id, {"$unset": {f"metadata.{key}": ""}}
189
        )
190
191 1
    def _update_interface(
192
        self, interface_id: str, update_expr: dict
193
    ) -> Optional[dict]:
194
        """Try to update one interface and its embedded object on links."""
195 1
        self._set_updated_at(update_expr)
196 1
        interfaces_expression = {}
197 1
        for operator, values in update_expr.items():
198 1
            interfaces_expression[operator] = {
199
                f"interfaces.$.{k}": v for k, v in values.items()
200
            }
201 1
        return self.db.switches.find_one_and_update(
202
            {"interfaces.id": interface_id},
203
            interfaces_expression,
204
            return_document=ReturnDocument.AFTER,
205
        )
206
207 1
    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
208
        """Update or insert a Link."""
209 1
        utc_now = datetime.utcnow()
210
211 1
        endpoint_a = link_dict.get("endpoint_a")
212 1
        endpoint_b = link_dict.get("endpoint_b")
213 1
        model = LinkDoc(
214
            **{
215
                **link_dict,
216
                **{
217
                    "updated_at": utc_now,
218
                    "_id": link_id,
219
                    "endpoints": [endpoint_a, endpoint_b],
220
                },
221
            }
222
        )
223 1
        updated = self.db.links.find_one_and_update(
224
            {"_id": link_id},
225
            {
226
                "$set": model.dict(exclude={"inserted_at"}),
227
                "$setOnInsert": {"inserted_at": utc_now},
228
            },
229
            return_document=ReturnDocument.AFTER,
230
            upsert=True,
231
        )
232 1
        self.db.switches.find_one_and_update(
233
            {"interfaces.id": endpoint_a},
234
            {
235
                "$set": {
236
                    "interfaces.$.link_id": link_id,
237
                    "interfaces.$.link_side": "endpoint_a",
238
                    "updated_at": utc_now,
239
                }
240
            },
241
        )
242 1
        self.db.switches.find_one_and_update(
243
            {"interfaces.id": endpoint_b},
244
            {
245
                "$set": {
246
                    "interfaces.$.link_id": link_id,
247
                    "interfaces.$.link_side": "endpoint_b",
248
                    "updated_at": utc_now,
249
                }
250
            },
251
        )
252 1
        return updated
253
254 1
    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
255
        """Try to find one link and update it given an update expression."""
256 1
        self._set_updated_at(update_expr)
257 1
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)
258
259 1
    def enable_link(self, link_id: str) -> Optional[dict]:
260
        """Try to find one link and enable it."""
261 1
        return self._update_link(link_id, {"$set": {"enabled": True}})
262
263 1
    def disable_link(self, link_id: str) -> Optional[dict]:
264
        """Try to find one link and disable it."""
265 1
        return self._update_link(link_id, {"$set": {"enabled": False}})
266
267 1
    def deactivate_link(self, link_id: str,
268
                        last_status_change: Optional[float] = None,
269
                        last_status_is_active=False
270
                        ) -> Optional[dict]:
271
        """Try to find one link and deactivate it."""
272
        # It might be worth using datetime in the future
273 1
        last_status_change = last_status_change or time.time()
274 1
        return self._update_link(
275
            link_id,
276
            {
277
                "$set": {
278
                    "metadata.last_status_change": last_status_change,
279
                    "metadata.last_status_is_active": last_status_is_active,
280
                    "active": False,
281
                }
282
            },
283
        )
284
285 1
    def activate_link(self, link_id: str,
286
                      last_status_change: Optional[float] = None,
287
                      last_status_is_active=True
288
                      ) -> Optional[dict]:
289
        """Try to find one link and activate it."""
290
        # It might be worth using datetime in the future
291 1
        last_status_change = last_status_change or time.time()
292 1
        return self._update_link(
293
            link_id,
294
            {
295
                "$set": {
296
                    "metadata.last_status_change": last_status_change,
297
                    "metadata.last_status_is_active": last_status_is_active,
298
                    "active": True,
299
                }
300
            },
301
        )
302
303 1
    def add_link_metadata(
304
        self, link_id: str, metadata: dict
305
    ) -> Optional[dict]:
306
        """Try to find link and add to its metadata."""
307 1
        update_expr = {
308
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
309
        }
310 1
        return self._update_link(link_id, update_expr)
311
312 1
    def delete_link_metadata_key(
313
        self, link_id: str, key: str
314
    ) -> Optional[dict]:
315
        """Try to find a link and delete a metadata key."""
316 1
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})
317
318 1
    def bulk_delete_link_metadata_key(
319
        self, link_ids: List[str], key: str
320
    ) -> Optional[dict]:
321
        """Bulk delelete link metadata key."""
322
        update_expr = {"$unset": {f"metadata.{key}": 1}}
323
        self._set_updated_at(update_expr)
324
        return self.db.links.update_many({"_id": {"$in": link_ids}},
325
                                         update_expr)
326
327 1
    def bulk_upsert_interface_details(
328
        self, ids_details: List[Tuple[str, dict]]
329
    ) -> Optional[dict]:
330
        """Update or insert interfaces details."""
331 1
        utc_now = datetime.utcnow()
332 1
        ops = []
333 1
        for _id, detail_dict in ids_details:
334 1
            ops.append(
335
                UpdateOne(
336
                    {"_id": _id},
337
                    {
338
                        "$set": InterfaceDetailDoc(
339
                            **{
340
                                **detail_dict,
341
                                **{
342
                                    "updated_at": utc_now,
343
                                    "_id": _id,
344
                                },
345
                            }
346
                        ).dict(exclude={"inserted_at"}),
347
                        "$setOnInsert": {"inserted_at": utc_now},
348
                    },
349
                    upsert=True,
350
                ),
351
            )
352
353 1
        with self.interface_details_lock:
354 1
            with self.db_client.start_session() as session:
355 1
                with session.start_transaction():
356 1
                    return self.db.interface_details.bulk_write(
357
                        ops, ordered=False, session=session
358
                    )
359
360 1
    def get_interfaces_details(
361
        self, interface_ids: List[str]
362
    ) -> Optional[dict]:
363
        """Try to get interfaces details given a list of interface ids."""
364 1
        return self.db.interface_details.aggregate(
365
            [
366
                {"$match": {"_id": {"$in": interface_ids}}},
367
            ]
368
        )
369