1
|
|
|
"""DB models.""" |
2
|
|
|
# pylint: disable=unused-argument,no-self-argument,invalid-name, |
3
|
|
|
# pylint: disable=no-name-in-module |
4
|
|
|
|
5
|
1 |
|
from datetime import datetime |
6
|
1 |
|
from typing import Dict, Optional |
7
|
|
|
|
8
|
1 |
|
from pydantic import BaseModel, Field, field_validator |
9
|
1 |
|
from typing_extensions import Annotated |
10
|
|
|
|
11
|
|
|
|
12
|
1 |
|
class DocumentBaseModel(BaseModel):
    """Base model shared by the DB document models.

    Declares the ``id`` field, which is populated from the ``_id`` alias,
    plus the optional ``inserted_at`` and ``updated_at`` timestamps.
    """

    id: str = Field(None, alias="_id")
    inserted_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    def model_dump(self, **kwargs) -> dict:
        """Dump the model to a dict, mirroring a truthy ``id`` under ``_id``.

        Args:
            **kwargs: Forwarded unchanged to ``BaseModel.model_dump``.

        Returns:
            The dumped values. When the dump contains a truthy ``id`` it is
            also stored under ``_id``, unless ``"_id"`` is listed in the
            ``exclude`` keyword argument, in which case ``_id`` is removed.
        """
        values = super().model_dump(**kwargs)
        if values.get("id"):
            values["_id"] = values["id"]

        # ``exclude`` may be missing or explicitly None; treat both as
        # "nothing excluded" instead of failing on the membership test.
        excluded = kwargs.get("exclude") or ()
        if "_id" in excluded:
            # "_id" only exists if it was mirrored above (or dumped by alias),
            # so remove it tolerantly rather than raising KeyError.
            values.pop("_id", None)
        return values
27
|
|
|
|
28
|
|
|
|
29
|
1 |
|
class InterfaceSubDoc(BaseModel):
    """Interface DB SubDocument Model.

    Sub-document describing one interface; ``SwitchDoc.interfaces`` holds a
    list of these.
    """

    # Fields without a default must be supplied by the caller.
    id: str
    enabled: bool
    mac: str
    # No default: the key is required, but its value may be None.
    speed: Optional[float]
    port_number: int
    name: str
    nni: bool = False
    lldp: bool
    switch: str

    # Optional link information; both default to None.
    link: Optional[str] = None
    link_side: Optional[str] = None

    # pydantic copies field defaults per instance, so the literal dict is
    # not shared between models.
    metadata: dict = {}
    updated_at: Optional[datetime] = None
45
|
|
|
|
46
|
|
|
|
47
|
1 |
|
class SwitchDoc(DocumentBaseModel):
    """Switch DB Document Model.

    Only ``enabled`` is required; the descriptive string fields default to
    None, ``metadata`` to an empty dict and ``interfaces`` to an empty list.
    """

    enabled: bool
    data_path: Optional[str] = None
    hardware: Optional[str] = None
    manufacturer: Optional[str] = None
    software: Optional[str] = None
    connection: Optional[str] = None
    ofp_version: Optional[str] = None
    serial: Optional[str] = None
    metadata: dict = {}
    interfaces: list[InterfaceSubDoc] = []

    @field_validator("interfaces", mode="before")
    @classmethod
    def preset_interfaces(cls, v, values, **kwargs) -> list[InterfaceSubDoc]:
        """Normalize ``interfaces`` input before field validation runs.

        Args:
            v: Raw value given for ``interfaces``.
            values: Second positional argument supplied by pydantic (the
                validation info object under pydantic v2); unused here.
            **kwargs: Accepted for signature compatibility; unused.

        Returns:
            ``list(v.values())`` when ``v`` is a dict, otherwise ``v``
            unchanged. Element validation happens afterwards, because this
            validator runs in ``"before"`` mode.
        """
        # A mapping (e.g. the keyed shape built by ``projection``) is
        # flattened to its values; anything else is left for pydantic.
        if isinstance(v, dict):
            return list(v.values())
        return v

    @staticmethod
    def projection() -> dict:
        """Return the base projection of this model.

        The projection suppresses ``_id``, includes the listed fields, and
        rebuilds ``interfaces`` as an object keyed by each interface's ``id``
        through ``$arrayToObject`` over a ``$map`` of ``$interfaces``.
        """
        return {
            "_id": 0,
            "id": 1,
            "enabled": 1,
            "data_path": 1,
            "hardware": 1,
            "manufacturer": 1,
            "software": 1,
            "connection": 1,
            "ofp_version": 1,
            "serial": 1,
            "metadata": 1,
            "interfaces": {
                "$arrayToObject": {
                    "$map": {
                        "input": "$interfaces",
                        "as": "intf",
                        # One {k, v} pair per interface: k is its id.
                        "in": {"k": "$$intf.id", "v": "$$intf"},
                    }
                }
            },
            "updated_at": 1,
            "inserted_at": 1,
        }
95
|
|
|
|
96
|
|
|
|
97
|
1 |
|
class InterfaceIdSubDoc(BaseModel):
    """InterfaceId DB SubDocument Model.

    Carries a single required string ``id``; ``LinkDoc.endpoints`` is a list
    of these.
    """

    id: str
101
|
|
|
|
102
|
|
|
|
103
|
1 |
|
class LinkDoc(DocumentBaseModel):
    """Link DB Document Model.

    ``endpoints`` is constrained to exactly two ``InterfaceIdSubDoc`` items
    (``min_length=2`` and ``max_length=2``).
    """

    enabled: bool
    metadata: dict = {}
    endpoints: Annotated[
        list[InterfaceIdSubDoc], Field(min_length=2, max_length=2)
    ]

    @staticmethod
    def projection() -> dict:
        """Return the base projection of this model.

        ``_id`` is suppressed, the plain fields are included, and the first
        and last items of ``endpoints`` are exposed as ``endpoint_a`` and
        ``endpoint_b``.
        """
        leading_fields = ("id", "enabled", "metadata")
        timestamp_fields = ("updated_at", "inserted_at")
        # Spread in this order so the resulting key order stays
        # _id, plain fields, endpoints, timestamps.
        return {
            "_id": 0,
            **dict.fromkeys(leading_fields, 1),
            "endpoint_a": {"$first": "$endpoints"},
            "endpoint_b": {"$last": "$endpoints"},
            **dict.fromkeys(timestamp_fields, 1),
        }
124
|
|
|
|
125
|
|
|
|
126
|
1 |
|
class InterfaceDetailDoc(DocumentBaseModel):
    """InterfaceDetail DB Document Model.

    All four fields are required mappings keyed by string.
    """

    # Values are lists of integer lists.
    available_tags: Dict[str, list[list[int]]]
    tag_ranges: Dict[str, list[list[int]]]

    # Values are lists of strings.
    special_available_tags: Dict[str, list[str]]
    special_tags: Dict[str, list[str]]
133
|
|
|
|