Passed
Pull Request — master (#258)
by
unknown
03:22
created

build.controllers   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 95
Duplicated Lines 0 %

Test Coverage

Coverage 92.86%

Importance

Changes 0
Metric Value
eloc 67
dl 0
loc 95
ccs 39
cts 42
cp 0.9286
rs 10
c 0
b 0
f 0
wmc 10

5 Methods

Rating   Name   Duplication   Size   Complexity  
A ELineController.__init__() 0 4 2
A ELineController.upsert_evc() 0 23 2
A ELineController.bootstrap_indexes() 0 10 3
A ELineController.get_circuit() 0 4 1
A ELineController.get_circuits() 0 15 2
1
"""ELineController."""
2
# pylint: disable=unnecessary-lambda,invalid-name
3 1
import os
4 1
from datetime import datetime
5 1
from typing import Dict, Optional
6
7 1
import pymongo
8 1
from pydantic import ValidationError
9 1
from pymongo.collection import ReturnDocument
10 1
from pymongo.errors import AutoReconnect
11 1
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
12
13 1
from kytos.core import log
14 1
from kytos.core.db import Mongo
15 1
from kytos.core.retry import before_sleep, for_all_methods, retries
16 1
from napps.kytos.mef_eline.db.models import EVCBaseDoc
17
18
19 1
@for_all_methods(
20
    retries,
21
    stop=stop_after_attempt(
22
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", "3"))
23
    ),
24
    wait=wait_random(
25
        min=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", "1")),
26
        max=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", "1")),
27
    ),
28
    before_sleep=before_sleep,
29
    retry=retry_if_exception_type((AutoReconnect,)),
30
)
31 1
class ELineController:
32
    """E-Line Controller"""
33
34 1
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable Mongo does not seem to be defined.
Loading history...
35 1
        self.mongo = get_mongo()
36 1
        self.db_client = self.mongo.client
37 1
        self.db = self.db_client[self.mongo.db_name]
38
39 1
    def bootstrap_indexes(self) -> None:
40
        """Bootstrap mef_eline relaeted indexes."""
41 1
        index_tuples = [
42
            ("evcs", [("circuit_scheduler.id", pymongo.ASCENDING)]),
43
            ("evcs", [("archived", pymongo.ASCENDING)]),
44
        ]
45 1
        for collection, keys in index_tuples:
46 1
            if self.mongo.bootstrap_index(collection, keys):
47 1
                log.info(
48
                    f"Created DB index {keys}, collection: {collection}"
49
                )
50
51 1
    def get_circuits(self, archived: Optional[bool] = False) -> Dict:
52
        """Get all circuits from database."""
53 1
        aggregation = []
54 1
        match_filters = {"$match": {}}
55 1
        if archived is not None:
56 1
            match_filters["$match"]["archived"] = archived
57 1
            aggregation.append(match_filters)
58
59 1
        aggregation.extend([
60
                {"$sort": {"_id": 1}},
61
                {"$project": EVCBaseDoc.projection()},
62
            ]
63
        )
64 1
        circuits = self.db.evcs.aggregate(aggregation)
65 1
        return {"circuits": {value["id"]: value for value in circuits}}
66
67 1
    def get_circuit(self, circuit_id: str) -> Optional[Dict]:
68
        """Get a circuit."""
69
        return self.db.evcs.find_one({"_id": circuit_id},
70
                                     EVCBaseDoc.projection())
71
72 1
    def upsert_evc(self, evc: Dict) -> Optional[Dict]:
73
        """Update or insert an EVC"""
74 1
        utc_now = datetime.utcnow()
75 1
        try:
76 1
            model = EVCBaseDoc(
77
                **{
78
                    **evc,
79
                    **{"_id": evc["id"]}
80
                }
81
            )
82
        except ValidationError as err:
83
            raise err
84
85 1
        updated = self.db.evcs.find_one_and_update(
86
            {"_id": evc["id"]},
87
            {
88
                "$set": model.dict(exclude={"inserted_at"}, exclude_none=True),
89
                "$setOnInsert": {"inserted_at": utc_now},
90
            },
91
            return_document=ReturnDocument.AFTER,
92
            upsert=True,
93
        )
94
        return updated
95