1
|
|
|
from __future__ import annotations |
2
|
|
|
|
3
|
|
|
import io |
4
|
|
|
import pickle |
5
|
|
|
import sys |
6
|
|
|
from collections import UserDict, defaultdict |
7
|
|
|
from configparser import ConfigParser |
8
|
|
|
from datetime import date, datetime |
9
|
|
|
from typing import TYPE_CHECKING, Any, ClassVar, Self, TypeVar, Unpack |
10
|
|
|
|
11
|
|
|
import tomlkit |
12
|
|
|
from orjson import orjson |
13
|
|
|
from ruamel.yaml import YAML |
14
|
|
|
|
15
|
|
|
if TYPE_CHECKING: |
16
|
|
|
from collections.abc import Callable, Generator, Mapping |
17
|
|
|
|
18
|
|
|
# Shared YAML reader/writer; the "safe" type restricts loading to plain data.
yaml = YAML(typ="safe")
# Types that may appear as a terminal value in the tree.
TomlLeaf = None | list | int | str | float | date | datetime
# A mapping from (non-dotted) string keys to values.
TomlBranch = dict[str, TomlLeaf]
# Generic placeholder used by the typed accessors (`get_as`, `req_as`, ...).
T = TypeVar("T")
22
|
|
|
|
23
|
|
|
|
24
|
|
|
def _json_encode_default(obj: Any) -> Any:
    """
    Fallback encoder passed to `orjson.dumps` as `default`.

    Args:
        obj: An object that orjson could not serialize natively

    Returns:
        A plain `dict` copy of `obj` when it is a `NestedDotDict`

    Raises:
        TypeError: If `obj` is of any other type
    """
    if isinstance(obj, NestedDotDict):
        return dict(obj)
    # Falling through would implicitly return None, which the encoder would
    # then write as `null` instead of reporting the unsupported type.
    msg = f"Type {type(obj)} is not JSON-serializable"
    raise TypeError(msg)
28
|
|
|
|
29
|
|
|
|
30
|
|
|
def _check(dct: TomlBranch | TomlLeaf) -> None:
    """
    Recursively validates the keys of a nested dict.

    Non-dict values (including lists) are treated as leaves and accepted as-is.

    Args:
        dct: A nested dict, or a leaf value

    Raises:
        ValueError: If any key, at any depth, is not a `str` or contains a dot
    """
    if not isinstance(dct, dict):
        return
    # Checked first so that `"." in key` is never evaluated on a non-string.
    not_str = [k for k in dct if not isinstance(k, str)]
    if not_str:
        msg = f"Key(s) are not str: {not_str}"
        raise ValueError(msg)
    bad = [k for k in dct if "." in k]
    if bad:
        msg = f"Key(s) contain '.': {bad}"
        raise ValueError(msg)
    for v in dct.values():
        _check(v)
