Passed
Pull Request — master (#84)
by Vinicius
03:38 queued 52s
created

build.controllers   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 290
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 184
dl 0
loc 290
ccs 115
cts 115
cp 1
rs 9.2
c 0
b 0
f 0
wmc 40

31 Methods

Rating   Name   Duplication   Size   Complexity  
A TopoController.add_interface_metadata() 0 6 1
A TopoController.enable_switch() 0 3 1
A TopoController.disable_interface() 0 3 1
A TopoController.bootstrap_indexes() 0 10 3
A TopoController.get_topology() 0 5 1
A TopoController.get_links() 0 9 1
A TopoController.add_switch_metadata() 0 4 1
A TopoController._update_switch() 0 4 1
A TopoController.add_link_metadata() 0 4 1
A TopoController.disable_switch() 0 4 1
A TopoController.delete_link_metadata_key() 0 3 1
A TopoController.get_switches() 0 9 1
A TopoController.deactivate_switch() 0 3 1
A TopoController.delete_switch_metadata_key() 0 3 1
A TopoController._update_interface() 0 12 2
A TopoController.activate_interface() 0 3 1
A TopoController.enable_interface_lldp() 0 3 1
A TopoController.enable_link() 0 3 1
A TopoController._update_link() 0 4 1
A TopoController.disable_link() 0 3 1
A TopoController._set_updated_at() 0 6 2
A TopoController.enable_interface() 0 3 1
A TopoController.__init__() 0 6 2
B TopoController.bulk_upsert_interface_details() 0 31 5
A TopoController.delete_interface_metadata_key() 0 5 1
A TopoController.upsert_switch() 0 14 1
A TopoController.get_interfaces() 0 11 1
A TopoController.deactivate_interface() 0 3 1
A TopoController.upsert_link() 0 46 1
A TopoController.get_interfaces_details() 0 5 1
A TopoController.disable_interface_lldp() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like build.controllers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""TopoController."""
2
3 1
from typing import List
4 1
from typing import Optional
5 1
from typing import Tuple
6 1
from threading import Lock
7
8 1
from datetime import datetime
9
10 1
from napps.kytos.topology.db.models import SwitchDoc
11 1
from napps.kytos.topology.db.models import LinkDoc
12 1
from napps.kytos.topology.db.models import InterfaceDetailDoc
13
14 1
from kytos.core import log
15 1
from kytos.core.db import Mongo
16 1
import pymongo
17 1
from pymongo.collection import ReturnDocument
18 1
from pymongo.operations import UpdateOne
19
20
21 1
class TopoController:
    """Read and write topology documents (switches, interfaces, links).

    Every method talks to the database handle in ``self.db``. Three
    collections are used: ``switches`` (interfaces are embedded in each
    switch document under ``interfaces``), ``links`` and
    ``interface_details``.
    """

    # The lambda is kept (rather than passing ``Mongo`` directly) so that the
    # name ``Mongo`` is only looked up when the factory is actually called.
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
        """Constructor of TopoController.

        Args:
            get_mongo: zero-argument callable returning an object exposing
                ``client``, ``db_name`` and ``bootstrap_index``.
        """
        self.mongo = get_mongo()
        self.db_client = self.mongo.client
        self.db = self.db_client[self.mongo.db_name]
        # Serializes bulk writes issued by bulk_upsert_interface_details.
        self.interface_details_lock = Lock()

    def bootstrap_indexes(self) -> None:
        """Bootstrap all topology related indexes.

        Calls ``self.mongo.bootstrap_index`` for each (collection, keys) pair
        and logs the ones for which it returns a truthy value.
        """
        index_tuples = [
            ("switches", [("interfaces.id", pymongo.ASCENDING)]),
            ("links", [("endpoints.id", pymongo.ASCENDING)]),
        ]
        for collection, keys in index_tuples:
            if self.mongo.bootstrap_index(collection, keys):
                log.info(f"Created DB index {keys}, collection: {collection}")

    def get_topology(self) -> dict:
        """Get topology from DB.

        Returns:
            ``{"topology": {"links": {...}, "switches": {...}}}``.
        """
        switches = self.get_switches()
        links = self.get_links()
        return {"topology": {**links, **switches}}

    def get_switches(self) -> dict:
        """Get switches from DB, sorted by ``_id`` and keyed by their ``id``."""
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": SwitchDoc.projection()},
        ]
        switches = self.db.switches.aggregate(pipeline)
        return {"switches": {value["id"]: value for value in switches}}

    def get_links(self) -> dict:
        """Get links from DB, sorted by ``_id`` and keyed by their ``id``."""
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": LinkDoc.projection()},
        ]
        links = self.db.links.aggregate(pipeline)
        return {"links": {value["id"]: value for value in links}}

    def get_interfaces(self) -> dict:
        """Get interfaces from DB, keyed by their ``id``.

        Interfaces are embedded in switch documents, so the pipeline unwinds
        each switch's ``interfaces`` array and promotes every element to a
        top-level result document.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": {"interfaces": 1, "_id": 0}},
            {"$unwind": "$interfaces"},
            {"$replaceRoot": {"newRoot": "$interfaces"}},
        ]
        interfaces = self.db.switches.aggregate(pipeline)
        return {"interfaces": {value["id"]: value for value in interfaces}}

    def _set_updated_at(self, update_expr: dict) -> None:
        """Set updated_at on $set expression.

        Mutates ``update_expr`` in place, creating the ``$set`` operator when
        the expression does not have one yet.
        """
        update_expr.setdefault("$set", {})["updated_at"] = datetime.utcnow()

    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
        """Try to find one switch and update it given an update expression.

        ``update_expr`` is mutated to also set ``updated_at``. No
        ``return_document`` is passed, so pymongo's default applies: the
        returned document is the one found *before* the update, or None when
        no switch matches ``dpid``.
        """
        self._set_updated_at(update_expr)
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)

    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
        """Update or insert switch.

        ``inserted_at`` is only written when the document is created; every
        other model field is overwritten. Returns the document after the
        upsert.
        """
        utc_now = datetime.utcnow()
        model = SwitchDoc(**{**switch_dict, "_id": dpid, "updated_at": utc_now})
        return self.db.switches.find_one_and_update(
            {"_id": dpid},
            {
                "$set": model.dict(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )

    def enable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and enable it."""
        return self._update_switch(dpid, {"$set": {"enabled": True}})

    def deactivate_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and deactivate it."""
        return self._update_switch(dpid, {"$set": {"active": False}})

    def disable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and disable it and all of its interfaces."""
        return self._update_switch(
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
        )

    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
        """Try to find a switch and add to its metadata."""
        update_expr = {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}
        return self._update_switch(dpid, update_expr)

    def delete_switch_metadata_key(self, dpid: str, key: str) -> Optional[dict]:
        """Try to find a switch and delete a metadata key."""
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})

    def enable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to enable one interface."""
        return self._update_interface(interface_id, {"$set": {"enabled": True}})

    def disable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to disable one interface."""
        return self._update_interface(interface_id, {"$set": {"enabled": False}})

    def activate_interface(self, interface_id: str) -> Optional[dict]:
        """Try to activate one interface."""
        return self._update_interface(interface_id, {"$set": {"active": True}})

    def deactivate_interface(self, interface_id: str) -> Optional[dict]:
        """Try to deactivate one interface."""
        return self._update_interface(interface_id, {"$set": {"active": False}})

    def enable_interface_lldp(self, interface_id: str) -> Optional[dict]:
        """Try to enable LLDP on one interface."""
        return self._update_interface(interface_id, {"$set": {"lldp": True}})

    def disable_interface_lldp(self, interface_id: str) -> Optional[dict]:
        """Try to disable LLDP on one interface."""
        return self._update_interface(interface_id, {"$set": {"lldp": False}})

    def add_interface_metadata(
        self, interface_id: str, metadata: dict
    ) -> Optional[dict]:
        """Try to find an interface and add to its metadata."""
        update_expr = {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}
        return self._update_interface(interface_id, update_expr)

    def delete_interface_metadata_key(
        self, interface_id: str, key: str
    ) -> Optional[dict]:
        """Try to find an interface and delete a metadata key."""
        return self._update_interface(interface_id, {"$unset": {f"metadata.{key}": ""}})

    def _update_interface(self, interface_id: str, update_expr: dict) -> Optional[dict]:
        """Try to update one interface embedded in its switch document.

        Only the ``switches`` collection is touched. Every field of every
        operator in ``update_expr`` is re-targeted at the matched array
        element (``interfaces.$.<field>``); this includes the ``updated_at``
        added by ``_set_updated_at``, so the timestamp lands on the interface
        element. ``update_expr`` itself is mutated to carry ``updated_at``.

        Returns:
            The switch document after the update, or None when no switch
            holds an interface with ``interface_id``.
        """
        self._set_updated_at(update_expr)
        interfaces_expression = {
            operator: {f"interfaces.$.{k}": v for k, v in values.items()}
            for operator, values in update_expr.items()
        }
        return self.db.switches.find_one_and_update(
            {"interfaces.id": interface_id},
            interfaces_expression,
            return_document=ReturnDocument.AFTER,
        )

    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
        """Update or insert a Link.

        After upserting the link document, the interface matched by each
        endpoint gets its ``link_id`` and ``link_side`` set, and the owning
        switch gets a fresh ``updated_at``. The three writes are separate
        operations issued in order: link, endpoint_a, endpoint_b.

        Returns:
            The link document after the upsert.
        """
        utc_now = datetime.utcnow()

        endpoint_a = link_dict.get("endpoint_a")
        endpoint_b = link_dict.get("endpoint_b")
        model = LinkDoc(
            **{
                **link_dict,
                "updated_at": utc_now,
                "_id": link_id,
                "endpoints": [endpoint_a, endpoint_b],
            }
        )
        updated = self.db.links.find_one_and_update(
            {"_id": link_id},
            {
                "$set": model.dict(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        # NOTE(review): the raw endpoint values are used as the
        # ``interfaces.id`` match below - confirm callers pass values that
        # compare equal to the stored interface ids.
        endpoints_by_side = (
            ("endpoint_a", endpoint_a),
            ("endpoint_b", endpoint_b),
        )
        for link_side, endpoint in endpoints_by_side:
            self.db.switches.find_one_and_update(
                {"interfaces.id": endpoint},
                {
                    "$set": {
                        "interfaces.$.link_id": link_id,
                        "interfaces.$.link_side": link_side,
                        "updated_at": utc_now,
                    }
                },
            )
        return updated

    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
        """Try to find one link and update it given an update expression.

        ``update_expr`` is mutated to also set ``updated_at``. No
        ``return_document`` is passed, so pymongo's default applies: the
        returned document is the one found *before* the update, or None when
        no link matches ``link_id``.
        """
        self._set_updated_at(update_expr)
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)

    def enable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and enable it."""
        return self._update_link(link_id, {"$set": {"enabled": True}})

    def disable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and disable it."""
        return self._update_link(link_id, {"$set": {"enabled": False}})

    def add_link_metadata(self, link_id: str, metadata: dict) -> Optional[dict]:
        """Try to find link and add to its metadata."""
        update_expr = {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}
        return self._update_link(link_id, update_expr)

    def delete_link_metadata_key(self, link_id: str, key: str) -> Optional[dict]:
        """Try to find a link and delete a metadata key."""
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})

    def bulk_upsert_interface_details(
        self, ids_details: List[Tuple[str, dict]]
    ) -> Optional[dict]:
        """Update or insert interfaces details.

        Args:
            ids_details: ``(interface_id, detail_dict)`` pairs; one upsert is
                issued per pair.

        Returns:
            Whatever ``bulk_write`` returns (pymongo's bulk write result, not
            a plain dict despite the annotation), or None when there is
            nothing to write.
        """
        utc_now = datetime.utcnow()
        ops = [
            UpdateOne(
                {"_id": _id},
                {
                    "$set": InterfaceDetailDoc(
                        **{**detail_dict, "updated_at": utc_now, "_id": _id}
                    ).dict(exclude={"inserted_at"}),
                    "$setOnInsert": {"inserted_at": utc_now},
                },
                upsert=True,
            )
            for _id, detail_dict in ids_details
        ]
        # pymongo refuses an empty request list, so skip the lock, session
        # and transaction entirely when there is nothing to upsert.
        if not ops:
            return None

        with self.interface_details_lock:
            with self.db_client.start_session() as session:
                with session.start_transaction():
                    return self.db.interface_details.bulk_write(
                        ops, ordered=False, session=session
                    )

    def get_interfaces_details(self, interface_ids: List[str]) -> Optional[dict]:
        """Try to get interfaces details given a list of interface ids.

        Returns the result of ``aggregate`` unchanged, i.e. an iterable
        cursor over the matching documents rather than a dict.
        """
        return self.db.interface_details.aggregate(
            [
                {"$match": {"_id": {"$in": interface_ids}}},
            ]
        )
292