Passed
Pull Request — master (#166)
by Antonio
06:26 queued 54s
created

build.db.models   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 96
Duplicated Lines 0 %

Test Coverage

Coverage 98%

Importance

Changes 0
Metric Value
wmc 6
eloc 72
dl 0
loc 96
ccs 49
cts 50
cp 0.98
rs 10
c 0
b 0
f 0

2 Methods

Rating   Name   Duplication   Size   Complexity  
A DocumentBaseModel.dict() 0 8 5
A EVCBaseDoc.projection() 0 25 1
1
"""Models for Mongo DB"""
2
3 1
from datetime import date, datetime
4 1
from pathlib import Path
5 1
from typing import Dict, List, Optional
6 1
from pydantic import BaseModel, Field
7
8
9 1
class DocumentBaseModel(BaseModel):
    """Base model for Mongo documents"""

    id: str = Field(None, alias="_id")
    inserted_at: Optional[datetime]
    updated_at: Optional[datetime]

    def dict(self, **kwargs) -> Dict:
        """Return a dictionary representation of the model.

        All keyword arguments are forwarded to ``BaseModel.dict``. When the
        resulting dictionary holds a truthy ``"id"`` value, that value is
        also stored under ``"_id"``. When the caller passes an ``exclude``
        argument containing ``"_id"``, the ``"_id"`` key is dropped from the
        result.
        """
        values = super().dict(**kwargs)
        if values.get("id"):
            values["_id"] = values["id"]
        # ``exclude`` may be absent or explicitly None; treat both as "nothing
        # excluded" rather than failing on the membership test.
        excluded = kwargs.get("exclude") or ()
        if "_id" in excluded:
            # ``_id`` is only present when it was mirrored above (or emitted
            # by the parent), so remove it without raising when it is missing.
            values.pop("_id", None)
        return values
24
25
26 1
class CircuitScheduleDoc(BaseModel):
    """Model describing one schedule entry of an EVC circuit."""

    id: str
    date: Optional[str]
    frequency: Optional[str]
    interval: Optional[int]
    action: str
34
35
36 1
class TAGDoc(BaseModel):
    """TAG model: an integer tag type paired with an integer value."""

    tag_type: int
    value: int
39
40
41 1
class UNIDoc(BaseModel):
    """Model of a UNI: an interface id with an optional tag."""

    tag: Optional[TAGDoc]
    interface_id: str
46
47
        
48 1
class EVCBaseDoc(DocumentBaseModel):
    """Common base model used for EVC documents."""

    uni_a: UNIDoc
    uni_z: UNIDoc
    name: str
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    queue_id: Optional[int]
    bandwidth: int = 0
    primary_path: Optional[List]
    backup_path: Optional[List]
    current_path: Optional[List]
    dynamic_backup_path: bool
    creation_time: datetime
    owner: Optional[str]
    priority: int
    circuit_scheduler: List[CircuitScheduleDoc]
    archived: bool = False
    metadata: Optional[Dict] = None
    active: bool
    enabled: bool

    @staticmethod
    def projection() -> Dict:
        """Build the base projection of the EVCBaseDoc model.

        The returned dictionary maps ``"_id"`` to 0 and every other listed
        field name to 1. A new dictionary is created on each call.
        """
        # Kept local to the method: a class-level attribute on a pydantic
        # model would be picked up as a model field.
        projected_fields = (
            "id",
            "uni_a",
            "uni_z",
            "name",
            "start_date",
            "end_date",
            "queue_id",
            "bandwidth",
            "primary_path",
            "backup_path",
            "current_path",
            "dynamic_backup_path",
            "creation_time",
            "owner",
            "priority",
            "circuit_scheduler",
            "archived",
            "metadata",
            "active",
            "enabled",
        )
        spec: Dict = {"_id": 0}
        for field_name in projected_fields:
            spec[field_name] = 1
        return spec