Passed
Pull Request — master (#224)
by
unknown
03:21
created

build.controllers.ELineController.upsert_evc()   A

Complexity

Conditions 2

Size

Total Lines 21
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2.0116

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 21
ccs 6
cts 7
cp 0.8571
rs 9.6
c 0
b 0
f 0
cc 2
nop 2
crap 2.0116
1
"""ELineController."""
2
# pylint: disable=unnecessary-lambda,invalid-name
3 1
import os
4 1
from datetime import datetime
5 1
from typing import Dict, Optional
6
7 1
import pymongo
8 1
from pymongo.collection import ReturnDocument
9 1
from pymongo.errors import AutoReconnect
10 1
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
11
12 1
from kytos.core import log
13 1
from kytos.core.db import Mongo
14 1
from kytos.core.retry import before_sleep, for_all_methods, retries
15 1
from napps.kytos.mef_eline.db.models import EVCBaseDoc
16
17
18 1
@for_all_methods(
19
    retries,
20
    stop=stop_after_attempt(
21
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", "3"))
22
    ),
23
    wait=wait_random(
24
        min=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", "1")),
25
        max=int(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", "1")),
26
    ),
27
    before_sleep=before_sleep,
28
    retry=retry_if_exception_type((AutoReconnect,)),
29
)
30 1
class ELineController:
31
    """E-Line Controller"""
32
33 1
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable Mongo does not seem to be defined.
Loading history...
34 1
        self.mongo = get_mongo()
35 1
        self.db_client = self.mongo.client
36 1
        self.db = self.db_client[self.mongo.db_name]
37
38 1
    def bootstrap_indexes(self) -> None:
39
        """Bootstrap mef_eline relaeted indexes."""
40 1
        index_tuples = [
41
            ("evcs", [("circuit_scheduler.id", pymongo.ASCENDING)]),
42
            ("evcs", [("archived", pymongo.ASCENDING)]),
43
        ]
44 1
        for collection, keys in index_tuples:
45 1
            if self.mongo.bootstrap_index(collection, keys):
46 1
                log.info(
47
                    f"Created DB index {keys}, collection: {collection}"
48
                )
49
50 1
    def get_circuits(self, archived: Optional[bool] = False) -> Dict:
51
        """Get all circuits from database."""
52 1
        aggregation = []
53 1
        match_filters = {"$match": {}}
54 1
        if archived is not None:
55 1
            match_filters["$match"]["archived"] = archived
56 1
            aggregation.append(match_filters)
57
58 1
        aggregation.extend([
59
                {"$sort": {"_id": 1}},
60
                {"$project": EVCBaseDoc.projection()},
61
            ]
62
        )
63 1
        circuits = self.db.evcs.aggregate(aggregation)
64 1
        return {"circuits": {value["id"]: value for value in circuits}}
65
66 1
    def get_circuit(self, circuit_id: str) -> Optional[Dict]:
67
        """Get a circuit."""
68
        return self.db.evcs.find_one({"_id": circuit_id},
69
                                     EVCBaseDoc.projection())
70
71 1
    def upsert_evc(self, evc: Dict) -> Optional[Dict]:
72
        """Update or insert an EVC"""
73 1
        utc_now = datetime.utcnow()
74 1
        model = EVCBaseDoc(
75
            **{
76
                **evc,
77
                **{"_id": evc["id"], "updated_at": utc_now}
78
            }
79
        )
80 1
        updated = self.db.evcs.find_one_and_update(
81
            {"_id": evc["id"]},
82
            {
83
                "$set": model.dict(exclude={"inserted_at"}, exclude_none=True),
84
                "$setOnInsert": {"inserted_at": utc_now},
85
            },
86
            return_document=ReturnDocument.AFTER,
87
            upsert=True,
88
        )
89 1
        if updated is not None:
90 1
            return utc_now
91
        return updated
92
93 1
    def get_circuits_by_update_date(self, dt: datetime) -> Optional[Dict]:
94
        """Filter for EVCs where updated_at is lte to a given datetime"""
95 1
        return self.db.evcs.find(
96
            {
97
                "updated_at": {"$lte": dt}
98
            }
99
        )
100