|
1
|
|
|
from __future__ import annotations |
|
2
|
|
|
|
|
3
|
|
|
import io |
|
4
|
|
|
import pickle |
|
5
|
|
|
import sys |
|
6
|
|
|
from collections import UserDict, defaultdict |
|
7
|
|
|
from configparser import ConfigParser |
|
8
|
|
|
from datetime import date, datetime |
|
9
|
|
|
from typing import TYPE_CHECKING, Any, ClassVar, Self, TypeVar, Unpack |
|
10
|
|
|
|
|
11
|
|
|
import tomlkit |
|
12
|
|
|
from orjson import orjson |
|
13
|
|
|
from ruamel.yaml import YAML |
|
14
|
|
|
|
|
15
|
|
|
if TYPE_CHECKING: |
|
16
|
|
|
from collections.abc import Callable, Generator, Mapping |
|
17
|
|
|
|
|
18
|
|
|
yaml = YAML(typ="safe") |
|
19
|
|
|
TomlLeaf = None | list | int | str | float | date | datetime |
|
20
|
|
|
TomlBranch = dict[str, TomlLeaf] |
|
21
|
|
|
T = TypeVar("T") |
|
22
|
|
|
|
|
23
|
|
|
|
|
24
|
|
|
def _json_encode_default(obj: Any) -> Any: |
|
25
|
|
|
if isinstance(obj, NestedDotDict): |
|
26
|
|
|
# noinspection PyProtectedMember |
|
27
|
|
|
return dict(obj) |
|
28
|
|
|
|
|
29
|
|
|
|
|
30
|
|
|
def _check(dct: TomlBranch | TomlLeaf) -> None: |
|
31
|
|
|
if isinstance(dct, dict): |
|
32
|
|
|
bad = [k for k in dct if "." in k] |
|
33
|
|
|
if len(bad) > 0: |
|
34
|
|
|
msg = f"Key(s) contain '.': {bad}" |
|
35
|
|
|
raise ValueError(msg) |
|
36
|
|
|
for v in dct.values(): |
|
37
|
|
|
_check(v) |
|
38
|
|
|
|
|
39
|
|
|
|
|
40
|
|
|
class NestedDotDict(UserDict): |
|
41
|
|
|
""" |
|
42
|
|
|
A thin wrapper around a nested dict, a wrapper for TOML. |
|
43
|
|
|
|
|
44
|
|
|
Keys must be strings that do not contain a dot (.). |
|
45
|
|
|
A dot is reserved for splitting values to traverse the tree. |
|
46
|
|
|
For example, `wrapped["pet.species.name"]`. |
|
47
|
|
|
""" |
|
48
|
|
|
|
|
49
|
|
|
PICKLE_PROTOCOL: ClassVar[int] = 5 |
|
50
|
|
|
|
|
51
|
|
|
def __init__(self: Self, x: dict[str, TomlLeaf | TomlBranch] | Self) -> None: |
|
52
|
|
|
""" |
|
53
|
|
|
Constructor. |
|
54
|
|
|
|
|
55
|
|
|
Raises: |
|
56
|
|
|
ValueError: If a key (in this dict or a sub-dict) is not a str or contains a dot |
|
57
|
|
|
""" |
|
58
|
|
|
if not isinstance(x, NestedDotDict | dict): |
|
59
|
|
|
msg = f"Not a dict; actually {type(x)} (value: '{x}')" |
|
60
|
|
|
raise TypeError(msg) |
|
61
|
|
|
_check(x) |
|
62
|
|
|
super().__init__(x) |
|
63
|
|
|
|
|
64
|
|
|
@classmethod |
|
65
|
|
|
def from_toml(cls: type[Self], data: str) -> Self: |
|
66
|
|
|
return cls(tomlkit.loads(data)) |
|
67
|
|
|
|
|
68
|
|
|
@classmethod |
|
69
|
|
|
def from_yaml(cls: type[Self], data: str) -> Self: |
|
70
|
|
|
return cls(yaml.load(data)) |
|
71
|
|
|
|
|
72
|
|
|
@classmethod |
|
73
|
|
|
def from_ini(cls: type[Self], data: str) -> Self: |
|
74
|
|
|
parser = ConfigParser() |
|
75
|
|
|
parser.read_string(data) |
|
76
|
|
|
return cls(parser) |
|
77
|
|
|
|
|
78
|
|
|
@classmethod |
|
79
|
|
|
def from_json(cls: type[Self], data: str) -> Self: |
|
80
|
|
|
return cls(orjson.loads(data)) |
|
81
|
|
|
|
|
82
|
|
|
@classmethod |
|
83
|
|
|
def parse_pickle(cls: type[Self], data: bytes) -> Self: |
|
84
|
|
|
if not isinstance(data, bytes): |
|
85
|
|
|
data = bytes(data) |
|
86
|
|
|
return cls(pickle.loads(data)) |
|
87
|
|
|
|
|
88
|
|
|
def to_json(self: Self, *, indent: bool = False) -> str: |
|
89
|
|
|
""" |
|
90
|
|
|
Returns JSON text. |
|
91
|
|
|
""" |
|
92
|
|
|
kwargs = {"option": orjson.OPT_INDENT_2} if indent else {} |
|
93
|
|
|
encoded = orjson.dumps(self, default=_json_encode_default, **kwargs) |
|
94
|
|
|
return encoded.decode(encoding="utf-8") |
|
95
|
|
|
|
|
96
|
|
|
def to_yaml(self: Self, **kwargs: Unpack[Mapping[str, Any]]) -> str: |
|
97
|
|
|
""" |
|
98
|
|
|
Returns JSON text. |
|
99
|
|
|
""" |
|
100
|
|
|
return yaml.dump(self, **kwargs) |
|
101
|
|
|
|
|
102
|
|
|
def to_ini(self: Self) -> str: |
|
103
|
|
|
""" |
|
104
|
|
|
Returns TOML text. |
|
105
|
|
|
""" |
|
106
|
|
|
config = ConfigParser() |
|
107
|
|
|
config.read_dict(self) |
|
108
|
|
|
writer = io.StringIO() |
|
109
|
|
|
config.write(writer) |
|
110
|
|
|
return writer.getvalue() |
|
111
|
|
|
|
|
112
|
|
|
def to_toml(self: Self) -> str: |
|
113
|
|
|
""" |
|
114
|
|
|
Returns TOML text. |
|
115
|
|
|
""" |
|
116
|
|
|
return tomlkit.dumps(self) |
|
117
|
|
|
|
|
118
|
|
|
def to_pickle(self: Self) -> bytes: |
|
119
|
|
|
""" |
|
120
|
|
|
Writes to a pickle file. |
|
121
|
|
|
""" |
|
122
|
|
|
return pickle.dumps(self, protocol=self.PICKLE_PROTOCOL) |
|
123
|
|
|
|
|
124
|
|
|
def n_elements_total(self: Self) -> int: |
|
125
|
|
|
i = 0 |
|
126
|
|
|
for _ in self.walk(): |
|
127
|
|
|
i += 1 |
|
128
|
|
|
return i |
|
129
|
|
|
|
|
130
|
|
|
def n_bytes_total(self: Self) -> int: |
|
131
|
|
|
return sum([sys.getsizeof(x) for x in self.walk()]) |
|
132
|
|
|
|
|
133
|
|
|
def transform_leaves(self: Self, fn: Callable[[str, TomlLeaf], TomlLeaf]) -> Self: |
|
134
|
|
|
x = {k: fn(k, v) for k, v in self.leaves()} |
|
135
|
|
|
return self.__class__(x) |
|
136
|
|
|
|
|
137
|
|
|
def walk(self: Self) -> Generator[TomlLeaf | TomlBranch, None, None]: |
|
138
|
|
|
for value in self.values(): |
|
139
|
|
|
if isinstance(value, dict): |
|
140
|
|
|
yield from self.__class__(value).walk() |
|
141
|
|
|
elif isinstance(value, list): |
|
142
|
|
|
yield from value |
|
143
|
|
|
else: |
|
144
|
|
|
yield value |
|
145
|
|
|
|
|
146
|
|
|
def branches(self: Self) -> dict[str, TomlBranch]: |
|
147
|
|
|
""" |
|
148
|
|
|
Maps each lowest-level branch to a dict of its values. |
|
149
|
|
|
|
|
150
|
|
|
Note: |
|
151
|
|
|
Leaves directly under the root are assigned to key `''`. |
|
152
|
|
|
|
|
153
|
|
|
Returns: |
|
154
|
|
|
`dotted-key:str -> (non-dotted-key:str -> value)` |
|
155
|
|
|
""" |
|
156
|
|
|
dicts = defaultdict() |
|
157
|
|
|
for k, v in self.leaves(): |
|
158
|
|
|
k0, _, k1 = str(k).rpartition(".") |
|
159
|
|
|
dicts[k0][k1] = v |
|
160
|
|
|
return dicts |
|
161
|
|
|
|
|
162
|
|
|
def leaves(self: Self) -> dict[str, TomlLeaf]: |
|
163
|
|
|
""" |
|
164
|
|
|
Gets the leaves in this tree. |
|
165
|
|
|
|
|
166
|
|
|
Returns: |
|
167
|
|
|
`dotted-key:str -> value` |
|
168
|
|
|
""" |
|
169
|
|
|
dct = {} |
|
170
|
|
|
for key, value in self.items(): |
|
171
|
|
|
if isinstance(value, dict): |
|
172
|
|
|
dct.update({key + "." + k: v for k, v in self.__class__(value).leaves().items()}) |
|
173
|
|
|
else: |
|
174
|
|
|
dct[key] = value |
|
175
|
|
|
return dct |
|
176
|
|
|
|
|
177
|
|
|
def sub(self: Self, items: str) -> Self: |
|
178
|
|
|
""" |
|
179
|
|
|
Returns the dictionary under (dotted) keys `items`. |
|
180
|
|
|
""" |
|
181
|
|
|
# noinspection PyTypeChecker |
|
182
|
|
|
return self.__class__(self[items]) |
|
183
|
|
|
|
|
184
|
|
|
def get_as(self: Self, items: str, as_type: type[T], default: T | None = None) -> T: |
|
185
|
|
|
""" |
|
186
|
|
|
Gets the key `items` from the dict, or `default` if it does not exist |
|
187
|
|
|
|
|
188
|
|
|
Args: |
|
189
|
|
|
items: The key hierarchy, with a dot (.) as a separator |
|
190
|
|
|
as_type: The type, which will be checked using `isinstance` |
|
191
|
|
|
default: Default to return the key is not found |
|
192
|
|
|
|
|
193
|
|
|
Returns: |
|
194
|
|
|
The value in the required type |
|
195
|
|
|
|
|
196
|
|
|
Raises: |
|
197
|
|
|
XTypeError: If not `isinstance(value, as_type)` |
|
198
|
|
|
""" |
|
199
|
|
|
z = self.get(items, default) |
|
200
|
|
|
if not isinstance(z, as_type): |
|
201
|
|
|
msg = f"Value {z} from {items} is a {type(z)}, not {as_type}" |
|
202
|
|
|
raise TypeError(msg) |
|
203
|
|
|
return z |
|
204
|
|
|
|
|
205
|
|
|
def req_as(self: Self, items: str, as_type: type[T]) -> T | None: |
|
206
|
|
|
""" |
|
207
|
|
|
Gets the key `items` from the dict. |
|
208
|
|
|
|
|
209
|
|
|
Args: |
|
210
|
|
|
items: The key hierarchy, with a dot (.) as a separator |
|
211
|
|
|
as_type: The type, which will be checked using `isinstance` |
|
212
|
|
|
|
|
213
|
|
|
Returns: |
|
214
|
|
|
The value in the required type |
|
215
|
|
|
|
|
216
|
|
|
Raises: |
|
217
|
|
|
XTypeError: If not `isinstance(value, as_type)` |
|
218
|
|
|
""" |
|
219
|
|
|
z = self[items] |
|
220
|
|
|
if not isinstance(z, as_type): |
|
221
|
|
|
msg = f"Value {z} from {items} is a {type(z)}, not {as_type}" |
|
222
|
|
|
raise TypeError(msg) |
|
223
|
|
|
return z |
|
224
|
|
|
|
|
225
|
|
|
def get_list(self: Self, items: str, default: list[T] | None = None) -> list[T]: |
|
226
|
|
|
try: |
|
227
|
|
|
return self[items] |
|
228
|
|
|
except KeyError: |
|
229
|
|
|
return [] if default is None else default |
|
230
|
|
|
|
|
231
|
|
|
def get_list_as(self: Self, items: str, as_type: type[T], default: list[T] | None = None) -> list[T]: |
|
232
|
|
|
""" |
|
233
|
|
|
Gets list values from an *optional* key. |
|
234
|
|
|
""" |
|
235
|
|
|
try: |
|
236
|
|
|
x = self[items] |
|
237
|
|
|
except KeyError: |
|
238
|
|
|
return [] if default is None else default |
|
239
|
|
|
if not isinstance(x, list) or isinstance(x, str): |
|
240
|
|
|
msg = f"Value {x} is not a list for lookup {items}" |
|
241
|
|
|
raise TypeError(msg) |
|
242
|
|
|
bad = [y for y in x if not isinstance(y, as_type)] |
|
243
|
|
|
if len(bad) > 0: |
|
244
|
|
|
msg = f"Value(s) from {items} are not {as_type}: {bad}" |
|
245
|
|
|
raise TypeError(msg) |
|
246
|
|
|
return x |
|
247
|
|
|
|
|
248
|
|
|
def req_list_as(self: Self, items: str, as_type: type[T]) -> list[T]: |
|
249
|
|
|
""" |
|
250
|
|
|
Gets list values from a *required* key. |
|
251
|
|
|
""" |
|
252
|
|
|
x = self[items] |
|
253
|
|
|
if not isinstance(x, list) or isinstance(x, str): |
|
254
|
|
|
msg = f"Value {x} is not a list for lookup {items}" |
|
255
|
|
|
raise TypeError(msg) |
|
256
|
|
|
if not all(isinstance(y, as_type) for y in x): |
|
257
|
|
|
msg = f"Value {x} from {items} is a {type(x)}, not {as_type}" |
|
258
|
|
|
raise TypeError(msg) |
|
259
|
|
|
return x |
|
260
|
|
|
|
|
261
|
|
|
def req(self: Self, items: str) -> TomlLeaf | dict: |
|
262
|
|
|
return self[items] |
|
263
|
|
|
|
|
264
|
|
|
def get(self: Self, items: str, default: TomlLeaf | dict = None) -> TomlLeaf | dict: |
|
265
|
|
|
""" |
|
266
|
|
|
Gets a value from an optional key. |
|
267
|
|
|
Also see `__getitem__`. |
|
268
|
|
|
""" |
|
269
|
|
|
try: |
|
270
|
|
|
return self[items] |
|
271
|
|
|
except KeyError: |
|
272
|
|
|
return default |
|
273
|
|
|
|
|
274
|
|
|
def __getitem__(self: Self, items: str) -> TomlLeaf | dict: |
|
275
|
|
|
""" |
|
276
|
|
|
Gets a value from a required key, operating on `.`-joined strings. |
|
277
|
|
|
|
|
278
|
|
|
Example: |
|
279
|
|
|
d = WrappedToml(dict(a=dict(b=1))) |
|
280
|
|
|
assert d["a.b"] == 1 |
|
281
|
|
|
""" |
|
282
|
|
|
if "." in items: |
|
283
|
|
|
i0, _, i_ = items.partition(".") |
|
284
|
|
|
z = self[i0] |
|
285
|
|
|
if not isinstance(z, dict | NestedDotDict): |
|
286
|
|
|
msg = f"No key {items} (ends at {i0})" |
|
287
|
|
|
raise KeyError(msg) |
|
288
|
|
|
return self.__class__(z)[i_] |
|
289
|
|
|
return super().__getitem__(items) |
|
290
|
|
|
|
|
291
|
|
|
def __rich_repr__(self: Self) -> str: |
|
292
|
|
|
""" |
|
293
|
|
|
Pretty-prints the leaves of this dict using `json.dumps`. |
|
294
|
|
|
|
|
295
|
|
|
Returns: |
|
296
|
|
|
A multi-line string |
|
297
|
|
|
""" |
|
298
|
|
|
option = orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2 | orjson.OPT_UTC_Z |
|
299
|
|
|
return orjson.dumps(self.leaves(), option=option).decode(encoding="utf-8") |
|
300
|
|
|
|
|
301
|
|
|
def _to_date(self: Self, s) -> date: |
|
302
|
|
|
if isinstance(s, date): |
|
303
|
|
|
return s |
|
304
|
|
|
elif isinstance(s, str): |
|
305
|
|
|
# This is MUCH faster than tomlkit's |
|
306
|
|
|
return date.fromisoformat(s) |
|
307
|
|
|
else: |
|
308
|
|
|
msg = f"Invalid type {type(s)} for {s}" |
|
309
|
|
|
raise TypeError(msg) |
|
310
|
|
|
|
|
311
|
|
|
def _to_datetime(self: Self, s: str | datetime) -> datetime: |
|
312
|
|
|
if isinstance(s, datetime): |
|
313
|
|
|
return s |
|
314
|
|
|
elif isinstance(s, str): |
|
315
|
|
|
# This is MUCH faster than tomlkit's |
|
316
|
|
|
if s.count(":") < 2: |
|
317
|
|
|
msg = f"Datetime {s} does not contain hours, minutes, and seconds" |
|
318
|
|
|
raise ValueError(msg) |
|
319
|
|
|
return datetime.fromisoformat(s.upper().replace("Z", "+00:00")) |
|
320
|
|
|
else: |
|
321
|
|
|
msg = f"Invalid type {type(s)} for {s}" |
|
322
|
|
|
raise TypeError(msg) |
|
323
|
|
|
|
|
324
|
|
|
|
|
325
|
|
|
__all__ = ["NestedDotDict", "TomlLeaf", "TomlBranch"] |
|
326
|
|
|
|