Passed
Pull Request — master (#84)
by Vinicius
05:23 queued 01:50
created

build.controllers   A

Complexity

Total Complexity 39

Size/Duplication

Total Lines 295
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 188
dl 0
loc 295
ccs 118
cts 118
cp 1
rs 9.28
c 0
b 0
f 0
wmc 39

31 Methods

Rating   Name   Duplication   Size   Complexity  
A TopoController.add_interface_metadata() 0 6 1
A TopoController.enable_switch() 0 3 1
A TopoController.disable_interface() 0 3 1
A TopoController.bootstrap_indexes() 0 10 3
A TopoController.get_topology() 0 5 1
A TopoController.get_links() 0 9 1
A TopoController.add_switch_metadata() 0 4 1
A TopoController._update_switch() 0 4 1
A TopoController.add_link_metadata() 0 4 1
A TopoController.disable_switch() 0 4 1
A TopoController.delete_link_metadata_key() 0 3 1
A TopoController.get_switches() 0 9 1
A TopoController.deactivate_switch() 0 3 1
A TopoController.delete_switch_metadata_key() 0 3 1
A TopoController._update_interface() 0 12 2
A TopoController.activate_interface() 0 3 1
A TopoController.enable_interface_lldp() 0 3 1
A TopoController.enable_link() 0 3 1
A TopoController._update_link() 0 4 1
A TopoController.disable_link() 0 3 1
A TopoController._set_updated_at() 0 6 2
A TopoController.enable_interface() 0 3 1
A TopoController.__init__() 0 9 1
B TopoController.bulk_upsert_interface_details() 0 31 5
A TopoController.delete_interface_metadata_key() 0 5 1
A TopoController.upsert_switch() 0 14 1
A TopoController.get_interfaces() 0 11 1
A TopoController.deactivate_interface() 0 3 1
A TopoController.upsert_link() 0 46 1
A TopoController.get_interfaces_details() 0 5 1
A TopoController.disable_interface_lldp() 0 3 1
1
"""TopoController."""
2 1
import os
3
4 1
from typing import List
5 1
from typing import Optional
6 1
from typing import Tuple
7 1
from threading import Lock
8
9 1
from datetime import datetime
10
11 1
from napps.kytos.topology.db.client import mongo_client
12 1
from napps.kytos.topology.db.client import bootstrap_index
13 1
from napps.kytos.topology.db.models import SwitchDoc
14 1
from napps.kytos.topology.db.models import LinkDoc
15 1
from napps.kytos.topology.db.models import InterfaceDetailDoc
16
17 1
from kytos.core import log
18 1
import pymongo
19 1
from pymongo.collection import ReturnDocument
20 1
from pymongo.operations import UpdateOne
21
22
23 1
class TopoController:
    """Persist and query topology data through a MongoDB client.

    Switches are stored in the ``switches`` collection with their interfaces
    embedded under the ``interfaces`` array, links are stored in ``links``
    and per-interface details in ``interface_details``.
    """

    def __init__(self, db_client=mongo_client, db_client_options=None) -> None:
        """Create the DB client and select the database to operate on.

        Args:
            db_client: Callable that builds the DB client; it is invoked with
                ``db_client_options`` expanded as keyword arguments.
            db_client_options: Optional keyword arguments for ``db_client``.
                A truthy ``"database"`` entry names the database; otherwise
                the ``MONGO_DBNAME`` environment variable is used, falling
                back to ``"napps"``.
        """
        client_kwargs = db_client_options or {}
        db_name = client_kwargs.get("database") or os.environ.get(
            "MONGO_DBNAME", "napps"
        )
        self.db_client = db_client(**client_kwargs)
        self.db = self.db_client[db_name]
        # Held around the bulk write in bulk_upsert_interface_details().
        self.interface_details_lock = Lock()

    @staticmethod
    def _metadata_set_expr(metadata: dict) -> dict:
        """Build a ``$set`` expression writing each item under ``metadata.``."""
        return {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}

    @staticmethod
    def _metadata_unset_expr(key: str) -> dict:
        """Build an ``$unset`` expression removing ``metadata.<key>``."""
        return {"$unset": {f"metadata.{key}": ""}}

    def bootstrap_indexes(self) -> None:
        """Bootstrap all topology related indexes.

        One info line is logged for every index for which
        ``bootstrap_index`` returns a truthy value.
        """
        index_tuples = [
            ("switches", "interfaces.id", pymongo.ASCENDING),
            ("links", "endpoints.id", pymongo.ASCENDING),
        ]
        for collection, index, direction in index_tuples:
            if bootstrap_index(self.db, collection, index, direction):
                log.info(
                    f"Created DB index ({index}, {direction}), "
                    f"collection: {collection}"
                )

    def get_topology(self) -> dict:
        """Get topology from DB.

        Returns:
            ``{"topology": {"links": {...}, "switches": {...}}}`` built from
            :meth:`get_switches` and :meth:`get_links`.
        """
        switches = self.get_switches()
        links = self.get_links()
        return {"topology": {**links, **switches}}

    def get_switches(self) -> dict:
        """Get switches from DB.

        Returns:
            ``{"switches": {<id>: <document>}}`` with documents sorted by
            ``_id`` and shaped by ``SwitchDoc.projection()``.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": SwitchDoc.projection()},
        ]
        cursor = self.db.switches.aggregate(pipeline)
        return {"switches": {doc["id"]: doc for doc in cursor}}

    def get_links(self) -> dict:
        """Get links from DB.

        Returns:
            ``{"links": {<id>: <document>}}`` with documents sorted by
            ``_id`` and shaped by ``LinkDoc.projection()``.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": LinkDoc.projection()},
        ]
        cursor = self.db.links.aggregate(pipeline)
        return {"links": {doc["id"]: doc for doc in cursor}}

    def get_interfaces(self) -> dict:
        """Get interfaces from DB.

        The embedded ``interfaces`` array of every switch is unwound and each
        element is promoted to a top-level document.

        Returns:
            ``{"interfaces": {<id>: <interface document>}}``.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": {"interfaces": 1, "_id": 0}},
            {"$unwind": "$interfaces"},
            {"$replaceRoot": {"newRoot": "$interfaces"}},
        ]
        cursor = self.db.switches.aggregate(pipeline)
        return {"interfaces": {doc["id"]: doc for doc in cursor}}

    def _set_updated_at(self, update_expr: dict) -> None:
        """Stamp ``updated_at`` (naive UTC now) into the ``$set`` operator.

        ``update_expr`` is modified in place; a ``$set`` entry is created
        when the expression does not have one yet.
        """
        update_expr.setdefault("$set", {})["updated_at"] = datetime.utcnow()

    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
        """Try to find one switch and update it given an update expression.

        ``update_expr`` is stamped with ``updated_at`` in place before being
        applied to the document whose ``_id`` is ``dpid``.

        Returns:
            The result of ``find_one_and_update``. No ``return_document`` is
            passed, so PyMongo's default applies (the document as it was
            before the update, or ``None`` when nothing matched).
        """
        self._set_updated_at(update_expr)
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)

    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
        """Update or insert switch.

        ``switch_dict`` is validated through ``SwitchDoc`` with ``_id`` and
        ``updated_at`` overridden; ``inserted_at`` is only written when the
        document is created.

        Returns:
            The document after the upsert.
        """
        utc_now = datetime.utcnow()
        model = SwitchDoc(**{**switch_dict, "_id": dpid, "updated_at": utc_now})
        return self.db.switches.find_one_and_update(
            {"_id": dpid},
            {
                "$set": model.dict(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )

    def enable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and set ``enabled`` to True."""
        return self._update_switch(dpid, {"$set": {"enabled": True}})

    def deactivate_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and set ``active`` to False."""
        return self._update_switch(dpid, {"$set": {"active": False}})

    def disable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and disable it.

        ``enabled`` is set to False on the switch and, through
        ``interfaces.$[]``, on every interface embedded in it.
        """
        return self._update_switch(
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
        )

    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
        """Try to find a switch and set each item under ``metadata.<key>``."""
        return self._update_switch(dpid, self._metadata_set_expr(metadata))

    def delete_switch_metadata_key(self, dpid: str, key: str) -> Optional[dict]:
        """Try to find a switch and unset ``metadata.<key>``."""
        return self._update_switch(dpid, self._metadata_unset_expr(key))

    def enable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to set ``enabled`` to True on one embedded interface."""
        return self._update_interface(interface_id, {"$set": {"enabled": True}})

    def disable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to set ``enabled`` to False on one embedded interface."""
        return self._update_interface(interface_id, {"$set": {"enabled": False}})

    def activate_interface(self, interface_id: str) -> Optional[dict]:
        """Try to set ``active`` to True on one embedded interface."""
        return self._update_interface(interface_id, {"$set": {"active": True}})

    def deactivate_interface(self, interface_id: str) -> Optional[dict]:
        """Try to set ``active`` to False on one embedded interface."""
        return self._update_interface(interface_id, {"$set": {"active": False}})

    def enable_interface_lldp(self, interface_id: str) -> Optional[dict]:
        """Try to set ``lldp`` to True on one embedded interface."""
        return self._update_interface(interface_id, {"$set": {"lldp": True}})

    def disable_interface_lldp(self, interface_id: str) -> Optional[dict]:
        """Try to set ``lldp`` to False on one embedded interface."""
        return self._update_interface(interface_id, {"$set": {"lldp": False}})

    def add_interface_metadata(
        self, interface_id: str, metadata: dict
    ) -> Optional[dict]:
        """Try to find an interface and set each item under its metadata."""
        return self._update_interface(
            interface_id, self._metadata_set_expr(metadata)
        )

    def delete_interface_metadata_key(
        self, interface_id: str, key: str
    ) -> Optional[dict]:
        """Try to find an interface and unset one of its metadata keys."""
        return self._update_interface(interface_id, self._metadata_unset_expr(key))

    def _update_interface(self, interface_id: str, update_expr: dict) -> Optional[dict]:
        """Try to update one interface embedded in its switch document.

        ``update_expr`` is stamped with ``updated_at`` in place, then every
        field of every operator (the stamp included) is re-targeted to
        ``interfaces.$.<field>`` so it applies to the array element matched
        by ``interfaces.id == interface_id``.

        Returns:
            The switch document after the update, or ``None`` when no switch
            holds that interface.
        """
        self._set_updated_at(update_expr)
        interfaces_expression = {
            operator: {f"interfaces.$.{k}": v for k, v in values.items()}
            for operator, values in update_expr.items()
        }
        return self.db.switches.find_one_and_update(
            {"interfaces.id": interface_id},
            interfaces_expression,
            return_document=ReturnDocument.AFTER,
        )

    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
        """Update or insert a Link.

        The link is validated through ``LinkDoc`` (with ``_id``,
        ``updated_at`` and ``endpoints`` overridden) and upserted into
        ``links``. Afterwards the interface matched by each endpoint gets its
        ``link_id`` and ``link_side`` set on the owning switch document.

        Returns:
            The link document after the upsert.
        """
        utc_now = datetime.utcnow()

        # Insertion order matters: endpoint_a is written before endpoint_b.
        endpoints = {
            "endpoint_a": link_dict.get("endpoint_a"),
            "endpoint_b": link_dict.get("endpoint_b"),
        }
        model = LinkDoc(
            **{
                **link_dict,
                "updated_at": utc_now,
                "_id": link_id,
                "endpoints": list(endpoints.values()),
            }
        )
        updated = self.db.links.find_one_and_update(
            {"_id": link_id},
            {
                "$set": model.dict(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        # NOTE(review): the raw endpoint value from link_dict is used as the
        # `interfaces.id` match value - confirm callers pass something that
        # compares equal to a stored interface id.
        for link_side, endpoint in endpoints.items():
            self.db.switches.find_one_and_update(
                {"interfaces.id": endpoint},
                {
                    "$set": {
                        "interfaces.$.link_id": link_id,
                        "interfaces.$.link_side": link_side,
                        "updated_at": utc_now,
                    }
                },
            )
        return updated

    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
        """Try to find one link and update it given an update expression.

        ``update_expr`` is stamped with ``updated_at`` in place before being
        applied to the document whose ``_id`` is ``link_id``.

        Returns:
            The result of ``find_one_and_update``. No ``return_document`` is
            passed, so PyMongo's default applies (the document as it was
            before the update, or ``None`` when nothing matched).
        """
        self._set_updated_at(update_expr)
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)

    def enable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and set ``enabled`` to True."""
        return self._update_link(link_id, {"$set": {"enabled": True}})

    def disable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and set ``enabled`` to False."""
        return self._update_link(link_id, {"$set": {"enabled": False}})

    def add_link_metadata(self, link_id: str, metadata: dict) -> Optional[dict]:
        """Try to find a link and set each item under ``metadata.<key>``."""
        return self._update_link(link_id, self._metadata_set_expr(metadata))

    def delete_link_metadata_key(self, link_id: str, key: str) -> Optional[dict]:
        """Try to find a link and unset ``metadata.<key>``."""
        return self._update_link(link_id, self._metadata_unset_expr(key))

    def bulk_upsert_interface_details(
        self, ids_details: List[Tuple[str, dict]]
    ) -> Optional[dict]:
        """Update or insert interfaces details.

        Each ``(_id, detail_dict)`` pair is validated through
        ``InterfaceDetailDoc`` and turned into an upserting ``UpdateOne``;
        all operations are sent as one unordered bulk write inside a
        transaction, while holding ``interface_details_lock``.

        Args:
            ids_details: Pairs of document id and detail payload.

        Returns:
            The value returned by ``bulk_write``, or ``None`` when
            ``ids_details`` yields no operations.
        """
        utc_now = datetime.utcnow()
        ops = [
            UpdateOne(
                {"_id": _id},
                {
                    "$set": InterfaceDetailDoc(
                        **{**detail_dict, "updated_at": utc_now, "_id": _id}
                    ).dict(exclude={"inserted_at"}),
                    "$setOnInsert": {"inserted_at": utc_now},
                },
                upsert=True,
            )
            for _id, detail_dict in ids_details
        ]
        if not ops:
            # bulk_write() rejects an empty request list with
            # InvalidOperation; there is nothing to write, so skip the
            # session/transaction round trip altogether.
            return None

        with self.interface_details_lock:
            with self.db_client.start_session() as session:
                with session.start_transaction():
                    return self.db.interface_details.bulk_write(
                        ops, ordered=False, session=session
                    )

    def get_interfaces_details(self, interface_ids: List[str]) -> Optional[dict]:
        """Try to get interfaces details given a list of interface ids.

        Returns:
            The value returned by ``aggregate`` for a ``$match`` on ``_id``
            being one of ``interface_ids``; callers iterate it to obtain the
            matching documents.
        """
        return self.db.interface_details.aggregate(
            [
                {"$match": {"_id": {"$in": interface_ids}}},
            ]
        )
297