Passed
Pull Request — master (#258)
by
unknown
05:24 queued 01:48
created

build.db.models   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 165
Duplicated Lines 0 %

Test Coverage

Coverage 98.75%

Importance

Changes 0
Metric Value
eloc 122
dl 0
loc 165
ccs 79
cts 80
cp 0.9875
rs 10
c 0
b 0
f 0
wmc 9

3 Methods

Rating   Name   Duplication   Size   Complexity  
A DocumentBaseModel.dict() 0 8 5
A TAGDoc.validate_value() 0 6 3
B EVCBaseDoc.projection() 0 54 1
1
"""Models for Mongo DB"""
2
# pylint: disable=unused-argument,invalid-name,unused-argument
3
# pylint: disable=no-self-argument,no-name-in-module
4
5 1
from datetime import datetime
6 1
from typing import Dict, List, Literal, Optional, Union
7
8 1
from pydantic import BaseModel, Field, validator
9
10
11 1
class DocumentBaseModel(BaseModel):
12
    """Base model for Mongo documents"""
13
14 1
    id: str = Field(None, alias="_id")
15 1
    inserted_at: Optional[datetime]
16 1
    updated_at: Optional[datetime]
17
18 1
    def dict(self, **kwargs) -> Dict:
19
        """Return a dictionary representation of the model"""
20 1
        values = super().dict(**kwargs)
21 1
        if "id" in values and values["id"]:
22 1
            values["_id"] = values["id"]
23 1
        if "exclude" in kwargs and "_id" in kwargs["exclude"]:
24 1
            del values["_id"]
25 1
        return values
26
27
28 1
class CircuitScheduleDoc(BaseModel):
29
    """EVC circuit schedule model"""
30
31 1
    id: str
32 1
    date: Optional[str]
33 1
    frequency: Optional[str]
34 1
    interval: Optional[int]
35 1
    action: str
36
37
38 1
class TAGDoc(BaseModel):
39
    """TAG model"""
40 1
    tag_type: int
41 1
    value: Union[int, str]
42
43 1
    @validator('value')
44 1
    def validate_value(cls, value):
45
        """Validate value when is a string"""
46 1
        if isinstance(value, str) and (value not in ("any", "untagged")):
47
            raise ValueError("value only allows 'any' and 'untagged' strings")
48 1
        return value
49
50
51 1
class UNIDoc(BaseModel):
52
    """UNI model"""
53 1
    tag: Optional[TAGDoc]
54 1
    interface_id: str
55
56
57 1
class LinkConstraints(BaseModel):
58
    """LinkConstraints."""
59 1
    bandwidth: Optional[float]
60 1
    ownership: Optional[str]
61 1
    reliability: Optional[float]
62 1
    utilization: Optional[float]
63 1
    delay: Optional[float]
64 1
    priority: Optional[int]
65
66
67 1
class PathConstraints(BaseModel):
68
    """Pathfinder Constraints."""
69 1
    spf_attribute: Literal["hop", "delay", "priority"] = "hop"
70 1
    spf_max_path_cost: Optional[float]
71 1
    mandatory_metrics: Optional[LinkConstraints]
72 1
    flexible_metrics: Optional[LinkConstraints]
73 1
    minimul_flexible_hits: Optional[int]
74 1
    desired_links: Optional[List[str]]
75 1
    undesired_links: Optional[List[str]]
76
77
78 1
class EVCBaseDoc(DocumentBaseModel):
79
    """Base model for EVC documents"""
80
81 1
    uni_a: UNIDoc
82 1
    uni_z: UNIDoc
83 1
    name: str
84 1
    request_time: Optional[datetime]
85 1
    start_date: Optional[datetime]
86 1
    end_date: Optional[datetime]
87 1
    queue_id: Optional[int]
88 1
    flow_removed_at: Optional[datetime]
89 1
    execution_rounds: int = 0
90 1
    bandwidth: int = 0
91 1
    primary_path: Optional[List]
92 1
    backup_path: Optional[List]
93 1
    current_path: Optional[List]
94 1
    failover_path: Optional[List]
95 1
    primary_links: Optional[List]
96 1
    backup_links: Optional[List]
97 1
    backup_links: Optional[List]
98 1
    dynamic_backup_path: bool
99 1
    primary_constraints: Optional[PathConstraints]
100 1
    secondary_constraints: Optional[PathConstraints]
101 1
    creation_time: datetime
102 1
    owner: Optional[str]
103 1
    sb_priority: Optional[int]
104 1
    service_level: int = 0
105 1
    circuit_scheduler: List[CircuitScheduleDoc]
106 1
    archived: bool = False
107 1
    metadata: Optional[Dict] = None
108 1
    active: bool
109 1
    enabled: bool
110
111 1
    @staticmethod
112 1
    def projection() -> Dict:
113
        """Base projection of EVCBaseDoc model."""
114 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
115 1
        return {
116
            "_id": 0,
117
            "id": 1,
118
            "uni_a": 1,
119
            "uni_z": 1,
120
            "name": 1,
121
            "bandwidth": 1,
122
            "primary_path": 1,
123
            "backup_path": 1,
124
            "current_path": 1,
125
            "failover_path": 1,
126
            "dynamic_backup_path": 1,
127
            "sb_priority": {"$ifNull": ["$sb_priority", "$priority", None]},
128
            "service_level": 1,
129
            "circuit_scheduler": 1,
130
            "archived": 1,
131
            "metadata": 1,
132
            "active": 1,
133
            "enabled": 1,
134
            "execution_rounds": {"$ifNull": ["$execution_rounds", 0]},
135
            "owner": {"$ifNull": ["$owner", None]},
136
            "queue_id": {"$ifNull": ["$queue_id", None]},
137
            "primary_constraints": {"$ifNull": ["$primary_constraints", {}]},
138
            "secondary_constraints": {"$ifNull": ["$secondary_constraints",
139
                                      {}]},
140
            "primary_links": {"$ifNull": ["$primary_links", []]},
141
            "backup_links": {"$ifNull": ["$backup_links", []]},
142
            "start_date": {"$dateToString": {
143
                "format": time_fmt, "date": "$start_date"
144
            }},
145
            "creation_time": {"$dateToString": {
146
                "format": time_fmt, "date": "$creation_time"
147
            }},
148
            "request_time": {"$dateToString": {
149
                "format": time_fmt, "date": {
150
                    "$ifNull": ["$request_time", "$inserted_at"]
151
                }
152
            }},
153
            "end_date": {"$dateToString": {
154
                "format": time_fmt, "date": {
155
                    "$ifNull": ["$end_date", None]
156
                }
157
            }},
158
            "flow_removed_at": {"$dateToString": {
159
                "format": time_fmt, "date": {
160
                    "$ifNull": ["$flow_removed_at", None]
161
                }
162
            }},
163
            "updated_at": {"$dateToString": {
164
                "format": time_fmt, "date": "$updated_at"
165
            }}
166
        }
167