Test Failed
Pull Request — master (#84)
by Vinicius
38:36
created

build.controllers   A

Complexity

Total Complexity 41

Size/Duplication

Total Lines 309
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 196
dl 0
loc 309
rs 9.1199
c 0
b 0
f 0
wmc 41

33 Methods

Rating   Name   Duplication   Size   Complexity  
A TopoController.add_interface_metadata() 0 6 1
A TopoController.enable_switch() 0 3 1
A TopoController.disable_interface() 0 3 1
A TopoController.bootstrap_indexes() 0 10 3
A TopoController.get_topology() 0 5 1
A TopoController.get_links() 0 9 1
A TopoController.add_switch_metadata() 0 4 1
A TopoController._update_switch() 0 4 1
A TopoController.deactivate_link() 0 5 1
A TopoController.add_link_metadata() 0 4 1
A TopoController.disable_switch() 0 4 1
A TopoController.delete_link_metadata_key() 0 3 1
A TopoController.get_switches() 0 9 1
A TopoController.deactivate_switch() 0 3 1
A TopoController.delete_switch_metadata_key() 0 3 1
A TopoController._update_interface() 0 12 2
A TopoController.activate_interface() 0 3 1
A TopoController.enable_interface_lldp() 0 3 1
A TopoController.enable_link() 0 3 1
A TopoController._update_link() 0 4 1
A TopoController.disable_link() 0 3 1
A TopoController._set_updated_at() 0 6 2
A TopoController.enable_interface() 0 3 1
A TopoController.__init__() 0 9 1
B TopoController.bulk_upsert_interface_details() 0 31 5
A TopoController.delete_interface_metadata_key() 0 5 1
A TopoController.upsert_switch() 0 15 1
A TopoController.get_interfaces() 0 11 1
A TopoController.deactivate_interface() 0 3 1
A TopoController.upsert_link() 0 46 1
A TopoController.get_interfaces_details() 0 5 1
A TopoController.activate_link() 0 4 1
A TopoController.disable_interface_lldp() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like build.controllers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""TopoController."""
2
import os
3
4
from typing import List
5
from typing import Optional
6
from typing import Tuple
7
from threading import Lock
8
9
from datetime import datetime
10
11
from napps.kytos.topology.db.client import mongo_client
12
from napps.kytos.topology.db.client import bootstrap_index
13
from napps.kytos.topology.db.models import SwitchDoc
14
from napps.kytos.topology.db.models import LinkDoc
15
from napps.kytos.topology.db.models import InterfaceDetailDoc
16
17
from kytos.core import log
18
import pymongo
19
from pymongo.collection import ReturnDocument
20
from pymongo.operations import UpdateOne
21
22
23
class TopoController:
24
    """TopoController."""
25
26
    def __init__(self, db_client=mongo_client, db_client_options=None) -> None:
27
        """Constructor of TopoController."""
28
        client_kwargs = db_client_options or {}
29
        db_name = client_kwargs.get("database") or os.environ.get(
30
            "MONGO_DBNAME", "napps"
31
        )
32
        self.db_client = db_client(**client_kwargs)
33
        self.db = self.db_client[db_name]
34
        self.interface_details_lock = Lock()
35
36
    def bootstrap_indexes(self) -> None:
        """Bootstrap the topology indexes on the switches and links collections.

        A message is logged for every index for which ``bootstrap_index``
        returns a truthy value.
        """
        wanted = (
            ("switches", "interfaces.id", pymongo.ASCENDING),
            ("links", "endpoints.id", pymongo.ASCENDING),
        )
        for collection, index, direction in wanted:
            created = bootstrap_index(self.db, collection, index, direction)
            if not created:
                continue
            log.info(
                f"Created DB index ({index}, {direction}), "
                f"collection: {collection})"
            )
48
49
    def get_topology(self) -> dict:
50
        """Get topology from DB."""
51
        switches = self.get_switches()
52
        links = self.get_links()
53
        return {"topology": {**links, **switches}}
54
55
    def get_switches(self) -> dict:
        """Return all switches from the DB, keyed by their ``id`` field.

        Documents are sorted by ``_id`` and projected with
        ``SwitchDoc.projection()``; the mapping is wrapped under ``switches``.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": SwitchDoc.projection()},
        ]
        by_id = {}
        for doc in self.db.switches.aggregate(pipeline):
            by_id[doc["id"]] = doc
        return {"switches": by_id}
