Passed
Push — master ( d4a335...2700e4 )
by Vinicius
13:56 queued 10:57
created

build.db.models   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 197
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 11
eloc 151
dl 0
loc 197
ccs 108
cts 108
cp 1
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A TAGDoc.validate_value() 0 11 5
B EVCBaseDoc.projection() 0 54 1
A DocumentBaseModel.model_dump() 0 8 5
1
"""Models for Mongo DB"""
2
# pylint: disable=unused-argument,invalid-name
3
# pylint: disable=no-self-argument,no-name-in-module
4
5 1
from datetime import datetime
6 1
from typing import Dict, Literal, Optional, Union
7
8 1
from pydantic import BaseModel, Field, field_validator
9
10
11 1
class DocumentBaseModel(BaseModel):
    """Base model for Mongo documents.

    Declares the ``id`` field (aliased to ``_id``) together with the
    ``inserted_at`` and ``updated_at`` timestamps, and overrides
    ``model_dump`` so the dumped dictionary can also carry an ``_id`` key.
    """

    # NOTE(review): annotated as ``str`` but defaults to ``None`` -- confirm
    # whether ``Optional[str]`` is the intended type.
    id: str = Field(None, alias="_id")
    inserted_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def model_dump(self, **kwargs) -> Dict:
        """Return a dictionary representation of the model.

        Args:
            **kwargs: forwarded unchanged to ``BaseModel.model_dump``.

        Returns:
            The dumped values. When the dump contains a truthy ``id``, the
            same value is mirrored under ``_id``; ``_id`` is dropped again
            when the caller listed it in ``exclude``.
        """
        values = super().model_dump(**kwargs)
        if values.get("id"):
            values["_id"] = values["id"]
        # ``exclude`` may be missing or explicitly ``None``, and ``_id`` may
        # never have been added (unset id, or ``id`` excluded as well), so
        # the removal has to tolerate both instead of raising.
        excluded = kwargs.get("exclude") or ()
        if "_id" in excluded:
            values.pop("_id", None)
        return values
26
27
28 1
class CircuitScheduleDoc(BaseModel):
29
    """EVC circuit schedule model.

    ``id`` and ``action`` are required strings; ``date``, ``frequency`` and
    ``interval`` are optional and default to ``None``.
    """
30
31 1
    id: str
32 1
    date: Optional[str] = None
33 1
    frequency: Optional[str] = None
34 1
    interval: Optional[int] = None
35 1
    action: str
36
37
38 1
class TAGDoc(BaseModel):
    """TAG model.

    ``value`` may be an integer, a string or a list of integer lists; which
    strings are acceptable is decided by ``validate_value``.
    """
    tag_type: str
    value: Union[int, str, list[list[int]]]
    mask_list: Optional[list[Union[str, int]]] = None

    @field_validator('value')
    def validate_value(cls, value):
        """Check ``value``, restricting strings to 'any' and 'untagged'.

        Lists and integers are returned untouched.

        Raises:
            ValueError: if ``value`` is neither a list, an integer nor one
                of the allowed strings.
        """
        allowed_strings = ("any", "untagged")
        if isinstance(value, (list, int)):
            return value
        if not isinstance(value, str) or value not in allowed_strings:
            raise ValueError(
                f"{value} is not allowed as {type(value)}. "
                "Allowed strings are 'any' and 'untagged'."
            )
        return value
55
56
57 1
class UNIDoc(BaseModel):
58
    """UNI model.

    ``interface_id`` is a required string; ``tag`` is an optional ``TAGDoc``
    that defaults to ``None``.
    """
59 1
    tag: Optional[TAGDoc] = None
60 1
    interface_id: str
61
62
63 1
class LinkConstraints(BaseModel):
64
    """Link constraints.

    Every field is optional and defaults to ``None``.
    """
65 1
    bandwidth: Optional[float] = None
66 1
    ownership: Optional[str] = None
67 1
    reliability: Optional[float] = None
68 1
    utilization: Optional[float] = None
69 1
    delay: Optional[float] = None
70 1
    priority: Optional[int] = None
71 1
    not_ownership: Optional[list[str]] = None
72
73
74 1
class PathConstraints(BaseModel):
75
    """Pathfinder constraints.

    Every field is optional and defaults to ``None``. ``spf_attribute`` is
    limited to "hop", "delay" or "priority"; ``mandatory_metrics`` and
    ``flexible_metrics`` are both ``LinkConstraints`` instances.
    """
76 1
    spf_attribute: Optional[Literal["hop", "delay", "priority"]] = None
77 1
    spf_max_path_cost: Optional[float] = None
78 1
    mandatory_metrics: Optional[LinkConstraints] = None
79 1
    flexible_metrics: Optional[LinkConstraints] = None
80 1
    minimum_flexible_hits: Optional[int] = None
