Passed
Pull Request — master (#84)
by Vinicius
02:44
created

TopoController._update_interface()   A

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 11
nop 3
dl 0
loc 14
ccs 6
cts 6
cp 1
crap 2
rs 9.85
c 0
b 0
f 0
1
"""TopoController."""
2
3
# pylint: disable=invalid-name
4 1
from datetime import datetime
5 1
from threading import Lock
6 1
from typing import List, Optional, Tuple
7
8 1
import pymongo
9 1
from pymongo.collection import ReturnDocument
10 1
from pymongo.operations import UpdateOne
11
12 1
from kytos.core import log
13 1
from kytos.core.db import Mongo
14 1
from napps.kytos.topology.db.models import (InterfaceDetailDoc, LinkDoc,
15
                                            SwitchDoc)
16
17
18 1
class TopoController:
    """Read and write topology documents stored in MongoDB.

    Three collections are used: ``switches`` (each document embeds an
    ``interfaces`` array), ``links`` and ``interface_details``.
    """

    def __init__(self, get_mongo=lambda: Mongo()) -> None:
        """Build the controller from a Mongo handle factory.

        Args:
            get_mongo: zero-argument callable returning an object that
                exposes ``client``, ``db_name`` and ``bootstrap_index``.
                The default lambda defers the ``Mongo`` lookup until the
                controller is instantiated.
        """
        self.mongo = get_mongo()
        self.db_client = self.mongo.client
        self.db = self.db_client[self.mongo.db_name]
        # Serializes the transactional bulk writes on interface_details.
        self.interface_details_lock = Lock()

    def bootstrap_indexes(self) -> None:
        """Bootstrap all topology related indexes.

        An info message is logged for every index for which
        ``bootstrap_index`` returns a truthy value.
        """
        index_tuples = [
            ("switches", [("interfaces.id", pymongo.ASCENDING)]),
            ("links", [("endpoints.id", pymongo.ASCENDING)]),
        ]
        for collection, keys in index_tuples:
            if self.mongo.bootstrap_index(collection, keys):
                log.info(
                    f"Created DB index {keys}, collection: {collection}"
                )

    def get_topology(self) -> dict:
        """Return links and switches merged under a ``"topology"`` key."""
        return {"topology": {**self.get_links(), **self.get_switches()}}

    def get_switches(self) -> dict:
        """Return ``{"switches": {id: doc}}`` sorted by ``_id``.

        Documents are shaped by ``SwitchDoc.projection()``.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": SwitchDoc.projection()},
        ]
        cursor = self.db.switches.aggregate(pipeline)
        return {"switches": {doc["id"]: doc for doc in cursor}}

    def get_links(self) -> dict:
        """Return ``{"links": {id: doc}}`` sorted by ``_id``.

        Documents are shaped by ``LinkDoc.projection()``.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": LinkDoc.projection()},
        ]
        cursor = self.db.links.aggregate(pipeline)
        return {"links": {doc["id"]: doc for doc in cursor}}

    def get_interfaces(self) -> dict:
        """Return ``{"interfaces": {id: doc}}`` for every embedded interface.

        The ``interfaces`` array of each switch (sorted by switch ``_id``)
        is unwound so each interface becomes its own root document.
        """
        pipeline = [
            {"$sort": {"_id": 1}},
            {"$project": {"interfaces": 1, "_id": 0}},
            {"$unwind": "$interfaces"},
            {"$replaceRoot": {"newRoot": "$interfaces"}},
        ]
        cursor = self.db.switches.aggregate(pipeline)
        return {"interfaces": {doc["id"]: doc for doc in cursor}}

    @staticmethod
    def _set_updated_at(update_expr: dict) -> None:
        """Add ``updated_at`` (naive UTC now) to the ``$set`` expression.

        ``update_expr`` is mutated in place; a ``$set`` entry is created
        when the expression does not have one yet.
        """
        update_expr.setdefault("$set", {})["updated_at"] = datetime.utcnow()

    @staticmethod
    def _metadata_set_expr(metadata: dict) -> dict:
        """Build a ``$set`` expression writing each key under ``metadata.``."""
        return {"$set": {f"metadata.{k}": v for k, v in metadata.items()}}

    def _update_switch(self, dpid: str, update_expr: dict) -> Optional[dict]:
        """Try to find one switch by ``_id`` and apply ``update_expr``.

        ``updated_at`` is added to ``update_expr`` before the update.

        Returns:
            The result of ``find_one_and_update``.
        """
        self._set_updated_at(update_expr)
        # NOTE(review): no return_document is passed here, unlike
        # _update_interface, so the driver default applies - confirm callers
        # do not expect the post-update document.
        return self.db.switches.find_one_and_update({"_id": dpid}, update_expr)

    def upsert_switch(self, dpid: str, switch_dict: dict) -> Optional[dict]:
        """Update or insert a switch and return the resulting document.

        ``switch_dict`` is validated through ``SwitchDoc``; ``inserted_at``
        is only written when the document is first inserted.
        """
        utc_now = datetime.utcnow()
        model = SwitchDoc(
            **{**switch_dict, "_id": dpid, "updated_at": utc_now}
        )
        return self.db.switches.find_one_and_update(
            {"_id": dpid},
            {
                "$set": model.dict(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )

    def enable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and set ``enabled`` to True."""
        return self._update_switch(dpid, {"$set": {"enabled": True}})

    def deactivate_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and set ``active`` to False."""
        return self._update_switch(dpid, {"$set": {"active": False}})

    def disable_switch(self, dpid: str) -> Optional[dict]:
        """Try to find one switch and disable it and all its interfaces."""
        return self._update_switch(
            dpid, {"$set": {"enabled": False, "interfaces.$[].enabled": False}}
        )

    def add_switch_metadata(self, dpid: str, metadata: dict) -> Optional[dict]:
        """Try to find a switch and set each given key under its metadata."""
        return self._update_switch(dpid, self._metadata_set_expr(metadata))

    def delete_switch_metadata_key(
        self, dpid: str, key: str
    ) -> Optional[dict]:
        """Try to find a switch and unset ``metadata.<key>``."""
        return self._update_switch(dpid, {"$unset": {f"metadata.{key}": ""}})

    def enable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to set ``enabled`` to True on one embedded interface."""
        return self._update_interface(
            interface_id, {"$set": {"enabled": True}}
        )

    def disable_interface(self, interface_id: str) -> Optional[dict]:
        """Try to set ``enabled`` to False on one embedded interface."""
        return self._update_interface(
            interface_id, {"$set": {"enabled": False}}
        )

    def activate_interface(self, interface_id: str) -> Optional[dict]:
        """Try to set ``active`` to True on one embedded interface."""
        return self._update_interface(interface_id, {"$set": {"active": True}})

    def deactivate_interface(self, interface_id: str) -> Optional[dict]:
        """Try to set ``active`` to False on one embedded interface."""
        return self._update_interface(
            interface_id, {"$set": {"active": False}}
        )

    def enable_interface_lldp(self, interface_id: str) -> Optional[dict]:
        """Try to set ``lldp`` to True on one embedded interface."""
        return self._update_interface(interface_id, {"$set": {"lldp": True}})

    def disable_interface_lldp(self, interface_id: str) -> Optional[dict]:
        """Try to set ``lldp`` to False on one embedded interface."""
        return self._update_interface(interface_id, {"$set": {"lldp": False}})

    def add_interface_metadata(
        self, interface_id: str, metadata: dict
    ) -> Optional[dict]:
        """Try to find an interface and set each given key in its metadata."""
        return self._update_interface(
            interface_id, self._metadata_set_expr(metadata)
        )

    def delete_interface_metadata_key(
        self, interface_id: str, key: str
    ) -> Optional[dict]:
        """Try to find an interface and unset ``metadata.<key>``."""
        return self._update_interface(
            interface_id, {"$unset": {f"metadata.{key}": ""}}
        )

    def _update_interface(
        self, interface_id: str, update_expr: dict
    ) -> Optional[dict]:
        """Try to update one interface embedded in a switch document.

        Every field of every operator in ``update_expr`` (including the
        ``updated_at`` added here) is re-targeted to the matched array
        element by prefixing it with ``interfaces.$.``. Only the
        ``switches`` collection is written.

        Returns:
            The switch document after the update, or whatever
            ``find_one_and_update`` yields when nothing matches.
        """
        self._set_updated_at(update_expr)
        interfaces_expression = {
            operator: {f"interfaces.$.{k}": v for k, v in values.items()}
            for operator, values in update_expr.items()
        }
        return self.db.switches.find_one_and_update(
            {"interfaces.id": interface_id},
            interfaces_expression,
            return_document=ReturnDocument.AFTER,
        )

    def upsert_link(self, link_id: str, link_dict: dict) -> dict:
        """Update or insert a link and tag both endpoint interfaces.

        The link is validated through ``LinkDoc`` and upserted; then, for
        each endpoint, the switch holding that interface gets the
        interface's ``link_id`` and ``link_side`` set.

        Returns:
            The link document after the upsert.
        """
        utc_now = datetime.utcnow()

        endpoint_a = link_dict.get("endpoint_a")
        endpoint_b = link_dict.get("endpoint_b")
        model = LinkDoc(
            **{
                **link_dict,
                "updated_at": utc_now,
                "_id": link_id,
                "endpoints": [endpoint_a, endpoint_b],
            }
        )
        updated = self.db.links.find_one_and_update(
            {"_id": link_id},
            {
                "$set": model.dict(exclude={"inserted_at"}),
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        # NOTE(review): the endpoint values are used verbatim both as LinkDoc
        # endpoints and as the "interfaces.id" match value - confirm callers
        # pass a shape that suits both uses.
        for side, endpoint in (
            ("endpoint_a", endpoint_a),
            ("endpoint_b", endpoint_b),
        ):
            self.db.switches.find_one_and_update(
                {"interfaces.id": endpoint},
                {
                    "$set": {
                        "interfaces.$.link_id": link_id,
                        "interfaces.$.link_side": side,
                        "updated_at": utc_now,
                    }
                },
            )
        return updated

    def _update_link(self, link_id: str, update_expr: dict) -> Optional[dict]:
        """Try to find one link by ``_id`` and apply ``update_expr``.

        ``updated_at`` is added to ``update_expr`` before the update.

        Returns:
            The result of ``find_one_and_update``.
        """
        self._set_updated_at(update_expr)
        # NOTE(review): no return_document is passed here, unlike
        # upsert_link, so the driver default applies - confirm callers do
        # not expect the post-update document.
        return self.db.links.find_one_and_update({"_id": link_id}, update_expr)

    def enable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and set ``enabled`` to True."""
        return self._update_link(link_id, {"$set": {"enabled": True}})

    def disable_link(self, link_id: str) -> Optional[dict]:
        """Try to find one link and set ``enabled`` to False."""
        return self._update_link(link_id, {"$set": {"enabled": False}})

    def add_link_metadata(
        self, link_id: str, metadata: dict
    ) -> Optional[dict]:
        """Try to find a link and set each given key under its metadata."""
        return self._update_link(link_id, self._metadata_set_expr(metadata))

    def delete_link_metadata_key(
        self, link_id: str, key: str
    ) -> Optional[dict]:
        """Try to find a link and unset ``metadata.<key>``."""
        return self._update_link(link_id, {"$unset": {f"metadata.{key}": ""}})

    def bulk_upsert_interface_details(
        self, ids_details: List[Tuple[str, dict]]
    ) -> Optional[dict]:
        """Update or insert interfaces details in one transactional bulk.

        Args:
            ids_details: ``(interface_id, detail_dict)`` pairs; each detail
                is validated through ``InterfaceDetailDoc``.

        Returns:
            None when there is nothing to write, otherwise the result of
            ``bulk_write`` (NOTE(review): that is the driver's result
            object rather than a plain dict, despite the annotation).
        """
        utc_now = datetime.utcnow()
        ops = [
            UpdateOne(
                {"_id": _id},
                {
                    "$set": InterfaceDetailDoc(
                        **{**detail_dict, "updated_at": utc_now, "_id": _id}
                    ).dict(exclude={"inserted_at"}),
                    "$setOnInsert": {"inserted_at": utc_now},
                },
                upsert=True,
            )
            for _id, detail_dict in ids_details
        ]
        if not ops:
            # bulk_write rejects an empty request list, so skip the lock,
            # session and transaction entirely when there is no work.
            return None

        with self.interface_details_lock:
            with self.db_client.start_session() as session:
                with session.start_transaction():
                    return self.db.interface_details.bulk_write(
                        ops, ordered=False, session=session
                    )

    def get_interfaces_details(
        self, interface_ids: List[str]
    ) -> Optional[dict]:
        """Try to get interfaces details given a list of interface ids.

        Returns:
            The result of ``aggregate`` over the documents whose ``_id`` is
            in ``interface_ids`` (NOTE(review): this is the driver's
            iterable result, not a dict, despite the annotation).
        """
        return self.db.interface_details.aggregate(
            [
                {"$match": {"_id": {"$in": interface_ids}}},
            ]
        )
315