64
65
    def get_links(self) -> dict:
        """Return all links from the DB, keyed by their ``id`` field.

        Documents are sorted by ``_id`` and projected with
        ``LinkDoc.projection()``; the mapping is wrapped under ``links``.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": LinkDoc.projection()},
        ]
        by_id = {}
        for doc in self.db.links.aggregate(pipeline):
            by_id[doc["id"]] = doc
        return {"links": by_id}
74
75
    def get_interfaces(self) -> dict:
76
        """Get interfaces from DB."""
77
        interfaces = self.db.switches.aggregate(
78
            [
79
                {"$sort": {"_id": 1}},
80
                {"$project": {"interfaces": 1, "_id": 0}},
81
                {"$unwind": "$interfaces"},
82
                {"$replaceRoot": {"newRoot": "$interfaces"}},
83
            ]
84
        )
85
        return {"interfaces": {value["id"]: value for value in interfaces}}
86
87
    def _set_updated_at(self, update_expr: dict) -> None:
88
        """Set updated_at on $set expression."""
89
        if "$set" in update_expr:
90
            update_expr["$set"].update({"updated_at": datetime.utcnow()})
91
        else:
92
            update_expr.update({"$set": {"updated_at": datetime.utcnow()}})
93
94
    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
95
        """Try to find one switch and update it given an update expression."""
96
        self._set_updated_at(update_expr)
97
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)
98
99
    def upsert_switch(self, dpid: str, switch_dict: dict, **kwargs) -> Optional[dict]:
        """Insert the switch or update it in place, returning the stored doc.

        ``switch_dict`` is validated through ``SwitchDoc`` with ``_id`` and
        ``updated_at`` overridden; ``inserted_at`` is only written when the
        document is first created. Extra keyword arguments are forwarded to
        ``find_one_and_update``.
        """
        now = datetime.utcnow()
        fields = dict(switch_dict)
        fields["_id"] = dpid
        fields["updated_at"] = now
        model = SwitchDoc(**fields)
        update_expr = {
            "$set": model.dict(exclude={"inserted_at"}),
            "$setOnInsert": {"inserted_at": now},
        }
        return self.db.switches.find_one_and_update(
            {"_id": dpid},
            update_expr,
            return_document=ReturnDocument.AFTER,
            upsert=True,
            **kwargs,
        )
114
115
    def enable_switch(self, dpid: str) -> Optional[dict]:
116
        """Try to find one switch and enable it."""
117
        return self._update_switch(dpid, {"$set": {"enabled": True}})
118
119
    def deactivate_switch(self, dpid: str) -> Optional[dict]:
120
        """Try to find one switch and deactivate it."""
121
        return self._update_switch(dpid, {"$set": {"active": False}})
122
123
    def disable_switch(self, dpid: str) -> Optional[dict]:
124
        """Try to find one switch and disable it."""
125
        return self._update_switch(
126
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
127
        )
128
129
    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
130
        """Try to find a switch and add to its metadata."""
131
        update_expr = {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}
132
        return self._update_switch(dpid, update_expr)
133
134
    def delete_switch_metadata_key(self, dpid: str, key: str) -> Optional[dict]:
135
        """Try to find a switch and delete a metadata key."""
136
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})
137
138
    def enable_interface(self, interface_id: str) -> Optional[dict]:
139
        """Try to enable one interface and its embedded object on links."""
140
        return self._update_interface(interface_id, {"$set": {"enabled": True}})
141
142
    def disable_interface(self, interface_id: str) -> Optional[dict]:
143
        """Try to disable one interface and its embedded object on links."""
144
        return self._update_interface(interface_id, {"$set": {"enabled": False}})
145
146
    def activate_interface(self, interface_id: str) -> Optional[dict]:
147
        """Try to activate one interface."""
148
        return self._update_interface(interface_id, {"$set": {"active": True}})
149
150
    def deactivate_interface(self, interface_id: str) -> Optional[dict]:
151
        """Try to deactivate one interface."""
152
        return self._update_interface(interface_id, {"$set": {"active": False}})
153
154
    def enable_interface_lldp(self, interface_id: str) -> Optional[dict]:
155
        """Try to enable LLDP one interface."""
156
        return self._update_interface(interface_id, {"$set": {"lldp": True}})
157
158
    def disable_interface_lldp(self, interface_id: str) -> Optional[dict]:
159
        """Try to disable LLDP one interface."""
160
        return self._update_interface(interface_id, {"$set": {"lldp": False}})
161
162
    def add_interface_metadata(
163
        self, interface_id: str, metadata: dict
164
    ) -> Optional[dict]:
165
        """Try to find an interface and add to its metadata."""
166
        update_expr = {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}
167
        return self._update_interface(interface_id, update_expr)
168
169
    def delete_interface_metadata_key(
170
        self, interface_id: str, key: str
171
    ) -> Optional[dict]:
172
        """Try to find an interface and delete a metadata key."""
173
        return self._update_interface(interface_id, {"$unset": {f"metadata.{key}": ""}})
174
175
    def _update_interface(self, interface_id: str, update_expr: dict) -> Optional[dict]:
        """Apply ``update_expr`` to one interface embedded in a switch document.

        ``updated_at`` is stamped into the expression, then every field of
        every operator is re-targeted to ``interfaces.$.<field>`` so it lands
        on the array element matched by ``interfaces.id``. Only the
        ``switches`` collection is written here. Returns the switch document
        after the update, or whatever the driver returns when nothing matched.
        """
        self._set_updated_at(update_expr)
        positional_expr = {
            operator: {f"interfaces.$.{field}": value for field, value in fields.items()}
            for operator, fields in update_expr.items()
        }
        interface_filter = {"interfaces.id": interface_id}
        return self.db.switches.find_one_and_update(
            interface_filter,
            positional_expr,
            return_document=ReturnDocument.AFTER,
        )
188
189
    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
        """Update or insert a Link and tag both endpoint interfaces with it.

        The link document is validated through ``LinkDoc`` with ``_id``,
        ``updated_at`` and ``endpoints`` overridden; ``inserted_at`` is only
        written when the document is first created. Afterwards the interface
        embedded in the ``switches`` collection for each endpoint gets its
        ``link_id`` and ``link_side`` set.

        Returns the link document as stored after the upsert.
        """
        utc_now = datetime.utcnow()

        endpoint_a = link_dict.get("endpoint_a")
        endpoint_b = link_dict.get("endpoint_b")
        model = LinkDoc(
            **{
                **link_dict,
                "updated_at": utc_now,
                "_id": link_id,
                "endpoints": [endpoint_a, endpoint_b],
            }
        )
        updated = self.db.links.find_one_and_update(
            {"_id": link_id},
            {
                "$set": model.dict(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        sides = (("endpoint_a", endpoint_a), ("endpoint_b", endpoint_b))
        for link_side, endpoint in sides:
            # Endpoints are stored as sub-documents carrying an "id" (the
            # links index is on "endpoints.id"), so filtering "interfaces.id"
            # by the whole mapping would never match. Use the id itself;
            # non-mapping values are passed through unchanged.
            if isinstance(endpoint, dict):
                interface_id = endpoint.get("id")
            else:
                interface_id = endpoint
            self.db.switches.find_one_and_update(
                {"interfaces.id": interface_id},
                {
                    "$set": {
                        "interfaces.$.link_id": link_id,
                        "interfaces.$.link_side": link_side,
                        "updated_at": utc_now,
                    }
                },
            )
        return updated
235
236
    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
237
        """Try to find one link and update it given an update expression."""
238
        self._set_updated_at(update_expr)
239
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)
240
241
    def activate_link(self, link_id: str) -> Optional[dict]:
242
        """Try to find one link and activate it."""
243
        return self._update_link(
244
            link_id, {"$set": {"active": True, "metadata.last_status_is_active": True}}
245
        )
246
247
    def deactivate_link(self, link_id: str, interface_id: str) -> Optional[dict]:
248
        """Try to find one link and deactivate it."""
249
        return self._update_link(
250
            link_id,
251
            {"$set": {"active": False, "metadata.last_status_is_active": False}},
252
        )
253
254
    def enable_link(self, link_id: str) -> Optional[dict]:
255
        """Try to find one link and enable it."""
256
        return self._update_link(link_id, {"$set": {"enabled": True}})
257
258
    def disable_link(self, link_id: str) -> Optional[dict]:
259
        """Try to find one link and disable it."""
260
        return self._update_link(link_id, {"$set": {"enabled": False}})
261
262
    def add_link_metadata(self, link_id: str, metadata: dict) -> Optional[dict]:
263
        """Try to find link and add to its metadata."""
264
        update_expr = {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}
265
        return self._update_link(link_id, update_expr)
266
267
    def delete_link_metadata_key(self, link_id: str, key: str) -> Optional[dict]:
268
        """Try to find a link and delete a metadata key."""
269
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})
270
271
    def bulk_upsert_interface_details(
272
        self, ids_details: Tuple[str, dict]
273
    ) -> Optional[dict]:
274
        """Update or insert interfaces details."""
275
        utc_now = datetime.utcnow()
276
        ops = []
277
        for _id, detail_dict in ids_details:
278
            ops.append(
279
                UpdateOne(
280
                    {"_id": _id},
281
                    {
282
                        "$set": InterfaceDetailDoc(
283
                            **{
284
                                **detail_dict,
285
                                **{
286
                                    "updated_at": utc_now,
287
                                    "_id": _id,
288
                                },
289
                            }
290
                        ).dict(exclude={"inserted_at"}),
291
                        "$setOnInsert": {"inserted_at": utc_now},
292
                    },
293
                    upsert=True,
294
                ),
295
            )
296
297
        with self.interface_details_lock:
298
            with self.db_client.start_session() as session:
299
                with session.start_transaction():
300
                    return self.db.interface_details.bulk_write(
301
                        ops, ordered=False, session=session
302
                    )
303
304
    def get_interfaces_details(self, interface_ids: List[str]) -> Optional[dict]:
305
        """Try to get interfaces details given a list of interface ids."""
306
        return self.db.interface_details.aggregate(
307
            [
308
                {"$match": {"_id": {"$in": interface_ids}}},
309
            ]
310
        )
311