Test Failed
Pull Request — master (#348)
by
unknown
03:43
created

build.db.models.PathConstraints.__init__()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 3
ccs 1
cts 1
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Models for Mongo DB"""
2
# pylint: disable=unused-argument,invalid-name
3
# pylint: disable=no-self-argument,no-name-in-module
4
5 1
from datetime import datetime
6 1
from typing import Dict, List, Literal, Optional, Union, Any
7
8 1
from pydantic import BaseModel, Field, validator
9
10
11 1
class DocumentBaseModel(BaseModel):
    """Base model for Mongo documents.

    Exposes the document identifier as ``id`` on the model while also
    accepting/emitting it under the Mongo-style ``_id`` key.
    """

    # Identifier; populated from the "_id" alias, defaults to None.
    id: str = Field(None, alias="_id")
    # Timestamps are optional on the model.
    inserted_at: Optional[datetime]
    updated_at: Optional[datetime]

    def dict(self, **kwargs) -> Dict:
        """Return a dictionary representation of the model.

        Keyword arguments are forwarded unchanged to ``BaseModel.dict``.
        When the result carries a truthy ``id``, it is mirrored under
        ``_id``. If the caller's ``exclude`` contains ``"_id"``, that key is
        removed from the result again.

        Returns:
            The exported values, possibly with an added ``_id`` entry.
        """
        values = super().dict(**kwargs)
        if values.get("id"):
            values["_id"] = values["id"]
        # ``exclude`` may be absent or explicitly None; treat both as empty.
        excluded = kwargs.get("exclude") or ()
        if "_id" in excluded:
            # pop() instead of del: "_id" is missing when ``id`` was unset,
            # and deleting a missing key would raise KeyError.
            values.pop("_id", None)
        return values
26
27
28 1
class CircuitScheduleDoc(BaseModel):
    """Schema of a single EVC circuit schedule entry."""

    # Required identifier of the schedule entry.
    id: str
    # Optional scheduling attributes; each may be None.
    date: Optional[str]
    frequency: Optional[str]
    interval: Optional[int]
    # Required action name.
    action: str
36
37
38 1
class TAGDoc(BaseModel):
    """TAG schema: a numeric tag type plus an integer or keyword value."""

    tag_type: int
    value: Union[int, str]

    @validator('value')
    def validate_value(cls, value):
        """Accept any integer, or one of the strings 'any' / 'untagged'.

        Raises:
            ValueError: for every other value.
        """
        is_keyword = isinstance(value, str) and value in ("any", "untagged")
        if isinstance(value, int) or is_keyword:
            return value
        raise ValueError(
            f"{value} is not allowed as {type(value)}. "
            "Allowed strings are 'any' and 'untagged'."
        )
52
53
54 1
class UNIDoc(BaseModel):
    """UNI schema: an interface identifier with an optional TAG."""

    # Optional tag; validated as a TAGDoc when provided.
    tag: Optional[TAGDoc]
    # Required interface identifier.
    interface_id: str
58
59
60 1
class LinkConstraints(BaseModel):
    """Per-link constraint values; every attribute is optional."""

    # Floating-point metrics.
    bandwidth: Optional[float]
    # Ownership is carried as a string.
    ownership: Optional[str]
    reliability: Optional[float]
    utilization: Optional[float]
    delay: Optional[float]
    # Priority is an integer.
    priority: Optional[int]
68
69
70 1
class PathConstraints(BaseModel):
    """Constraints handed to the pathfinder."""

    # One of "hop", "delay" or "priority"; defaults to "hop".
    spf_attribute: Literal["hop", "delay", "priority"] = "hop"
    spf_max_path_cost: Optional[float]
    # Two optional LinkConstraints groups.
    mandatory_metrics: Optional[LinkConstraints]
    flexible_metrics: Optional[LinkConstraints]
    minimum_flexible_hits: Optional[int]
    # Optional list of link identifiers given as strings.
    undesired_links: Optional[List[str]]

    def __init__(__pydantic_self__, **data: Any) -> None:
        """Build the model, then record ``spf_attribute`` as a set field.

        The field name is always added to ``__fields_set__``, even when the
        caller did not pass it and the default was used.
        NOTE(review): presumably so the default survives exports that drop
        unset fields - confirm against callers.
        """
        super().__init__(**data)
        fields_set = __pydantic_self__.__fields_set__
        fields_set.add("spf_attribute")
82
83 1
class EVCBaseDoc(DocumentBaseModel):
    """Base model for EVC documents"""

    # Both endpoints are required.
    uni_a: UNIDoc
    uni_z: UNIDoc
    name: str
    request_time: Optional[datetime]
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    queue_id: Optional[int]
    flow_removed_at: Optional[datetime]
    execution_rounds: int = 0
    bandwidth: int = 0
    primary_path: Optional[List]
    backup_path: Optional[List]
    current_path: Optional[List]
    failover_path: Optional[List]
    primary_links: Optional[List]
    # Declared once; the original listed this field twice.
    backup_links: Optional[List]
    dynamic_backup_path: bool
    primary_constraints: Optional[PathConstraints]
    secondary_constraints: Optional[PathConstraints]
    creation_time: datetime
    owner: Optional[str]
    sb_priority: Optional[int]
    service_level: int = 0
    circuit_scheduler: List[CircuitScheduleDoc]
    archived: bool = False
    metadata: Dict = {}
    active: bool
    enabled: bool

    @staticmethod
    def projection() -> Dict:
        """Base projection of EVCBaseDoc model.

        Returns:
            A projection document in MongoDB operator syntax. Plain fields
            are included with ``1``, ``_id`` is suppressed with ``0``,
            missing values are defaulted through ``$ifNull``, and date
            fields are rendered as strings through ``$dateToString``.
        """
        time_fmt = "%Y-%m-%dT%H:%M:%S"

        def date_to_string(date_expr) -> Dict:
            """Wrap a date expression in a ``$dateToString`` stage."""
            return {"$dateToString": {"format": time_fmt, "date": date_expr}}

        return {
            "_id": 0,
            "id": 1,
            "uni_a": 1,
            "uni_z": 1,
            "name": 1,
            "bandwidth": 1,
            "primary_path": 1,
            "backup_path": 1,
            "current_path": 1,
            "failover_path": 1,
            "dynamic_backup_path": 1,
            # Falls back to the "priority" field when sb_priority is null.
            "sb_priority": {"$ifNull": ["$sb_priority", "$priority", None]},
            "service_level": 1,
            "circuit_scheduler": 1,
            "archived": 1,
            "metadata": 1,
            "active": 1,
            "enabled": 1,
            "execution_rounds": {"$ifNull": ["$execution_rounds", 0]},
            "owner": {"$ifNull": ["$owner", None]},
            "queue_id": {"$ifNull": ["$queue_id", None]},
            "primary_constraints": {"$ifNull": ["$primary_constraints", {}]},
            "secondary_constraints": {
                "$ifNull": ["$secondary_constraints", {}]
            },
            "primary_links": {"$ifNull": ["$primary_links", []]},
            "backup_links": {"$ifNull": ["$backup_links", []]},
            "start_date": date_to_string("$start_date"),
            "creation_time": date_to_string("$creation_time"),
            # request_time falls back to inserted_at when it is null.
            "request_time": date_to_string(
                {"$ifNull": ["$request_time", "$inserted_at"]}
            ),
            "end_date": date_to_string({"$ifNull": ["$end_date", None]}),
            "flow_removed_at": date_to_string(
                {"$ifNull": ["$flow_removed_at", None]}
            ),
            "updated_at": date_to_string("$updated_at"),
        }
172