Passed
Pull Request — master (#198)
by Vinicius
03:52
created

build.db.models.EVCBaseDoc.projection()   B

Complexity

Conditions 1

Size

Total Lines 44
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 39
dl 0
loc 44
ccs 4
cts 4
cp 1
rs 8.9439
c 0
b 0
f 0
cc 1
nop 0
crap 1
1
"""Models for Mongo DB"""
2
# pylint: disable=unused-argument,invalid-name
3
# pylint: disable=no-self-argument,no-name-in-module
4
5 1
from datetime import datetime
6 1
from typing import Dict, List, Literal, Optional
7
8 1
from pydantic import BaseModel, Field
9
10
11 1
class DocumentBaseModel(BaseModel):
    """Base model for Mongo documents.

    The ``id`` field is declared with the alias ``_id``, so it can be
    populated from a document's ``_id`` key. ``inserted_at`` and
    ``updated_at`` are optional timestamps.
    """

    id: str = Field(None, alias="_id")
    inserted_at: Optional[datetime]
    updated_at: Optional[datetime]

    def dict(self, **kwargs) -> Dict:
        """Return a dictionary representation of the model.

        On top of what ``BaseModel.dict`` produces, a truthy ``id`` value
        is mirrored under the ``_id`` key. If the caller passes an
        ``exclude`` collection that contains ``"_id"``, the ``_id`` key is
        left out of the result.

        Args:
            **kwargs: Forwarded unchanged to ``BaseModel.dict``.

        Returns:
            The dictionary built by ``BaseModel.dict`` with the ``_id``
            handling described above applied.
        """
        values = super().dict(**kwargs)
        if values.get("id"):
            values["_id"] = values["id"]
        # ``exclude`` may be missing or explicitly None; treat both as empty.
        if "_id" in (kwargs.get("exclude") or ()):
            # ``_id`` is only present when ``id`` was truthy (or the caller
            # asked for aliases), so remove it without assuming it exists.
            values.pop("_id", None)
        return values
26
27
28 1
class CircuitScheduleDoc(BaseModel):
    """Schema of a single EVC circuit schedule entry.

    ``id`` and ``action`` carry no default value; ``date``, ``frequency``
    and ``interval`` are declared as optional.
    """

    # Identifier of the schedule entry.
    id: str
    # Optional scheduling parameters, kept in the types the document uses.
    date: Optional[str]
    frequency: Optional[str]
    interval: Optional[int]
    # Action associated with this schedule entry.
    action: str
36
37
38 1
class TAGDoc(BaseModel):
    """Schema of a TAG: an integer tag type paired with an integer value."""

    tag_type: int
    value: int
42
43
44 1
class UNIDoc(BaseModel):
    """Schema of a UNI: an interface id plus an optional TAG."""

    # Optional tag attached to the UNI.
    tag: Optional[TAGDoc]
    # Identifier of the interface this UNI refers to.
    interface_id: str
49
50
51 1
class LinkConstraints(BaseModel):
    """Optional per-link metrics used as constraints.

    Every field is optional; numeric metrics are floats except
    ``priority`` (int), and ``ownership`` is a string.
    """

    bandwidth: Optional[float]
    ownership: Optional[str]
    reliability: Optional[float]
    utilization: Optional[float]
    delay: Optional[float]
    priority: Optional[int]
59
60
61 1
class PathConstraints(BaseModel):
    """Constraints handed to the pathfinder for a path.

    ``spf_attribute`` is restricted to ``"hop"``, ``"delay"`` or
    ``"priority"`` and defaults to ``"hop"``; the remaining fields are
    declared as optional.
    """

    # Attribute selection, limited to the three literals below.
    spf_attribute: Literal["hop", "delay", "priority"] = "hop"
    spf_max_path_cost: Optional[float]
    # Two sets of link metrics: mandatory and flexible.
    mandatory_metrics: Optional[LinkConstraints]
    flexible_metrics: Optional[LinkConstraints]
    # NOTE(review): "minimul" looks like a typo for "minimum". The name is
    # part of the document schema, so confirm with producers/consumers
    # before renaming it.
    minimul_flexible_hits: Optional[int]
    # Link id lists to prefer / avoid.
    desired_links: Optional[List[str]]
    undesired_links: Optional[List[str]]
70
71
72 1
class EVCBaseDoc(DocumentBaseModel):
    """Base model for EVC documents.

    Extends ``DocumentBaseModel`` with the EVC fields (endpoints, paths,
    links, constraints, scheduling and state flags) and provides
    ``projection()``, the base projection document for this model.
    """

    # Endpoints and identification.
    uni_a: UNIDoc
    uni_z: UNIDoc
    name: str
    # Timestamps.
    request_time: Optional[datetime]
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    queue_id: Optional[int]
    bandwidth: int = 0
    # Paths and links.
    primary_path: Optional[List]
    backup_path: Optional[List]
    current_path: Optional[List]
    failover_path: Optional[List]
    primary_links: Optional[List]
    backup_links: Optional[List]
    dynamic_backup_path: bool
    # Pathfinder constraints for the primary and secondary paths.
    primary_constraints: Optional[PathConstraints]
    secondary_constraints: Optional[PathConstraints]
    creation_time: datetime
    owner: Optional[str]
    sb_priority: Optional[int]
    service_level: int = 0
    circuit_scheduler: List[CircuitScheduleDoc]
    # State flags and free-form metadata.
    archived: bool = False
    metadata: Optional[Dict] = None
    active: bool
    enabled: bool

    @staticmethod
    def projection() -> Dict:
        """Base projection of EVCBaseDoc model.

        Returns:
            A projection document that drops ``_id``, passes most fields
            through unchanged, substitutes defaults for missing values via
            ``$ifNull`` and renders the date fields as strings in the
            ``%Y-%m-%dT%H:%M:%S`` format via ``$dateToString``.
        """
        time_fmt = "%Y-%m-%dT%H:%M:%S"
        return {
            # Drop the ``_id`` key; ``id`` is projected instead.
            "_id": 0,
            "id": 1,
            "uni_a": 1,
            "uni_z": 1,
            "name": 1,
            "bandwidth": 1,
            "primary_path": 1,
            "backup_path": 1,
            "current_path": 1,
            "failover_path": 1,
            "dynamic_backup_path": 1,
            # Falls back to the ``priority`` field, then to null.
            # NOTE(review): ``$ifNull`` with more than two arguments needs
            # a server version that supports it -- confirm the deployment
            # target.
            "sb_priority": {"$ifNull": ["$sb_priority", "$priority", None]},
            "service_level": 1,
            "circuit_scheduler": 1,
            "archived": 1,
            "metadata": 1,
            "active": 1,
            "enabled": 1,
            # Missing scalar fields come back as null ...
            "owner": {"$ifNull": ["$owner", None]},
            "queue_id": {"$ifNull": ["$queue_id", None]},
            # ... missing constraints as empty documents ...
            "primary_constraints": {"$ifNull": ["$primary_constraints", {}]},
            "secondary_constraints": {"$ifNull": ["$secondary_constraints",
                                      {}]},
            # ... and missing link lists as empty arrays.
            "primary_links": {"$ifNull": ["$primary_links", []]},
            "backup_links": {"$ifNull": ["$backup_links", []]},
            "start_date": {"$dateToString": {
                "format": time_fmt, "date": "$start_date"
            }},
            "creation_time": {"$dateToString": {
                "format": time_fmt, "date": "$creation_time"
            }},
            # ``request_time`` falls back to ``inserted_at`` when unset.
            "request_time": {"$dateToString": {
                "format": time_fmt, "date": {
                    "$ifNull": ["$request_time", "$inserted_at"]
                }
            }},
            "end_date": {"$dateToString": {
                "format": time_fmt, "date": {
                    "$ifNull": ["$end_date", None]
                }
            }},
        }
150