Test Failed
Pull Request — master (#166)
by Italo Valcy
03:43
created

build.db.models.DocumentBaseModel.dict()   A

Complexity

Conditions 5

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 7
nop 2
dl 0
loc 8
rs 9.3333
c 0
b 0
f 0
1
"""Models for Mongo DB"""
2
3
from datetime import date, datetime
4
from pathlib import Path
5
from typing import Dict, List, Optional
6
from pydantic import BaseModel, Field
7
8
9
class DocumentBaseModel(BaseModel):
    """Base model for Mongo documents.

    Declares the ``id`` field (populated through the ``_id`` alias) and the
    optional ``inserted_at`` / ``updated_at`` timestamps.
    """

    id: str = Field(None, alias="_id")
    inserted_at: Optional[datetime]
    updated_at: Optional[datetime]

    def dict(self, **kwargs) -> Dict:
        """Return a dictionary representation of the model.

        The dictionary produced by the parent ``dict()`` is extended with an
        ``_id`` key mirroring ``id`` whenever ``id`` is present and truthy.
        If the caller's ``exclude`` argument contains ``"_id"``, that key is
        left out of the result.

        Args:
            **kwargs: Forwarded unchanged to the parent ``dict()``.

        Returns:
            The parent's dictionary, adjusted as described above.
        """
        values = super().dict(**kwargs)
        if values.get("id"):
            values["_id"] = values["id"]
        # ``exclude`` may be missing or explicitly None; treat both as empty.
        exclude = kwargs.get("exclude") or ()
        if "_id" in exclude:
            # pop() instead of del: "_id" is absent when ``id`` is unset, and
            # excluding a key that is not there must not raise KeyError.
            values.pop("_id", None)
        return values
24
25
26
class CircuitScheduleDoc(BaseModel):
    """Model of a single EVC circuit schedule entry.

    ``id`` and ``action`` are plain strings; ``date``, ``frequency`` and
    ``interval`` are declared as optional.
    """

    id: str
    date: Optional[str]
    frequency: Optional[str]
    interval: Optional[int]
    action: str
34
35
36
class TAGDoc(BaseModel):
    """TAG model: an integer ``tag_type`` paired with an integer ``value``."""

    tag_type: int
    value: int
39
40
41
class UNIDoc(BaseModel):
    """UNI model: an ``interface_id`` string with an optional ``TAGDoc``."""

    tag: Optional[TAGDoc]
    interface_id: str
46
47
        
48
class EVCBaseDoc(DocumentBaseModel):
    """Base model for EVC documents.

    Extends ``DocumentBaseModel`` with the EVC fields declared below and
    provides ``projection()``, the base projection used for this model.
    """

    uni_a: UNIDoc
    uni_z: UNIDoc
    name: str
    request_time: Optional[datetime]
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    queue_id: Optional[int]
    bandwidth: int = 0
    primary_path: Optional[List]
    backup_path: Optional[List]
    current_path: Optional[List]
    primary_links: Optional[List]
    backup_links: Optional[List]
    dynamic_backup_path: bool
    creation_time: datetime
    owner: Optional[str]
    priority: int
    circuit_scheduler: List[CircuitScheduleDoc]
    archived: bool = False
    metadata: Optional[Dict] = None
    active: bool
    enabled: bool

    @staticmethod
    def projection() -> Dict:
        """Base projection of EVCBaseDoc model.

        Returns:
            A projection dictionary in which ``_id`` is mapped to 0, the
            plain fields are mapped to 1, ``owner``/``queue_id`` and the
            ``*_links`` fields get ``$ifNull`` fallbacks (``None`` and ``[]``
            respectively), and the date fields are rendered through
            ``$dateToString`` with the ``%Y-%m-%dT%H:%M:%S`` format.
        """
        time_fmt = "%Y-%m-%dT%H:%M:%S"

        def date_to_string(date_expr) -> Dict:
            """Wrap ``date_expr`` in a ``$dateToString`` using ``time_fmt``."""
            return {"$dateToString": {"format": time_fmt, "date": date_expr}}

        return {
            "_id": 0,
            "id": 1,
            "uni_a": 1,
            "uni_z": 1,
            "name": 1,
            "bandwidth": 1,
            "primary_path": 1,
            "backup_path": 1,
            "current_path": 1,
            "dynamic_backup_path": 1,
            "priority": 1,
            "circuit_scheduler": 1,
            "archived": 1,
            "metadata": 1,
            "active": 1,
            "enabled": 1,
            "owner": {"$ifNull": ["$owner", None]},
            "queue_id": {"$ifNull": ["$queue_id", None]},
            "primary_links": {"$ifNull": ["$primary_links", []]},
            "backup_links": {"$ifNull": ["$backup_links", []]},
            "start_date": date_to_string("$start_date"),
            "creation_time": date_to_string("$creation_time"),
            # request_time falls back to inserted_at when it is null/missing.
            "request_time": date_to_string(
                {"$ifNull": ["$request_time", "$inserted_at"]}
            ),
            "end_date": date_to_string({"$ifNull": ["$end_date", None]}),
        }
117