Passed
Pull Request — master (#121)
by Vinicius
04:01
created

LivenessController.delete_interfaces()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0185

Importance

Changes 0
Metric Value
cc 2
eloc 8
nop 2
dl 0
loc 10
ccs 5
cts 6
cp 0.8333
crap 2.0185
rs 10
c 0
b 0
f 0
1
"""LivenessController."""
2
3
# pylint: disable=invalid-name
4 1
import os
5 1
from datetime import datetime
6 1
from typing import List, Optional
7
8 1
import pymongo
9 1
from pymongo.errors import ConnectionFailure, ExecutionTimeout
10 1
from pymongo.operations import DeleteOne, UpdateOne
11 1
from tenacity import retry_if_exception_type, stop_after_attempt, wait_random
12
13 1
from kytos.core import log
14 1
from kytos.core.db import Mongo
15 1
from kytos.core.retry import before_sleep, for_all_methods, retries
16
17 1
from ..db.models import LivenessDoc
18
19
20 1
@for_all_methods(
    retries,
    stop=stop_after_attempt(
        int(os.environ.get("MONGO_AUTO_RETRY_STOP_AFTER_ATTEMPT", 3))
    ),
    wait=wait_random(
        # Parsed as float: int() would truncate the 0.1 default to 0 and
        # raise ValueError on fractional env values such as "0.5".
        min=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MIN", 0.1)),
        max=float(os.environ.get("MONGO_AUTO_RETRY_WAIT_RANDOM_MAX", 1)),
    ),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((ConnectionFailure, ExecutionTimeout)),
)
class LivenessController:
    """Read and write liveness interface documents in MongoDB.

    All operations target the ``liveness`` collection of the database
    named by ``mongo.db_name``.

    The class decorator hands a tenacity policy (stop after N attempts,
    random wait, retry on ``ConnectionFailure``/``ExecutionTimeout``) to
    ``for_all_methods``; presumably that wraps every method with it --
    confirm against ``kytos.core.retry``. The policy is tunable through the
    ``MONGO_AUTO_RETRY_*`` environment variables read at import time.
    """

    # ``Mongo`` is imported from ``kytos.core.db``; the lambda only looks the
    # name up when it is called, not when the class body is evaluated.
    def __init__(self, get_mongo=lambda: Mongo()) -> None:
        """Create the controller and bind it to a database.

        Args:
            get_mongo: Zero-argument callable returning an object that
                exposes ``client``, ``db_name`` and ``bootstrap_index``.
        """
        self.mongo = get_mongo()
        self.db_client = self.mongo.client
        self.db = self.db_client[self.mongo.db_name]

    def bootstrap_indexes(self) -> None:
        """Bootstrap all liveness related indexes.

        Logs one info line for every index for which
        ``mongo.bootstrap_index`` returns a truthy value.
        """
        index_tuples = [
            ("liveness", [("enabled", pymongo.ASCENDING)]),
        ]
        for collection, keys in index_tuples:
            if self.mongo.bootstrap_index(collection, keys):
                log.info(f"Created DB index {keys}, collection: {collection}")

    def get_enabled_interfaces(self) -> List[dict]:
        """Get enabled liveness interfaces from DB.

        Returns:
            The documents whose ``enabled`` field is ``True``, sorted by
            ``_id`` ascending, with the ``_id`` field projected out.
        """
        cursor = self.db.liveness.aggregate(
            [
                {"$match": {"enabled": True}},
                {"$sort": {"_id": 1}},
                {"$project": {"_id": 0}},
            ]
        )
        # Materialise the cursor so the result really is the annotated
        # ``List[dict]`` rather than a lazily consumed cursor object.
        return list(cursor)

    def upsert_interfaces(
        self,
        interface_ids: List[str],
        interface_dicts: List[dict],
        upsert=True,
    ) -> int:
        """Update or insert liveness interfaces.

        Args:
            interface_ids: Document ``_id`` values, one per interface.
            interface_dicts: Field values for each interface, paired
                positionally with ``interface_ids``; pairing stops at the
                shorter of the two lists.
            upsert: Forwarded to every ``UpdateOne``; when ``False`` only
                already existing documents are updated.

        Returns:
            ``0`` when there is nothing to write; otherwise the bulk
            result's ``upserted_count`` if it is non-zero, else its
            ``modified_count``.
        """
        utc_now = datetime.utcnow()
        ops = []
        for interface_id, interface_dict in zip(
            interface_ids, interface_dicts
        ):
            # Build the document through the model first; the id and
            # timestamp supplied here override same-named keys in the dict.
            model = LivenessDoc(
                **{
                    **interface_dict,
                    **{"_id": interface_id, "updated_at": utc_now},
                }
            )
            # ``inserted_at`` is excluded from ``$set`` because it is only
            # written through ``$setOnInsert`` below.
            payload = model.model_dump(
                exclude={"inserted_at"}, exclude_none=True
            )
            ops.append(
                UpdateOne(
                    {"_id": interface_id},
                    {
                        "$set": payload,
                        "$setOnInsert": {"inserted_at": utc_now},
                    },
                    upsert=upsert,
                )
            )
        # pymongo rejects an empty bulk write, so short-circuit exactly as
        # delete_interfaces() does for an empty id list.
        if not ops:
            return 0
        response = self.db.liveness.bulk_write(ops)
        return response.upserted_count or response.modified_count

    def enable_interfaces(self, interface_ids: List[str]) -> int:
        """Enable liveness interfaces.

        Sets ``enabled`` to ``True`` for every id, inserting documents that
        do not exist yet. Returns the count from ``upsert_interfaces``.
        """
        return self.upsert_interfaces(
            interface_ids,
            [{"enabled": True} for _ in interface_ids],
            upsert=True,
        )

    def disable_interfaces(self, interface_ids: List[str]) -> int:
        """Disable liveness interfaces.

        Sets ``enabled`` to ``False`` for every id without inserting
        missing documents. Returns the count from ``upsert_interfaces``.
        """
        return self.upsert_interfaces(
            interface_ids,
            [{"enabled": False} for _ in interface_ids],
            upsert=False,
        )

    def delete_interface(self, interface_id: str) -> Optional[dict]:
        """Hard delete one interface.

        Returns:
            Whatever ``find_one_and_delete`` returns for the ``_id`` filter
            (per pymongo: the removed document, or ``None`` if none
            matched).
        """
        return self.db.liveness.find_one_and_delete({"_id": interface_id})

    def delete_interfaces(self, interface_ids: List[str]) -> int:
        """Hard delete liveness interfaces.

        Returns:
            ``0`` for an empty id list (no database call is made);
            otherwise the bulk result's ``deleted_count``.
        """
        if not interface_ids:
            return 0
        ops = [
            DeleteOne({"_id": interface_id})
            for interface_id in interface_ids
        ]
        response = self.db.liveness.bulk_write(ops)
        return response.deleted_count
124