Test Failed
Pull Request — master (#84)
by Vinicius
07:12
created

build.controllers   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 290
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 184
dl 0
loc 290
rs 9.2
c 0
b 0
f 0
wmc 40

31 Methods

Rating   Name   Duplication   Size   Complexity  
A TopoController.add_interface_metadata() 0 6 1
A TopoController.enable_switch() 0 3 1
A TopoController.disable_interface() 0 3 1
A TopoController.bootstrap_indexes() 0 10 3
A TopoController.get_topology() 0 5 1
A TopoController.get_links() 0 9 1
A TopoController.add_switch_metadata() 0 4 1
A TopoController._update_switch() 0 4 1
A TopoController.add_link_metadata() 0 4 1
A TopoController.disable_switch() 0 4 1
A TopoController.delete_link_metadata_key() 0 3 1
A TopoController.get_switches() 0 9 1
A TopoController.deactivate_switch() 0 3 1
A TopoController.delete_switch_metadata_key() 0 3 1
A TopoController._update_interface() 0 12 2
A TopoController.activate_interface() 0 3 1
A TopoController.enable_interface_lldp() 0 3 1
A TopoController.enable_link() 0 3 1
A TopoController._update_link() 0 4 1
A TopoController.disable_link() 0 3 1
A TopoController._set_updated_at() 0 6 2
A TopoController.enable_interface() 0 3 1
A TopoController.__init__() 0 6 2
B TopoController.bulk_upsert_interface_details() 0 31 5
A TopoController.delete_interface_metadata_key() 0 5 1
A TopoController.upsert_switch() 0 14 1
A TopoController.get_interfaces() 0 11 1
A TopoController.deactivate_interface() 0 3 1
A TopoController.upsert_link() 0 46 1
A TopoController.get_interfaces_details() 0 5 1
A TopoController.disable_interface_lldp() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like build.controllers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""TopoController."""
2
3
from typing import List
4
from typing import Optional
5
from typing import Tuple
6
from threading import Lock
7
8
from datetime import datetime
9
10
from napps.kytos.topology.db.models import SwitchDoc
11
from napps.kytos.topology.db.models import LinkDoc
12
from napps.kytos.topology.db.models import InterfaceDetailDoc
13
14
from kytos.core import log
15
from kytos.core.db import Mongo
16
import pymongo
17
from pymongo.collection import ReturnDocument
18
from pymongo.operations import UpdateOne
19
20
21
class TopoController:
22
    """TopoController."""
23
24
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable Mongo does not seem to be defined.
Loading history...
25
        """Constructor of TopoController."""
26
        self.mongo = get_mongo()
27
        self.db_client = self.mongo.client
28
        self.db = self.db_client[self.mongo.db_name]
29
        self.interface_details_lock = Lock()
30
31
    def bootstrap_indexes(self) -> None:
        """Bootstrap all topology related indexes.

        Each ``(collection, field, direction)`` entry is handed to
        ``self.mongo.bootstrap_index``; an info message is logged whenever
        that call returns a truthy value.
        """
        index_tuples = [
            ("switches", "interfaces.id", pymongo.ASCENDING),
            ("links", "endpoints.id", pymongo.ASCENDING),
        ]
        for collection, index, direction in index_tuples:
            if not self.mongo.bootstrap_index(collection, index, direction):
                continue
            # The message used to end with an unbalanced ")" after the
            # collection name.
            log.info(
                f"Created DB index ({index}, {direction}), "
                f"collection: {collection}"
            )
43
44
    def get_topology(self) -> dict:
45
        """Get topology from DB."""
46
        switches = self.get_switches()
47
        links = self.get_links()
48
        return {"topology": {**links, **switches}}
49
50
    def get_switches(self) -> dict:
        """Return all switches from the DB, keyed by each document's "id".

        Documents are sorted by ``_id`` and shaped by
        ``SwitchDoc.projection()`` before being indexed.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": SwitchDoc.projection()},
        ]
        by_id = {}
        for doc in self.db.switches.aggregate(pipeline):
            by_id[doc["id"]] = doc
        return {"switches": by_id}