81 1
    undesired_links: Optional[list[str]] = None
82
83
84 1
class EVCUpdateDoc(DocumentBaseModel):
85
    """Base model when updating an EVC document.

    Every field declared on this class is optional and defaults to ``None``.
    """
86 1
    uni_a: Optional[UNIDoc] = None
87 1
    uni_z: Optional[UNIDoc] = None
88 1
    name: Optional[str] = None
89 1
    request_time: Optional[datetime] = None
90 1
    start_date: Optional[datetime] = None
91 1
    end_date: Optional[datetime] = None
92 1
    queue_id: Optional[int] = None
93 1
    flow_removed_at: Optional[datetime] = None
94 1
    execution_rounds: Optional[int] = None
95 1
    bandwidth: Optional[int] = None
96 1
    primary_path: Optional[list] = None
97 1
    backup_path: Optional[list] = None
98 1
    primary_links: Optional[list] = None
99 1
    backup_links: Optional[list] = None
100 1
    dynamic_backup_path: Optional[bool] = None
101 1
    primary_constraints: Optional[PathConstraints] = None
102 1
    secondary_constraints: Optional[PathConstraints] = None
103 1
    owner: Optional[str] = None
104 1
    sb_priority: Optional[int] = None
105 1
    service_level: Optional[int] = None
106 1
    circuit_scheduler: Optional[list[CircuitScheduleDoc]] = None
107 1
    metadata: Optional[dict] = None
108 1
    enabled: Optional[bool] = None
109
110
111 1
class EVCBaseDoc(DocumentBaseModel):
    """Base model for EVC documents.

    ``uni_a``, ``uni_z``, ``name``, ``dynamic_backup_path``,
    ``creation_time``, ``circuit_scheduler``, ``active`` and ``enabled`` are
    required; the remaining fields carry the defaults declared below.
    """

    uni_a: UNIDoc
    uni_z: UNIDoc
    name: str
    request_time: Optional[datetime] = None
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    queue_id: Optional[int] = None
    flow_removed_at: Optional[datetime] = None
    execution_rounds: int = 0
    bandwidth: int = 0
    primary_path: Optional[list] = None
    backup_path: Optional[list] = None
    current_path: Optional[list] = None
    failover_path: Optional[list] = None
    primary_links: Optional[list] = None
    backup_links: Optional[list] = None
    dynamic_backup_path: bool
    primary_constraints: Optional[PathConstraints] = None
    secondary_constraints: Optional[PathConstraints] = None
    creation_time: datetime
    owner: Optional[str] = None
    sb_priority: Optional[int] = None
    service_level: int = 0
    circuit_scheduler: list[CircuitScheduleDoc]
    archived: bool = False
    metadata: Dict = {}
    active: bool
    enabled: bool

    @staticmethod
    def projection() -> Dict:
        """Build the base projection of the EVCBaseDoc model.

        Returns:
            A new dictionary on every call. ``_id`` maps to 0, plain fields
            map to 1, and the remaining fields map to ``$ifNull`` and/or
            ``$dateToString`` expressions.
        """
        time_fmt = "%Y-%m-%dT%H:%M:%S"

        def if_null(*candidates):
            """Wrap the given operands in an ``$ifNull`` expression."""
            return {"$ifNull": list(candidates)}

        def date_text(date_expr):
            """Wrap a date expression in ``$dateToString`` with time_fmt."""
            return {"$dateToString": {"format": time_fmt, "date": date_expr}}

        return {
            "_id": 0,
            "id": 1,
            "uni_a": 1,
            "uni_z": 1,
            "name": 1,
            "bandwidth": 1,
            "primary_path": 1,
            "backup_path": 1,
            "current_path": 1,
            "failover_path": 1,
            "dynamic_backup_path": 1,
            "sb_priority": if_null("$sb_priority", "$priority", None),
            "service_level": 1,
            "circuit_scheduler": 1,
            "archived": 1,
            "metadata": 1,
            "active": 1,
            "enabled": 1,
            "execution_rounds": if_null("$execution_rounds", 0),
            "owner": if_null("$owner", None),
            "queue_id": if_null("$queue_id", None),
            "primary_constraints": if_null("$primary_constraints", {}),
            "secondary_constraints": if_null("$secondary_constraints", {}),
            "primary_links": if_null("$primary_links", []),
            "backup_links": if_null("$backup_links", []),
            "start_date": date_text("$start_date"),
            "creation_time": date_text("$creation_time"),
            # request_time uses inserted_at as its second $ifNull operand.
            "request_time": date_text(
                if_null("$request_time", "$inserted_at")
            ),
            "end_date": date_text(if_null("$end_date", None)),
            "flow_removed_at": date_text(if_null("$flow_removed_at", None)),
            "updated_at": date_text("$updated_at"),
        }
199