Passed
Pull Request — master (#84)
by Vinicius
02:48
created

build.controllers   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 343
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 214
dl 0
loc 343
ccs 115
cts 115
cp 1
rs 9.2
c 0
b 0
f 0
wmc 40

31 Methods

Rating   Name   Duplication   Size   Complexity  
A TopoController.add_interface_metadata() 0 8 1
A TopoController.enable_switch() 0 3 1
A TopoController.disable_interface() 0 4 1
A TopoController.bootstrap_indexes() 0 10 3
A TopoController.get_topology() 0 5 1
A TopoController.get_links() 0 9 1
A TopoController.add_switch_metadata() 0 6 1
A TopoController._update_switch() 0 4 1
A TopoController.deactivate_link() 0 14 1
A TopoController.add_link_metadata() 0 8 1
A TopoController.disable_switch() 0 4 1
A TopoController.delete_link_metadata_key() 0 5 1
A TopoController.get_switches() 0 9 1
A TopoController.deactivate_switch() 0 3 1
A TopoController.delete_switch_metadata_key() 0 5 1
A TopoController._update_interface() 0 14 2
A TopoController.activate_interface() 0 3 1
A TopoController.enable_link() 0 3 1
A TopoController._update_link() 0 4 1
A TopoController.disable_link() 0 3 1
A TopoController._set_updated_at() 0 7 2
A TopoController.enable_interface() 0 4 1
A TopoController.__init__() 0 6 2
B TopoController.bulk_upsert_interface_details() 0 31 5
A TopoController.delete_interface_metadata_key() 0 6 1
A TopoController.upsert_switch() 0 16 1
A TopoController.get_interfaces() 0 11 1
A TopoController.deactivate_interface() 0 4 1
A TopoController.upsert_link() 0 46 1
A TopoController.get_interfaces_details() 0 7 1
A TopoController.activate_link() 0 14 1

How to fix   Complexity   

Complexity

Complex classes like build.controllers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""TopoController."""
2
3 1
import time
4
# pylint: disable=invalid-name
5 1
from datetime import datetime
6 1
from threading import Lock
7 1
from typing import List, Optional, Tuple
8
9 1
import pymongo
10 1
from pymongo.collection import ReturnDocument
11 1
from pymongo.operations import UpdateOne
12
13 1
from kytos.core import log
14 1
from kytos.core.db import Mongo
15 1
from napps.kytos.topology.db.models import (InterfaceDetailDoc, LinkDoc,
16
                                            SwitchDoc)
17
18
19 1
class TopoController:
20
    """TopoController."""
21
22 1
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable Mongo does not seem to be defined.
Loading history...
23
        """Constructor of TopoController."""
24 1
        self.mongo = get_mongo()
25 1
        self.db_client = self.mongo.client
26 1
        self.db = self.db_client[self.mongo.db_name]
27 1
        self.interface_details_lock = Lock()
28
29 1
    def bootstrap_indexes(self) -> None:
30
        """Bootstrap all topology related indexes."""
31 1
        index_tuples = [
32
            ("switches", [("interfaces.id", pymongo.ASCENDING)]),
33
            ("links", [("endpoints.id", pymongo.ASCENDING)]),
34
        ]
35 1
        for collection, keys in index_tuples:
36 1
            if self.mongo.bootstrap_index(collection, keys):
37 1
                log.info(
38
                    f"Created DB index {keys}, "
39
                    f"collection: {collection})"
40
                )
41
42 1
    def get_topology(self) -> dict:
43
        """Get topology from DB."""
44 1
        switches = self.get_switches()
45 1
        links = self.get_links()
46 1
        return {"topology": {**links, **switches}}
47
48 1
    def get_switches(self) -> dict:
49
        """Get switches from DB."""
50 1
        switches = self.db.switches.aggregate(
51
            [
52
                {"$sort": {"_id": 1}},
53
                {"$project": SwitchDoc.projection()},
54
            ]
55
        )
56 1
        return {"switches": {value["id"]: value for value in switches}}
57
58 1
    def get_links(self) -> dict:
59
        """Get links from DB."""
60 1
        links = self.db.links.aggregate(
61
            [
62
                {"$sort": {"_id": 1}},
63
                {"$project": LinkDoc.projection()},
64
            ]
65
        )
66 1
        return {"links": {value["id"]: value for value in links}}
67
68 1
    def get_interfaces(self) -> dict:
69
        """Get interfaces from DB."""
70 1
        interfaces = self.db.switches.aggregate(
71
            [
72
                {"$sort": {"_id": 1}},
73
                {"$project": {"interfaces": 1, "_id": 0}},
74
                {"$unwind": "$interfaces"},
75
                {"$replaceRoot": {"newRoot": "$interfaces"}},
76
            ]
77
        )
78 1
        return {"interfaces": {value["id"]: value for value in interfaces}}
79
80 1
    @staticmethod
81 1
    def _set_updated_at(update_expr: dict) -> None:
82
        """Set updated_at on $set expression."""
83 1
        if "$set" in update_expr:
84 1
            update_expr["$set"].update({"updated_at": datetime.utcnow()})
85
        else:
86 1
            update_expr.update({"$set": {"updated_at": datetime.utcnow()}})
87
88 1
    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
89
        """Try to find one switch and update it given an update expression."""
90 1
        self._set_updated_at(update_expr)
91 1
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)
92
93 1
    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
94
        """Update or insert switch."""
95 1
        utc_now = datetime.utcnow()
96 1
        model = SwitchDoc(
97
            **{**switch_dict, **{"_id": dpid, "updated_at": utc_now}}
98
        )
99 1
        updated = self.db.switches.find_one_and_update(
100
            {"_id": dpid},
101
            {
102
                "$set": model.dict(exclude={"inserted_at"}),
103
                "$setOnInsert": {"inserted_at": utc_now},
104
            },
105
            return_document=ReturnDocument.AFTER,
106
            upsert=True,
107
        )
108 1
        return updated
109
110 1
    def enable_switch(self, dpid: str) -> Optional[dict]:
111
        """Try to find one switch and enable it."""
112 1
        return self._update_switch(dpid, {"$set": {"enabled": True}})
113
114 1
    def deactivate_switch(self, dpid: str) -> Optional[dict]:
115
        """Try to find one switch and deactivate it."""
116 1
        return self._update_switch(dpid, {"$set": {"active": False}})
117
118 1
    def disable_switch(self, dpid: str) -> Optional[dict]:
119
        """Try to find one switch and disable it."""
120 1
        return self._update_switch(
121
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
122
        )
123
124 1
    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
125
        """Try to find a switch and add to its metadata."""
126 1
        update_expr = {
127
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
128
        }
129 1
        return self._update_switch(dpid, update_expr)
130
131 1
    def delete_switch_metadata_key(
132
        self, dpid: str, key: str
133
    ) -> Optional[dict]:
134
        """Try to find a switch and delete a metadata key."""
135 1
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})
136
137 1
    def enable_interface(self, interface_id: str) -> Optional[dict]:
138
        """Try to enable one interface and its embedded object on links."""
139 1
        return self._update_interface(
140
            interface_id, {"$set": {"enabled": True}}
141
        )
142
143 1
    def disable_interface(self, interface_id: str) -> Optional[dict]:
144
        """Try to disable one interface and its embedded object on links."""
145 1
        return self._update_interface(
146
            interface_id, {"$set": {"enabled": False}}
147
        )
148
149 1
    def activate_interface(self, interface_id: str) -> Optional[dict]:
150
        """Try to activate one interface."""
151 1
        return self._update_interface(interface_id, {"$set": {"active": True}})
152
153 1
    def deactivate_interface(self, interface_id: str) -> Optional[dict]:
154
        """Try to deactivate one interface."""
155 1
        return self._update_interface(
156
            interface_id, {"$set": {"active": False}}
157
        )
158
159 1
    def add_interface_metadata(
160
        self, interface_id: str, metadata: dict
161
    ) -> Optional[dict]:
162
        """Try to find an interface and add to its metadata."""
163 1
        update_expr = {
164
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
165
        }
166 1
        return self._update_interface(interface_id, update_expr)
167
168 1
    def delete_interface_metadata_key(
169
        self, interface_id: str, key: str
170
    ) -> Optional[dict]:
171
        """Try to find an interface and delete a metadata key."""
172 1
        return self._update_interface(
173
            interface_id, {"$unset": {f"metadata.{key}": ""}}
174
        )
175
176 1
    def _update_interface(
177
        self, interface_id: str, update_expr: dict
178
    ) -> Optional[dict]:
179
        """Try to update one interface and its embedded object on links."""
180 1
        self._set_updated_at(update_expr)
181 1
        interfaces_expression = {}
182 1
        for operator, values in update_expr.items():
183 1
            interfaces_expression[operator] = {
184
                f"interfaces.$.{k}": v for k, v in values.items()
185
            }
186 1
        return self.db.switches.find_one_and_update(
187
            {"interfaces.id": interface_id},
188
            interfaces_expression,
189
            return_document=ReturnDocument.AFTER,
190
        )
191
192 1
    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
193
        """Update or insert a Link."""
194 1
        utc_now = datetime.utcnow()
195
196 1
        endpoint_a = link_dict.get("endpoint_a")
197 1
        endpoint_b = link_dict.get("endpoint_b")
198 1
        model = LinkDoc(
199
            **{
200
                **link_dict,
201
                **{
202
                    "updated_at": utc_now,
203
                    "_id": link_id,
204
                    "endpoints": [endpoint_a, endpoint_b],
205
                },
206
            }
207
        )
208 1
        updated = self.db.links.find_one_and_update(
209
            {"_id": link_id},
210
            {
211
                "$set": model.dict(exclude={"inserted_at"}),
212
                "$setOnInsert": {"inserted_at": utc_now},
213
            },
214
            return_document=ReturnDocument.AFTER,
215
            upsert=True,
216
        )
217 1
        self.db.switches.find_one_and_update(
218
            {"interfaces.id": endpoint_a},
219
            {
220
                "$set": {
221
                    "interfaces.$.link_id": link_id,
222
                    "interfaces.$.link_side": "endpoint_a",
223
                    "updated_at": utc_now,
224
                }
225
            },
226
        )
227 1
        self.db.switches.find_one_and_update(
228
            {"interfaces.id": endpoint_b},
229
            {
230
                "$set": {
231
                    "interfaces.$.link_id": link_id,
232
                    "interfaces.$.link_side": "endpoint_b",
233
                    "updated_at": utc_now,
234
                }
235
            },
236
        )
237 1
        return updated
238
239 1
    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
240
        """Try to find one link and update it given an update expression."""
241 1
        self._set_updated_at(update_expr)
242 1
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)
243
244 1
    def enable_link(self, link_id: str) -> Optional[dict]:
245
        """Try to find one link and enable it."""
246 1
        return self._update_link(link_id, {"$set": {"enabled": True}})
247
248 1
    def disable_link(self, link_id: str) -> Optional[dict]:
249
        """Try to find one link and disable it."""
250 1
        return self._update_link(link_id, {"$set": {"enabled": False}})
251
252 1
    def deactivate_link(self, link_id: str,
253
                        last_status_change: Optional[float] = None,
254
                        last_status_is_active=False
255
                        ) -> Optional[dict]:
256
        """Try to find one link and deactivate it."""
257
        # It might be worth using datetime in the future
258 1
        last_status_change = last_status_change or time.time()
259 1
        return self._update_link(
260
            link_id,
261
            {
262
                "$set": {
263
                    "metadata.last_status_change": last_status_change,
264
                    "metadata.last_status_is_active": last_status_is_active,
265
                    "active": False,
266
                }
267
            },
268
        )
269
270 1
    def activate_link(self, link_id: str,
271
                      last_status_change: Optional[float] = None,
272
                      last_status_is_active=True
273
                      ) -> Optional[dict]:
274
        """Try to find one link and activate it."""
275
        # It might be worth using datetime in the future
276 1
        last_status_change = last_status_change or time.time()
277 1
        return self._update_link(
278
            link_id,
279
            {
280
                "$set": {
281
                    "metadata.last_status_change": last_status_change,
282
                    "metadata.last_status_is_active": last_status_is_active,
283
                    "active": True,
284
                }
285
            },
286
        )
287
288 1
    def add_link_metadata(
289
        self, link_id: str, metadata: dict
290
    ) -> Optional[dict]:
291
        """Try to find link and add to its metadata."""
292 1
        update_expr = {
293
            "$set": {f"metadata.{k}": v for k, v in metadata.items()}
294
        }
295 1
        return self._update_link(link_id, update_expr)
296
297 1
    def delete_link_metadata_key(
298
        self, link_id: str, key: str
299
    ) -> Optional[dict]:
300
        """Try to find a link and delete a metadata key."""
301 1
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})
302
303 1
    def bulk_upsert_interface_details(
304
        self, ids_details: List[Tuple[str, dict]]
305
    ) -> Optional[dict]:
306
        """Update or insert interfaces details."""
307 1
        utc_now = datetime.utcnow()
308 1
        ops = []
309 1
        for _id, detail_dict in ids_details:
310 1
            ops.append(
311
                UpdateOne(
312
                    {"_id": _id},
313
                    {
314
                        "$set": InterfaceDetailDoc(
315
                            **{
316
                                **detail_dict,
317
                                **{
318
                                    "updated_at": utc_now,
319
                                    "_id": _id,
320
                                },
321
                            }
322
                        ).dict(exclude={"inserted_at"}),
323
                        "$setOnInsert": {"inserted_at": utc_now},
324
                    },
325
                    upsert=True,
326
                ),
327
            )
328
329 1
        with self.interface_details_lock:
330 1
            with self.db_client.start_session() as session:
331 1
                with session.start_transaction():
332 1
                    return self.db.interface_details.bulk_write(
333
                        ops, ordered=False, session=session
334
                    )
335
336 1
    def get_interfaces_details(
337
        self, interface_ids: List[str]
338
    ) -> Optional[dict]:
339
        """Try to get interfaces details given a list of interface ids."""
340 1
        return self.db.interface_details.aggregate(
341
            [
342
                {"$match": {"_id": {"$in": interface_ids}}},
343
            ]
344
        )
345