59
60
    def get_links(self) -> dict:
        """Return all links from the DB, keyed by each document's "id".

        Documents are sorted by ``_id`` and shaped by
        ``LinkDoc.projection()`` before being indexed.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": LinkDoc.projection()},
        ]
        by_id = {}
        for doc in self.db.links.aggregate(pipeline):
            by_id[doc["id"]] = doc
        return {"links": by_id}
69
70
    def get_interfaces(self) -> dict:
71
        """Get interfaces from DB."""
72
        interfaces = self.db.switches.aggregate(
73
            [
74
                {"$sort": {"_id": 1}},
75
                {"$project": {"interfaces": 1, "_id": 0}},
76
                {"$unwind": "$interfaces"},
77
                {"$replaceRoot": {"newRoot": "$interfaces"}},
78
            ]
79
        )
80
        return {"interfaces": {value["id"]: value for value in interfaces}}
81
82
    def _set_updated_at(self, update_expr: dict) -> None:
83
        """Set updated_at on $set expression."""
84
        if "$set" in update_expr:
85
            update_expr["$set"].update({"updated_at": datetime.utcnow()})
86
        else:
87
            update_expr.update({"$set": {"updated_at": datetime.utcnow()}})
88
89
    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
90
        """Try to find one switch and update it given an update expression."""
91
        self._set_updated_at(update_expr)
92
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)
93
94
    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
        """Insert the switch ``dpid`` or overwrite its stored fields.

        ``switch_dict`` is passed through ``SwitchDoc`` with ``_id`` forced
        to ``dpid`` and ``updated_at`` set to now. ``inserted_at`` is only
        written when the document is created. Returns the document as
        stored after the operation.
        """
        now = datetime.utcnow()
        fields = dict(switch_dict)
        fields["_id"] = dpid
        fields["updated_at"] = now
        model = SwitchDoc(**fields)
        update_expr = {
            # inserted_at is excluded so an update never overwrites it.
            "$set": model.dict(exclude={"inserted_at"}),
            "$setOnInsert": {"inserted_at": now},
        }
        return self.db.switches.find_one_and_update(
            {"_id": dpid},
            update_expr,
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
108
109
    def enable_switch(self, dpid: str) -> Optional[dict]:
110
        """Try to find one switch and enable it."""
111
        return self._update_switch(dpid, {"$set": {"enabled": True}})
112
113
    def deactivate_switch(self, dpid: str) -> Optional[dict]:
114
        """Try to find one switch and deactivate it."""
115
        return self._update_switch(dpid, {"$set": {"active": False}})
116
117
    def disable_switch(self, dpid: str) -> Optional[dict]:
118
        """Try to find one switch and disable it."""
119
        return self._update_switch(
120
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
121
        )
122
123
    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
124
        """Try to find a switch and add to its metadata."""
125
        update_expr = {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}
126
        return self._update_switch(dpid, update_expr)
127
128
    def delete_switch_metadata_key(self, dpid: str, key: str) -> Optional[dict]:
129
        """Try to find a switch and delete a metadata key."""
130
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})
131
132
    def enable_interface(self, interface_id: str) -> Optional[dict]:
133
        """Try to enable one interface and its embedded object on links."""
134
        return self._update_interface(interface_id, {"$set": {"enabled": True}})
135
136
    def disable_interface(self, interface_id: str) -> Optional[dict]:
137
        """Try to disable one interface and its embedded object on links."""
138
        return self._update_interface(interface_id, {"$set": {"enabled": False}})
139
140
    def activate_interface(self, interface_id: str) -> Optional[dict]:
141
        """Try to activate one interface."""
142
        return self._update_interface(interface_id, {"$set": {"active": True}})
143
144
    def deactivate_interface(self, interface_id: str) -> Optional[dict]:
145
        """Try to deactivate one interface."""
146
        return self._update_interface(interface_id, {"$set": {"active": False}})
147
148
    def enable_interface_lldp(self, interface_id: str) -> Optional[dict]:
149
        """Try to enable LLDP one interface."""
150
        return self._update_interface(interface_id, {"$set": {"lldp": True}})
151
152
    def disable_interface_lldp(self, interface_id: str) -> Optional[dict]:
153
        """Try to disable LLDP one interface."""
154
        return self._update_interface(interface_id, {"$set": {"lldp": False}})
155
156
    def add_interface_metadata(
157
        self, interface_id: str, metadata: dict
158
    ) -> Optional[dict]:
159
        """Try to find an interface and add to its metadata."""
160
        update_expr = {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}
161
        return self._update_interface(interface_id, update_expr)
162
163
    def delete_interface_metadata_key(
164
        self, interface_id: str, key: str
165
    ) -> Optional[dict]:
166
        """Try to find an interface and delete a metadata key."""
167
        return self._update_interface(interface_id, {"$unset": {f"metadata.{key}": ""}})
168
169
    def _update_interface(self, interface_id: str, update_expr: dict) -> Optional[dict]:
        """Apply ``update_expr`` to one interface embedded in a switch.

        ``update_expr`` is stamped with ``updated_at`` (mutated in place),
        then every field of every operator is rewritten to
        ``interfaces.$.<field>`` so it targets the array element matched by
        the ``interfaces.id`` filter. The timestamp therefore also lands on
        the interface element. Only the ``switches`` collection is touched
        here. Returns the result of ``find_one_and_update`` called with
        ``ReturnDocument.AFTER``.
        """
        self._set_updated_at(update_expr)
        positional_expr = {
            operator: {
                f"interfaces.$.{field}": value for field, value in fields.items()
            }
            for operator, fields in update_expr.items()
        }
        return self.db.switches.find_one_and_update(
            {"interfaces.id": interface_id},
            positional_expr,
            return_document=ReturnDocument.AFTER,
        )
182
183
    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
        """Insert the link ``link_id`` or overwrite its stored fields.

        ``link_dict`` is passed through ``LinkDoc`` with ``_id`` forced to
        ``link_id``, ``updated_at`` set to now and ``endpoints`` built from
        its ``endpoint_a`` / ``endpoint_b`` entries. ``inserted_at`` is only
        written when the document is created.

        Afterwards the switch holding each endpoint interface gets that
        interface's ``link_id`` and ``link_side`` set, plus the switch-level
        ``updated_at``. Endpoint A is processed before endpoint B.

        Returns the link document as stored after the upsert.
        """
        now = datetime.utcnow()

        endpoint_a = link_dict.get("endpoint_a")
        endpoint_b = link_dict.get("endpoint_b")

        fields = dict(link_dict)
        fields["updated_at"] = now
        fields["_id"] = link_id
        fields["endpoints"] = [endpoint_a, endpoint_b]
        model = LinkDoc(**fields)

        link_update = {
            # inserted_at is excluded so an update never overwrites it.
            "$set": model.dict(exclude={"inserted_at"}),
            "$setOnInsert": {"inserted_at": now},
        }
        updated = self.db.links.find_one_and_update(
            {"_id": link_id},
            link_update,
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )

        sides = (("endpoint_a", endpoint_a), ("endpoint_b", endpoint_b))
        for side, endpoint in sides:
            self.db.switches.find_one_and_update(
                {"interfaces.id": endpoint},
                {
                    "$set": {
                        "interfaces.$.link_id": link_id,
                        "interfaces.$.link_side": side,
                        "updated_at": now,
                    }
                },
            )
        return updated
229
230
    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
231
        """Try to find one link and update it given an update expression."""
232
        self._set_updated_at(update_expr)
233
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)
234
235
    def enable_link(self, link_id: str) -> Optional[dict]:
236
        """Try to find one link and enable it."""
237
        return self._update_link(link_id, {"$set": {"enabled": True}})
238
239
    def disable_link(self, link_id: str) -> Optional[dict]:
240
        """Try to find one link and disable it."""
241
        return self._update_link(link_id, {"$set": {"enabled": False}})
242
243
    def add_link_metadata(self, link_id: str, metadata: dict) -> Optional[dict]:
244
        """Try to find link and add to its metadata."""
245
        update_expr = {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}
246
        return self._update_link(link_id, update_expr)
247
248
    def delete_link_metadata_key(self, link_id: str, key: str) -> Optional[dict]:
249
        """Try to find a link and delete a metadata key."""
250
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})
251
252
    def bulk_upsert_interface_details(
253
        self, ids_details: List[Tuple[str, dict]]
254
    ) -> Optional[dict]:
255
        """Update or insert interfaces details."""
256
        utc_now = datetime.utcnow()
257
        ops = []
258
        for _id, detail_dict in ids_details:
259
            ops.append(
260
                UpdateOne(
261
                    {"_id": _id},
262
                    {
263
                        "$set": InterfaceDetailDoc(
264
                            **{
265
                                **detail_dict,
266
                                **{
267
                                    "updated_at": utc_now,
268
                                    "_id": _id,
269
                                },
270
                            }
271
                        ).dict(exclude={"inserted_at"}),
272
                        "$setOnInsert": {"inserted_at": utc_now},
273
                    },
274
                    upsert=True,
275
                ),
276
            )
277
278
        with self.interface_details_lock:
279
            with self.db_client.start_session() as session:
280
                with session.start_transaction():
281
                    return self.db.interface_details.bulk_write(
282
                        ops, ordered=False, session=session
283
                    )
284
285
    def get_interfaces_details(self, interface_ids: List[str]) -> Optional[dict]:
286
        """Try to get interfaces details given a list of interface ids."""
287
        return self.db.interface_details.aggregate(
288
            [
289
                {"$match": {"_id": {"$in": interface_ids}}},
290
            ]
291
        )
292