Test Failed
Pull Request — master (#56)
by Vinicius
07:13
created

LivenessController.get_enabled_interfaces()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 1
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
"""LivenessController."""
2
3
# pylint: disable=invalid-name
4
import os
5
from datetime import datetime
6
from typing import List
7
8
import pymongo
9
from pymongo.errors import AutoReconnect
10
from pymongo.operations import UpdateOne
11
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
12
13
from kytos.core import log
14
from kytos.core.db import Mongo
15
from kytos.core.retry import before_sleep, for_all_methods, retries
16
17
from ..db.models import LivenessDoc
18
19
20
@for_all_methods(
    retries,
    stop=stop_after_attempt(
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
    ),
    wait=wait_random(
        # Parsed with float(): int() truncated the 0.1 default to 0 and
        # raised ValueError for fractional settings such as "0.5".
        min=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
        max=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
    ),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((AutoReconnect,)),
)
class LivenessController:
    """Read and write liveness interface documents in MongoDB.

    All operations target the ``liveness`` collection of the database
    named by ``Mongo().db_name``. The class decorator is configured to retry
    on ``pymongo.errors.AutoReconnect``; the attempt limit and the random
    wait bounds are read from the ``MONGO_AUTO_RETRY_*`` environment
    variables once, when the class is defined.
    """

    # The lambda (rather than passing ``Mongo`` directly) defers the lookup of
    # the ``Mongo`` name until a controller is actually instantiated.
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
        """Create the controller and bind its database handles.

        Args:
            get_mongo: Zero-argument callable returning an object that
                exposes ``client``, ``db_name`` and ``bootstrap_index``.
        """
        self.mongo = get_mongo()
        self.db_client = self.mongo.client
        self.db = self.db_client[self.mongo.db_name]

    def bootstrap_indexes(self) -> None:
        """Bootstrap all liveness related indexes."""
        index_tuples = [
            ("liveness", [("enabled", pymongo.ASCENDING)]),
        ]
        for collection, keys in index_tuples:
            # Log only when bootstrap_index returns a truthy value.
            if self.mongo.bootstrap_index(collection, keys):
                log.info(
                    f"Created DB index {keys}, collection: {collection}"
                )

    def get_enabled_interfaces(self) -> List[dict]:
        """Get enabled liveness interfaces from DB.

        Returns:
            The documents whose ``enabled`` field is ``True``, sorted by
            ``_id`` ascending, with the ``_id`` field projected out.
        """
        pipeline = [
            {"$match": {"enabled": True}},
            {"$sort": {"_id": 1}},
            {"$project": {"_id": 0}},
        ]
        # aggregate() returns a lazy cursor; materialize it so the value
        # matches the declared List[dict] and is fully fetched before this
        # method returns.
        return list(self.db.liveness.aggregate(pipeline))

    def upsert_interfaces(
        self, interface_ids: List[str], interface_dicts: List[dict], upsert=True
    ) -> int:
        """Update or insert liveness interfaces.

        Args:
            interface_ids: Interface ids, used as each document's ``_id``.
            interface_dicts: Field values for each interface, paired with
                ``interface_ids`` by position. ``zip`` stops at the shorter
                of the two lists.
            upsert: Whether a missing document is inserted (``True``) or
                left absent (``False``).

        Returns:
            The upserted count when it is non-zero, otherwise the modified
            count. ``0`` when there is nothing to write.
        """
        utc_now = datetime.utcnow()
        ops = []
        for interface_id, interface_dict in zip(interface_ids, interface_dicts):
            model = LivenessDoc(
                **{
                    **interface_dict,
                    **{"_id": interface_id, "updated_at": utc_now},
                }
            )
            # inserted_at is left out of $set and written only through
            # $setOnInsert below.
            payload = model.dict(exclude={"inserted_at"}, exclude_none=True)
            ops.append(
                UpdateOne(
                    {"_id": interface_id},
                    {
                        "$set": payload,
                        "$setOnInsert": {"inserted_at": utc_now},
                    },
                    upsert=upsert,
                )
            )
        if not ops:
            # bulk_write rejects an empty request list, so an empty input is
            # reported as zero documents written instead of an error.
            return 0
        response = self.db.liveness.bulk_write(ops)
        return response.upserted_count or response.modified_count

    def enable_interfaces(self, interface_ids: List[str]) -> int:
        """Enable liveness interfaces.

        Sets ``enabled`` to ``True`` for every id, inserting documents that
        do not exist yet. Returns the count from ``upsert_interfaces``.
        """
        return self.upsert_interfaces(
            interface_ids, [{"enabled": True} for _ in interface_ids], upsert=True
        )

    def disable_interfaces(self, interface_ids: List[str]) -> int:
        """Disable liveness interfaces.

        Sets ``enabled`` to ``False`` for every id without inserting missing
        documents. Returns the count from ``upsert_interfaces``.
        """
        return self.upsert_interfaces(
            interface_ids, [{"enabled": False} for _ in interface_ids], upsert=False
        )
100