build.db.models.SwitchDoc.projection()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 26
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 22
nop 0
dl 0
loc 26
ccs 3
cts 3
cp 1
crap 1
rs 9.352
c 0
b 0
f 0
1
"""DB models."""
2
# pylint: disable=unused-argument,no-self-argument,invalid-name,
3
# pylint: disable=no-name-in-module
4
5 1
from datetime import datetime
6 1
from typing import Dict, Optional
7
8 1
from pydantic import BaseModel, Field, field_validator
9 1
from typing_extensions import Annotated
10
11
12 1
class DocumentBaseModel(BaseModel):
    """Base model shared by the DB document models in this module.

    Declares the document ``id`` (read from the ``_id`` alias on input) and
    the optional ``inserted_at`` / ``updated_at`` timestamps.
    """

    # Populated from the "_id" key on input; defaults to None when absent.
    id: str = Field(None, alias="_id")
    inserted_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def model_dump(self, **kwargs) -> dict:
        """Dump the model to a dict, mirroring a truthy ``id`` under ``_id``.

        Args:
            **kwargs: Forwarded unchanged to ``BaseModel.model_dump``.

        Returns:
            The dumped values. When ``id`` is present and truthy it is also
            stored under ``"_id"``, unless ``"_id"`` is listed in the
            ``exclude`` keyword argument.
        """
        values = super().model_dump(**kwargs)
        if values.get("id"):
            values["_id"] = values["id"]
        # ``exclude`` may be omitted or passed explicitly as None.
        excluded = kwargs.get("exclude") or ()
        if "_id" in excluded:
            # "_id" only exists when ``id`` was truthy, so tolerate its
            # absence instead of raising KeyError.
            values.pop("_id", None)
        return values
27
28
29 1
class InterfaceSubDoc(BaseModel):
    """Interface sub-document model.

    Used as the element type of ``SwitchDoc.interfaces``. Fields declared
    without a default must be supplied when the model is built.
    """

    id: str
    enabled: bool
    mac: str
    # No default: the key must be supplied, though its value may be None.
    speed: Optional[float]
    port_number: int
    name: str
    nni: bool = False
    lldp: bool
    # NOTE(review): presumably the id of the owning switch -- confirm.
    switch: str
    # The remaining fields are optional and fall back to the defaults below.
    link: Optional[str] = None
    link_side: Optional[str] = None
    metadata: dict = {}
    updated_at: Optional[datetime] = None
45
46
47 1
class SwitchDoc(DocumentBaseModel):
    """Switch document model.

    Only ``enabled`` is required; every other field has a default.
    ``interfaces`` is stored as a list of ``InterfaceSubDoc`` but may be
    supplied as a dict (see ``preset_interfaces``).
    """

    enabled: bool
    data_path: Optional[str] = None
    hardware: Optional[str] = None
    manufacturer: Optional[str] = None
    software: Optional[str] = None
    connection: Optional[str] = None
    ofp_version: Optional[str] = None
    serial: Optional[str] = None
    metadata: dict = {}
    interfaces: list[InterfaceSubDoc] = []

    @field_validator("interfaces", mode="before")
    def preset_interfaces(cls, v, values, **kwargs) -> list[InterfaceSubDoc]:
        """Normalise ``interfaces`` before field validation runs.

        A dict input is replaced by the list of its values; anything else
        is handed on untouched. ``values`` and ``kwargs`` are not used.
        """
        return list(v.values()) if isinstance(v, dict) else v

    @staticmethod
    def projection() -> dict:
        """Return the base projection for this model.

        ``_id`` is suppressed, the plain fields are included as-is, and the
        ``interfaces`` array is re-shaped into an object keyed by each
        interface's ``id``. NOTE(review): the ``$``-operators look like
        MongoDB aggregation expressions -- confirm against the caller.
        """
        # Turn [{id: X, ...}, ...] into {X: {id: X, ...}, ...}.
        interfaces_by_id = {
            "$arrayToObject": {
                "$map": {
                    "input": "$interfaces",
                    "as": "intf",
                    "in": {"k": "$$intf.id", "v": "$$intf"},
                }
            }
        }
        plain_fields = (
            "id",
            "enabled",
            "data_path",
            "hardware",
            "manufacturer",
            "software",
            "connection",
            "ofp_version",
            "serial",
            "metadata",
        )
        # Key order is kept identical to the field order listed above.
        return {
            "_id": 0,
            **dict.fromkeys(plain_fields, 1),
            "interfaces": interfaces_by_id,
            "updated_at": 1,
            "inserted_at": 1,
        }
95
96
97 1
class InterfaceIdSubDoc(BaseModel):
    """Sub-document model that carries nothing but an interface ``id``."""

    # Required; no default is declared.
    id: str
101
102
103 1
class LinkDoc(DocumentBaseModel):
    """Link document model.

    ``enabled`` and ``endpoints`` are required; ``endpoints`` must hold
    exactly two ``InterfaceIdSubDoc`` entries.
    """

    enabled: bool
    metadata: dict = {}
    # min_length == max_length == 2, so exactly two endpoints are accepted.
    endpoints: Annotated[list[InterfaceIdSubDoc],
                         Field(min_length=2, max_length=2)]

    @staticmethod
    def projection() -> dict:
        """Return the base projection for this model.

        ``_id`` is suppressed and the ``endpoints`` array is exposed as
        ``endpoint_a`` (its first element) and ``endpoint_b`` (its last).
        """
        endpoints_ref = "$endpoints"
        # Key order is kept: leading fields, the two endpoints, timestamps.
        return {
            "_id": 0,
            **dict.fromkeys(("id", "enabled", "metadata"), 1),
            "endpoint_a": {"$first": endpoints_ref},
            "endpoint_b": {"$last": endpoints_ref},
            **dict.fromkeys(("updated_at", "inserted_at"), 1),
        }
124
125
126 1
class InterfaceDetailDoc(DocumentBaseModel):
    """Interface detail document model.

    All four fields are required mappings keyed by string.
    """

    # Values are lists of integer lists.
    # NOTE(review): presumably [start, end] tag ranges -- confirm.
    available_tags: Dict[str, list[list[int]]]
    tag_ranges: Dict[str, list[list[int]]]
    # Values are lists of strings.
    special_available_tags: Dict[str, list[str]]
    special_tags: Dict[str, list[str]]
133