build.controllers   A
last analyzed

Complexity

Total Complexity 21

Size/Duplication

Total Lines 174
Duplicated Lines 0 %

Test Coverage

Coverage 85.53%

Importance

Changes 0
Metric Value
eloc 126
dl 0
loc 174
ccs 65
cts 76
cp 0.8553
rs 10
c 0
b 0
f 0
wmc 21

8 Methods

Rating   Name   Duplication   Size   Complexity  
A ELineController.__init__() 0 4 2
A ELineController.bootstrap_indexes() 0 10 3
A ELineController.get_circuit() 0 4 1
B ELineController.get_circuits() 0 26 6
A ELineController.update_evc() 0 19 1
A ELineController.upsert_evc() 0 20 1
A ELineController.update_evcs() 0 26 3
A ELineController.update_evcs_metadata() 0 23 4
1
"""ELineController."""
2
# pylint: disable=unnecessary-lambda,invalid-name
3 1
import os
4 1
from datetime import datetime
5 1
from typing import Dict, Optional
6
7 1
import pymongo
8 1
from pymongo.collection import ReturnDocument
9 1
from pymongo.errors import ConnectionFailure, ExecutionTimeout
10 1
from pymongo.operations import UpdateOne
11 1
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
12
13 1
from kytos.core import log
14 1
from kytos.core.db import Mongo
15 1
from kytos.core.retry import before_sleep, for_all_methods, retries
16 1
from napps.kytos.mef_eline.db.models import EVCBaseDoc, EVCUpdateDoc
17
18
19 1
# The keyword arguments below are handed to kytos' ``for_all_methods`` helper
# together with ``retries``; presumably every method of the class is wrapped
# so that ConnectionFailure / ExecutionTimeout are retried -- confirm against
# kytos.core.retry. Attempt count and random wait bounds come from the
# environment (defaults: 3 attempts, wait between 1 and 1).
@for_all_methods(
    retries,
    stop=stop_after_attempt(
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", "3"))
    ),
    wait=wait_random(
        min=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", "1")),
        max=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", "1")),
    ),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((ConnectionFailure, ExecutionTimeout)),
)
class ELineController:
    """E-Line Controller.

    Reads and writes EVC documents in the ``evcs`` collection of the
    database selected by the injected Mongo wrapper.
    """

    # NOTE: do not add helper callables (methods, staticmethods) to this class
    # without checking how ``for_all_methods`` treats them; the decorator
    # above may wrap every callable attribute.

    def __init__(self, get_mongo=lambda: Mongo()) -> None:
        """Create the controller.

        Args:
            get_mongo: Zero-argument callable returning an object that
                exposes ``client`` and ``db_name``. The default is a lambda
                (rather than ``Mongo`` itself) so the ``Mongo`` name is
                resolved when the controller is instantiated.
        """
        self.mongo = get_mongo()
        self.db_client = self.mongo.client
        self.db = self.db_client[self.mongo.db_name]

    def bootstrap_indexes(self) -> None:
        """Bootstrap mef_eline related indexes.

        Requests an ascending index on ``circuit_scheduler.id`` and on
        ``archived`` for the ``evcs`` collection, logging each one for which
        ``bootstrap_index`` returns a truthy value.
        """
        index_tuples = [
            ("evcs", [("circuit_scheduler.id", pymongo.ASCENDING)]),
            ("evcs", [("archived", pymongo.ASCENDING)]),
        ]
        for collection, keys in index_tuples:
            if self.mongo.bootstrap_index(collection, keys):
                log.info(
                    f"Created DB index {keys}, collection: {collection}"
                )

    def get_circuits(self, archived: Optional[bool] = False,
                     metadata: Optional[dict] = None) -> Dict:
        """Get all circuits from database.

        Args:
            archived: Filter on the ``archived`` field. ``None`` disables the
                filter. A real ``bool`` is used as is. Any other value is
                looked up among the strings ``"null"``, ``"true"`` and
                ``"false"`` and falls back to ``False`` when unknown.
            metadata: Optional mapping of extra filters. Only keys starting
                with ``"metadata."`` are used; a value that parses as an
                integer is matched as ``int``, otherwise
                ``"null"``/``"true"``/``"false"`` (case-insensitive) are
                converted and anything else is matched verbatim.

        Returns:
            ``{"circuits": {<id>: <document>, ...}}`` built from the
            aggregation result, sorted by ``_id`` ascending.
        """
        options = {"null": None, "true": True, "false": False}
        filters = {}

        if archived is not None:
            # A genuine boolean must not go through the string table: every
            # key there is a str, so ``True`` would silently become ``False``.
            if not isinstance(archived, bool):
                archived = options.get(archived, False)
            filters["archived"] = archived

        if metadata:
            for key in metadata:
                if not key.startswith("metadata."):
                    continue
                raw = metadata[key]
                try:
                    filters[key] = int(raw)
                except ValueError:
                    # Not an integer: translate the well-known literals and
                    # keep every other string untouched.
                    filters[key] = options.get(raw.lower(), raw)

        aggregation = [
            {"$match": filters},
            {"$sort": {"_id": 1}},
            {"$project": EVCBaseDoc.projection()},
        ]
        circuits = self.db.evcs.aggregate(aggregation)
        return {"circuits": {value["id"]: value for value in circuits}}

    def get_circuit(self, circuit_id: str) -> Optional[Dict]:
        """Get a circuit.

        Returns the result of ``find_one`` on ``_id == circuit_id`` using the
        ``EVCBaseDoc`` projection.
        """
        return self.db.evcs.find_one({"_id": circuit_id},
                                     EVCBaseDoc.projection())

    def upsert_evc(self, evc: Dict) -> Optional[Dict]:
        """Update or insert an EVC.

        The payload is passed through ``EVCBaseDoc`` and dumped without
        ``inserted_at`` and without ``None`` fields; ``inserted_at`` is only
        written when the document is inserted.

        Args:
            evc: EVC attributes; must contain ``"id"``, which is used as the
                document ``_id``.

        Returns:
            The document as returned by ``find_one_and_update`` with
            ``ReturnDocument.AFTER``.
        """
        utc_now = datetime.utcnow()
        model = EVCBaseDoc(
            **{**evc, "_id": evc["id"]}
        ).model_dump(exclude={"inserted_at"}, exclude_none=True)
        # ``exclude_none`` drops a ``None`` queue_id; put it back so the
        # ``$set`` always writes the field, including as null.
        model.setdefault("queue_id", None)
        updated = self.db.evcs.find_one_and_update(
            {"_id": evc["id"]},
            {
                "$set": model,
                "$setOnInsert": {"inserted_at": utc_now},
            },
            return_document=ReturnDocument.AFTER,
            upsert=True,
        )
        return updated

    def update_evc(self, evc: Dict) -> Optional[Dict]:
        """Update an EVC.

        This is needed to correctly set None values to fields: the raw
        ``evc`` mapping is written with ``$set`` (no ``exclude_none`` dump),
        and no upsert is requested.

        Args:
            evc: Fields to set; must contain ``"id"``.

        Returns:
            The value returned by ``find_one_and_update`` with
            ``ReturnDocument.AFTER``.
        """
        # Check for errors in fields only; the instance itself is discarded.
        EVCUpdateDoc(**{**evc, "_id": evc["id"]})
        updated = self.db.evcs.find_one_and_update(
            {"_id": evc["id"]},
            {
                "$set": evc,
            },
            return_document=ReturnDocument.AFTER,
        )
        return updated

    def update_evcs(self, evcs: list[dict]) -> int:
        """Update EVCs and return the number of modified documents.

        Each dict in ``evcs`` gets its ``"updated_at"`` key set in place to
        the current UTC time before being passed through ``EVCBaseDoc``.
        An empty list returns 0 without touching the database.
        """
        if not evcs:
            return 0

        ops = []
        utc_now = datetime.utcnow()

        for evc in evcs:
            evc["updated_at"] = utc_now
            model = EVCBaseDoc(
                **{**evc, "_id": evc["id"]}
            ).model_dump(exclude_none=True)
            ops.append(
                UpdateOne(
                    {"_id": evc["id"]},
                    {
                        "$set": model,
                        "$setOnInsert": {"inserted_at": utc_now}
                    },
                )
            )
        return self.db.evcs.bulk_write(ops).modified_count

    def update_evcs_metadata(
        self, circuit_ids: list, metadata: dict, action: str
    ) -> int:
        """Bulk update EVCs metadata.

        Args:
            circuit_ids: ``_id`` values of the EVCs to update.
            metadata: Metadata keys/values; every key is prefixed with
                ``"metadata."`` before being sent.
            action: ``"add"`` to ``$set`` the keys or ``"del"`` to ``$unset``
                them.

        Returns:
            ``modified_count`` of the bulk write.

        Raises:
            ValueError: If ``action`` is neither ``"add"`` nor ``"del"``.
        """
        # Validate first: previously an unknown action left ``payload``
        # unbound and failed later with an UnboundLocalError.
        if action == "add":
            operator = "$set"
        elif action == "del":
            operator = "$unset"
        else:
            raise ValueError(
                f"Invalid action {action!r}: expected 'add' or 'del'"
            )

        utc_now = datetime.utcnow()
        payload = {
            operator: {f"metadata.{k}": v for k, v in metadata.items()}
        }
        ops = [
            UpdateOne(
                {"_id": _id},
                {
                    **payload,
                    "$setOnInsert": {"inserted_at": utc_now}
                },
                upsert=False,
            )
            for _id in circuit_ids
        ]
        return self.db.evcs.bulk_write(ops).modified_count
174