38
|
|
|
|
39
|
|
|
|
40
|
|
|
class NestedDotDict(UserDict):
    """
    A thin wrapper around a nested dict, intended for TOML-like data.

    Keys must be strings that do not contain a dot (.).
    A dot is reserved for splitting values to traverse the tree.
    For example, `wrapped["pet.species.name"]`.
    """

    # Protocol version passed to `pickle.dumps` by `to_pickle`.
    PICKLE_PROTOCOL: ClassVar[int] = 5

    def __init__(self: Self, x: dict[str, TomlLeaf | TomlBranch] | Self) -> None:
        """
        Constructor.

        Args:
            x: A (possibly nested) dict, or another `NestedDotDict`

        Raises:
            TypeError: If `x` is neither a dict nor a `NestedDotDict`
            ValueError: If `_check` rejects a key of `x` or of a sub-dict
                (for example, a key that contains a dot)
        """
        if not isinstance(x, (NestedDotDict, dict)):
            msg = f"Not a dict; actually {type(x)} (value: '{x}')"
            raise TypeError(msg)
        _check(x)
        super().__init__(x)

    @classmethod
    def from_toml(cls: type[Self], data: str) -> Self:
        """
        Parses TOML text with `tomlkit.loads` and wraps the result.
        """
        return cls(tomlkit.loads(data))

    @classmethod
    def from_yaml(cls: type[Self], data: str) -> Self:
        """
        Parses YAML text with the module-level `yaml` loader and wraps the result.
        """
        return cls(yaml.load(data))

    @classmethod
    def from_ini(cls: type[Self], data: str) -> Self:
        """
        Parses INI text into `section -> (option -> value)`.

        The default section is included only if it has at least one option.
        """
        parser = ConfigParser()
        parser.read_string(data)
        # A ConfigParser is a mapping but not a dict, so the constructor would
        # reject it; copy each section into a plain dict first.
        tree = {
            name: dict(section)
            for name, section in parser.items()
            if name != parser.default_section or len(section) > 0
        }
        return cls(tree)

    @classmethod
    def from_json(cls: type[Self], data: str) -> Self:
        """
        Parses JSON text with `orjson.loads` and wraps the result.
        """
        return cls(orjson.loads(data))

    @classmethod
    def parse_pickle(cls: type[Self], data: bytes) -> Self:
        """
        Unpickles `data` and wraps the result.

        Warning:
            `pickle.loads` can execute arbitrary code.
            Only call this on data from a trusted source.
        """
        if not isinstance(data, bytes):
            data = bytes(data)
        return cls(pickle.loads(data))

    def to_json(self: Self, *, indent: bool = False) -> str:
        """
        Returns JSON text.

        Args:
            indent: If true, pretty-print with 2-space indentation
        """
        option = orjson.OPT_INDENT_2 if indent else None
        encoded = orjson.dumps(self, default=_json_encode_default, option=option)
        return encoded.decode(encoding="utf-8")

    def to_yaml(self: Self, **kwargs: Any) -> str:
        """
        Returns YAML text.

        Args:
            kwargs: Forwarded to the `dump` method of the module-level `yaml` object
        """
        # `dump` writes to a stream rather than returning text, so collect the
        # output in memory. A plain dict copy is dumped instead of `self`.
        buffer = io.StringIO()
        yaml.dump(dict(self), buffer, **kwargs)
        return buffer.getvalue()

    def to_ini(self: Self) -> str:
        """
        Returns INI text.
        """
        config = ConfigParser()
        config.read_dict(self)
        writer = io.StringIO()
        config.write(writer)
        return writer.getvalue()

    def to_toml(self: Self) -> str:
        """
        Returns TOML text.
        """
        return tomlkit.dumps(self)

    def to_pickle(self: Self) -> bytes:
        """
        Returns this object pickled with protocol `PICKLE_PROTOCOL`.
        """
        return pickle.dumps(self, protocol=self.PICKLE_PROTOCOL)

    def n_elements_total(self: Self) -> int:
        """
        Counts the values yielded by `walk`.
        """
        return sum(1 for _ in self.walk())

    def n_bytes_total(self: Self) -> int:
        """
        Sums `sys.getsizeof` over the values yielded by `walk`.

        Note:
            `sys.getsizeof` is shallow, and keys and containers are not counted.
        """
        return sum(sys.getsizeof(x) for x in self.walk())

    def transform_leaves(self: Self, fn: Callable[[str, TomlLeaf], TomlLeaf]) -> Self:
        """
        Returns a new instance with `fn(dotted_key, value)` applied to every leaf.

        Note:
            The tree is rebuilt from `leaves()`, so empty sub-dicts are dropped.
        """
        rebuilt: dict = {}
        for dotted, value in self.leaves().items():
            # Re-nest the dotted key; a flat dict with dotted keys would be
            # rejected by the constructor.
            *parents, last = dotted.split(".")
            node = rebuilt
            for part in parents:
                node = node.setdefault(part, {})
            node[last] = fn(dotted, value)
        return self.__class__(rebuilt)

    @staticmethod
    def _iter_values(tree: Mapping[str, Any]) -> Generator[Any, None, None]:
        # Depth-first over values: descends into dicts, flattens lists one level.
        for value in tree.values():
            if isinstance(value, dict):
                yield from NestedDotDict._iter_values(value)
            elif isinstance(value, list):
                yield from value
            else:
                yield value

    @staticmethod
    def _flatten(tree: Mapping[str, Any], prefix: str, out: dict[str, Any]) -> None:
        # Writes `prefix + dotted-key -> leaf` into `out`, descending into dicts.
        for key, value in tree.items():
            if isinstance(value, dict):
                NestedDotDict._flatten(value, prefix + key + ".", out)
            else:
                out[prefix + key] = value

    def walk(self: Self) -> Generator[TomlLeaf | TomlBranch, None, None]:
        """
        Yields every value in the tree, depth-first.

        Nested dicts are descended into; the elements of a list are yielded
        individually (lists nested inside lists are not expanded further).
        """
        yield from self._iter_values(self)

    def branches(self: Self) -> dict[str, TomlBranch]:
        """
        Maps each lowest-level branch to a dict of its values.

        Note:
            Leaves directly under the root are assigned to key `''`.

        Returns:
            `dotted-key:str -> (non-dotted-key:str -> value)`
        """
        grouped: defaultdict[str, dict] = defaultdict(dict)
        for dotted, value in self.leaves().items():
            parent, _, name = str(dotted).rpartition(".")
            grouped[parent][name] = value
        # Return a plain dict so that lookups by callers cannot create keys.
        return dict(grouped)

    def leaves(self: Self) -> dict[str, TomlLeaf]:
        """
        Gets the leaves in this tree.

        Any value that is not a dict (including a list) counts as a leaf.

        Returns:
            `dotted-key:str -> value`
        """
        flat: dict[str, TomlLeaf] = {}
        self._flatten(self, "", flat)
        return flat

    def sub(self: Self, items: str) -> Self:
        """
        Returns the dictionary under (dotted) keys `items`.

        Raises:
            KeyError: If `items` is not found
            TypeError: If the value under `items` is not a dict
        """
        # noinspection PyTypeChecker
        return self.__class__(self[items])

    def get_as(self: Self, items: str, as_type: type[T], default: T | None = None) -> T | None:
        """
        Gets the key `items` from the dict, or `default` if it does not exist.

        Args:
            items: The key hierarchy, with a dot (.) as a separator
            as_type: The type, which will be checked using `isinstance`
            default: Default to return if the key is not found

        Returns:
            The value in the required type, or `None` if the key is not found
            and no default was given

        Raises:
            TypeError: If not `isinstance(value, as_type)`
        """
        try:
            z = self[items]
        except KeyError:
            if default is None:
                # Nothing was found and nothing was supplied, so there is no
                # value to type-check.
                return None
            z = default
        if not isinstance(z, as_type):
            msg = f"Value {z} from {items} is a {type(z)}, not {as_type}"
            raise TypeError(msg)
        return z

    def req_as(self: Self, items: str, as_type: type[T]) -> T:
        """
        Gets the key `items` from the dict.

        Args:
            items: The key hierarchy, with a dot (.) as a separator
            as_type: The type, which will be checked using `isinstance`

        Returns:
            The value in the required type

        Raises:
            KeyError: If `items` is not found
            TypeError: If not `isinstance(value, as_type)`
        """
        z = self[items]
        if not isinstance(z, as_type):
            msg = f"Value {z} from {items} is a {type(z)}, not {as_type}"
            raise TypeError(msg)
        return z

    def get_list(self: Self, items: str, default: list[T] | None = None) -> list[T]:
        """
        Gets the value under an *optional* key, without checking its type.

        Returns:
            The value; or `default` if the key is not found; or `[]` if the
            key is not found and `default` is `None`
        """
        try:
            return self[items]
        except KeyError:
            return [] if default is None else default

    def get_list_as(self: Self, items: str, as_type: type[T], default: list[T] | None = None) -> list[T]:
        """
        Gets list values from an *optional* key.

        Raises:
            TypeError: If the value is not a list, or an element is not an `as_type`
        """
        try:
            x = self[items]
        except KeyError:
            return [] if default is None else default
        if not isinstance(x, list):
            msg = f"Value {x} is not a list for lookup {items}"
            raise TypeError(msg)
        bad = [y for y in x if not isinstance(y, as_type)]
        if bad:
            msg = f"Value(s) from {items} are not {as_type}: {bad}"
            raise TypeError(msg)
        return x

    def req_list_as(self: Self, items: str, as_type: type[T]) -> list[T]:
        """
        Gets list values from a *required* key.

        Raises:
            KeyError: If `items` is not found
            TypeError: If the value is not a list, or an element is not an `as_type`
        """
        x = self[items]
        if not isinstance(x, list):
            msg = f"Value {x} is not a list for lookup {items}"
            raise TypeError(msg)
        bad = [y for y in x if not isinstance(y, as_type)]
        if bad:
            # Report the offending elements rather than the type of the list.
            msg = f"Value(s) from {items} are not {as_type}: {bad}"
            raise TypeError(msg)
        return x

    def req(self: Self, items: str) -> TomlLeaf | dict:
        """
        Gets a value from a required key; same as `self[items]`.
        """
        return self[items]

    def get(self: Self, items: str, default: TomlLeaf | dict | None = None) -> TomlLeaf | dict | None:
        """
        Gets a value from an optional key.
        Also see `__getitem__`.
        """
        try:
            return self[items]
        except KeyError:
            return default

    def __getitem__(self: Self, items: str) -> TomlLeaf | dict:
        """
        Gets a value from a required key, operating on `.`-joined strings.

        Example:
            d = NestedDotDict(dict(a=dict(b=1)))
            assert d["a.b"] == 1

        Raises:
            KeyError: If a segment is missing, or if the path continues past a
                value that is not a dict
        """
        if "." not in items:
            return super().__getitem__(items)
        first, *rest = items.split(".")
        node = super().__getitem__(first)
        reached = first
        # Walk the existing nested containers directly; wrapping each level in
        # a new instance would copy and re-validate the subtree on every lookup.
        for part in rest:
            if not isinstance(node, (dict, NestedDotDict)):
                msg = f"No key {items} (ends at {reached})"
                raise KeyError(msg)
            node = node[part]
            reached = f"{reached}.{part}"
        return node

    def __rich_repr__(self: Self) -> str:
        """
        Pretty-prints the leaves of this dict using `orjson.dumps`.

        Returns:
            A multi-line string
        """
        # NOTE(review): Rich's repr protocol appears to expect an iterable of
        # arguments rather than a str; confirm this renders as intended.
        option = orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2 | orjson.OPT_UTC_Z
        return orjson.dumps(self.leaves(), option=option).decode(encoding="utf-8")

    def _to_date(self: Self, s: str | date) -> date:
        # Returns a date unchanged; parses a str with `date.fromisoformat`.
        if isinstance(s, date):
            return s
        if isinstance(s, str):
            # This is MUCH faster than tomlkit's
            return date.fromisoformat(s)
        msg = f"Invalid type {type(s)} for {s}"
        raise TypeError(msg)

    def _to_datetime(self: Self, s: str | datetime) -> datetime:
        # Returns a datetime unchanged; parses a str that includes h:m:s.
        if isinstance(s, datetime):
            return s
        if isinstance(s, str):
            # This is MUCH faster than tomlkit's
            if s.count(":") < 2:
                msg = f"Datetime {s} does not contain hours, minutes, and seconds"
                raise ValueError(msg)
            # Upper-case so a lowercase `t`/`z` is accepted, then spell out UTC.
            return datetime.fromisoformat(s.upper().replace("Z", "+00:00"))
        msg = f"Invalid type {type(s)} for {s}"
        raise TypeError(msg)
323
|
|
|
|
324
|
|
|
|
325
|
|
|
# Names exported by `from <module> import *`.
__all__ = ["NestedDotDict", "TomlLeaf", "TomlBranch"]
326
|
|
|
|