pocketutils.core.dot_dict.AbstractDotDict.req()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
3
# SPDX-License-Identifier: Apache-2.0
4
"""
5
6
"""
7
8
from __future__ import annotations
9
10
import abc
11
import io
12
import json
13
import pickle
14
from collections import defaultdict
15
from configparser import ConfigParser
16
from dataclasses import dataclass
17
from datetime import date, datetime
18
from typing import TYPE_CHECKING, Any, Self, TypeVar, Unpack
19
20
if TYPE_CHECKING:
21
    from collections.abc import Callable, Iterable, Mapping, MutableMapping
22
23
_Single = None | str | int | float | date | datetime
24
TomlLeaf = list[_Single] | _Single
25
# TomlBranch = dict[str, TomlLeaf | "TomlBranch"]
26
TomlBranch = dict[str, TomlLeaf | dict]
27
T = TypeVar("T", bound=TomlLeaf | TomlBranch)
28
29
30
class _Utils:
31
    @classmethod
32
    def json_encode_default(cls: type[Self], obj: Any) -> Any:
33
        if isinstance(obj, NestedDotDict):
34
            return dict(obj)
35
36
    @classmethod
37
    def dots_to_dict(cls: type[Self], items: Mapping[str, TomlLeaf]) -> dict[str, TomlLeaf | TomlBranch]:
38
        """
39
        Make sub-dictionaries from substrings in `items` delimited by `.`.
40
        Used for TOML.
41
42
        Example:
43
44
            Utils.dots_to_dict({"genus.species": "fruit bat"}) == {"genus": {"species": "fruit bat"}}
45
46
        See Also:
47
            [`dict_to_dots`](pocketutils.core.dot_dict._Utils.dicts_to_dots)
48
        """
49
        dct = {}
50
        cls._un_leaf(dct, items)
51
        return dct
52
53
    @classmethod
54
    def dict_to_dots(cls: type[Self], items: Mapping[str, TomlLeaf | TomlBranch]) -> dict[str, TomlLeaf]:
55
        """
56
        Performs the inverse of [`dict_to_dots`](pocketutils.core.dot_dict._Utils.dots_to_dicts].
57
58
        Example:
59
60
            Utils.dict_to_dots({"genus": {"species": "fruit bat"}}) == {"genus.species": "fruit bat"}
61
        """
62
        return dict(cls._re_leaf("", items))
63
64
    @classmethod
65
    def _un_leaf(cls: type[Self], to: MutableMapping[str, Any], items: Mapping[str, Any]) -> None:
66
        for k, v in items.items():
67
            if "." not in k:
68
                to[k] = v
69
            else:
70
                k0, k1 = k.split(".", 1)
71
                if k0 not in to:
72
                    to[k0] = {}
73
                cls._un_leaf(to[k0], {k1: v})
74
75
    @classmethod
76
    def _re_leaf(cls: type[Self], at: str, items: Mapping[str, Any]) -> Iterable[tuple[str, Any]]:
77
        for k, v in items.items():
78
            me = at + "." + k if len(at) > 0 else k
79
            if hasattr(v, "items") and hasattr(v, "keys") and hasattr(v, "values"):
80
                yield from cls._re_leaf(me, v)
81
            else:
82
                yield me, v
83
84
    @classmethod
85
    def check(cls: type[Self], dct: Mapping | TomlBranch | TomlLeaf) -> TomlLeaf | TomlBranch:
86
        if hasattr(dct, "items") and hasattr(dct, "keys") and hasattr(dct, "values"):
87
            bad = [k for k in dct if "." in k]
88
            if len(bad) > 0:
89
                msg = f"Key(s) contain '.': {bad}"
90
                raise ValueError(msg)
91
            return dict({k: cls.check(v) for k, v in dct.items()})
92
        return dct
93
94
95
@dataclass(frozen=True, slots=True)
96
class AbstractYamlMixin(metaclass=abc.ABCMeta):
97
    @classmethod
98
    def from_yaml(cls: type[Self], data: str) -> Self:
99
        raise NotImplementedError()
100
101
    def to_yaml(self: Self, **kwargs: Unpack[Mapping[str, Any]]) -> str:
102
        """
103
        Returns YAML text.
104
        """
105
        raise NotImplementedError()
106
107
108
@dataclass(frozen=True, slots=True)
class RuamelYamlMixin(AbstractYamlMixin):
    """
    YAML support using `ruamel.yaml` in "safe" mode.
    The package is imported only when a method is called.
    """

    @classmethod
    def from_yaml(cls: type[Self], data: str) -> Self:
        """
        Builds an instance from YAML text.
        """
        from ruamel.yaml import YAML

        yaml = YAML(typ="safe")
        return cls(yaml.load(data))

    def to_yaml(self: Self, **kwargs: Any) -> str:
        """
        Returns YAML text.

        Args:
            kwargs: Passed to `ruamel.yaml.YAML.dump`
        """
        from ruamel.yaml import YAML

        yaml = YAML(typ="safe")
        # `YAML.dump` writes to a stream rather than returning text, so collect the output in memory.
        writer = io.StringIO()
        # NOTE(review): the safe representer appears to reject dict subclasses, hence the plain-dict copy.
        data = dict(self) if isinstance(self, dict) else self
        yaml.dump(data, writer, **kwargs)
        return writer.getvalue()
122
123
124
@dataclass(frozen=True, slots=True)
125
class AbstractJsonMixin(metaclass=abc.ABCMeta):
126
    @classmethod
127
    def from_json(cls: type[Self], data: str) -> Self:
128
        raise NotImplementedError()
129
130
    def to_json(self: Self) -> str:
131
        """
132
        Returns JSON text.
133
        """
134
        raise NotImplementedError()
135
136
137
@dataclass(frozen=True, slots=True)
class OrjsonJsonMixin(AbstractJsonMixin):
    """
    JSON support using `orjson`, which is imported only when a method is called.
    """

    @classmethod
    def from_json(cls: type[Self], data: str) -> Self:
        """
        Builds an instance from JSON text.
        """
        import orjson

        return cls(orjson.loads(data))

    def to_json(self: Self) -> str:
        """
        Returns JSON text.
        """
        import orjson

        # orjson produces bytes; decode them to return text.
        encoded = orjson.dumps(self, default=_Utils.json_encode_default)
        return encoded.decode(encoding="utf-8")
150
151
152
@dataclass(frozen=True, slots=True)
153
class AbstractTomlMixin(metaclass=abc.ABCMeta):
154
    @classmethod
155
    def from_toml(cls: type[Self], data: str) -> Self:
156
        raise NotImplementedError()
157
158
    def to_toml(self: Self) -> str:
159
        """
160
        Returns TOML text.
161
        """
162
        raise NotImplementedError()
163
164
165
@dataclass(frozen=True, slots=True)
class TomllibTomlMixin(AbstractTomlMixin):
    """
    Read-only TOML support using the standard-library `tomllib`.
    """

    @classmethod
    def from_toml(cls: type[Self], data: str) -> Self:
        """
        Builds an instance from TOML text.
        """
        import tomllib

        return cls(tomllib.loads(data))

    def to_toml(self: Self) -> str:
        """
        Always fails because `tomllib` only parses TOML.

        Raises:
            NotImplementedError: Always
        """
        msg = "tomllib cannot write TOML; install tomlkit to enable to_toml"
        raise NotImplementedError(msg)
175
176
177
@dataclass(frozen=True, slots=True)
class TomlkitTomlMixin(AbstractTomlMixin):
    """
    TOML support using `tomlkit`, which is imported only when a method is called.
    """

    @classmethod
    def from_toml(cls: type[Self], data: str) -> Self:
        """
        Builds an instance from TOML text.
        """
        import tomlkit

        return cls(tomlkit.loads(data))

    def to_toml(self: Self) -> str:
        """
        Returns TOML text.
        """
        import tomlkit

        return tomlkit.dumps(self)
189
190
191
@dataclass(frozen=True, slots=True)
class BuiltinJsonMixin(AbstractJsonMixin):
    """
    JSON support using the standard-library `json` module.
    """

    @classmethod
    def from_json(cls: type[Self], data: str) -> Self:
        """
        Builds an instance from JSON text.
        """
        import json

        return cls(json.loads(data))

    def to_json(self: Self) -> str:
        """
        Returns JSON text, keeping non-ASCII characters unescaped.
        `date` and `datetime` values are written as ISO 8601 strings.

        Raises:
            TypeError: If a value is not JSON-serializable
        """
        import json

        def encode_default(obj: Any) -> Any:
            # `date` also matches `datetime`, which is a subclass of it.
            if isinstance(obj, date):
                return obj.isoformat()
            msg = f"Object of type {type(obj).__name__} is not JSON serializable"
            raise TypeError(msg)

        return json.dumps(self, ensure_ascii=False, default=encode_default)
203
204
205
@dataclass(frozen=True, slots=True)
206
class PickleMixin(metaclass=abc.ABCMeta):
207
    @classmethod
208
    def from_pickle(cls: type[Self], data: bytes) -> Self:
209
        if not isinstance(data, bytes):
210
            data = bytes(data)
211
        return cls(pickle.loads(data))  # noqa: S301
212
213
    def to_pickle(self: Self) -> bytes:
214
        """
215
        Writes to a pickle file.
216
        """
217
        return pickle.dumps(self, protocol=self.PICKLE_PROTOCOL)
218
219
220
@dataclass(frozen=True, slots=True)
221
class AbstractIniMixin(metaclass=abc.ABCMeta):
222
    @classmethod
223
    def from_ini(cls: type[Self], data: str) -> Self:
224
        raise NotImplementedError()
225
226
    def to_ini(self: Self) -> str:
227
        """
228
        Returns INI text.
229
        """
230
        raise NotImplementedError()
231
232
233
@dataclass(frozen=True, slots=True)
class BuiltinIniMixin(AbstractIniMixin):
    """
    INI support using the standard-library `configparser`.
    """

    @classmethod
    def from_ini(cls: type[Self], data: str) -> Self:
        """
        Builds an instance from INI text, as `section -> (option -> value)`.

        Each section's dict includes any options inherited from the default section.
        The default section itself is included only if it has options.
        """
        parser = ConfigParser()
        parser.read_string(data)
        # Copy into plain dicts: `AbstractDotDict.__init__` rejects a `ConfigParser`.
        sections = {name: dict(parser[name]) for name in parser.sections()}
        if parser.defaults():
            sections[parser.default_section] = dict(parser.defaults())
        return cls(sections)

    def to_ini(self: Self) -> str:
        """
        Returns INI text, treating each top-level key as a section.
        """
        config = ConfigParser()
        config.read_dict(self)
        writer = io.StringIO()
        config.write(writer)
        return writer.getvalue()
247
248
249
class AbstractDotDict(dict[str, TomlLeaf | TomlBranch]):
    """
    A thin wrapper around a nested dict, a wrapper for TOML.

    Keys must be strings that do not contain a dot (.).
    A dot is reserved for splitting values to traverse the tree.
    For example, `wrapped["pet.species.name"]`.
    """

    def __init__(self: Self, x: dict[str, TomlLeaf | TomlBranch] | Self) -> None:
        """
        Constructor.

        Raises:
            TypeError: If `x` is not a dict
            ValueError: If a key (in this dict or a sub-dict) is rejected by `_Utils.check`,
                        such as a key that contains a dot
        """
        if not isinstance(x, dict):
            msg = f"Not a dict; actually {type(x)} (value: '{x}')"
            raise TypeError(msg)
        # Called only to validate; the returned copy is discarded.
        _Utils.check(x)
        super().__init__(x)

    @classmethod
    def from_leaves(cls: type[Self], x: Mapping[str, TomlLeaf] | Self) -> Self:
        """
        Builds a tree from a flat mapping of `dotted-key -> value`.
        """
        return cls(_Utils.dots_to_dict(x))

    def transform_leaves(self: Self, fn: Callable[[str, TomlLeaf], TomlLeaf]) -> Self:
        """
        Returns a copy in which each leaf is replaced by `fn(dotted_key, value)`.
        """
        x = {k: fn(k, v) for k, v in self.leaves().items()}
        # The keys are dotted, so rebuild the tree rather than passing them to the constructor.
        return self.__class__.from_leaves(x)

    def walk(self: Self) -> Iterable[TomlLeaf | TomlBranch]:
        """
        Yields every leaf value, depth-first.
        """
        for value in self.values():
            if isinstance(value, dict):
                yield from self.__class__(value).walk()
            else:
                yield value

    def nodes(self: Self) -> dict[str, TomlBranch | TomlLeaf]:
        """
        Returns the union of `branches()` and `leaves()`; on a key collision, the leaf wins.
        """
        return {**self.branches(), **self.leaves()}

    def branches(self: Self) -> dict[str, TomlBranch]:
        """
        Maps each lowest-level branch to a dict of its values.

        Note:
            Leaves directly under the root are assigned to key `''`.

        Returns:
            `dotted-key:str -> (non-dotted-key:str -> value)`
        """
        dicts = defaultdict(dict)
        for k, v in self.leaves().items():
            k0, _, k1 = str(k).rpartition(".")
            dicts[k0][k1] = v
        # Return a plain dict so that a lookup by the caller cannot silently add a key.
        return dict(dicts)

    def leaves(self: Self) -> dict[str, TomlLeaf]:
        """
        Gets the leaves in this tree.

        Returns:
            `dotted-key:str -> value`
        """
        dct = {}
        for key, value in self.items():
            if isinstance(value, dict):
                dct.update({key + "." + k: v for k, v in self.__class__(value).leaves().items()})
            else:
                dct[key] = value
        return dct

    def sub(self: Self, items: str) -> Self:
        """
        Returns the dictionary under (dotted) keys `items`.
        """
        # noinspection PyTypeChecker
        return self.__class__(self[items])

    def get_as(self: Self, items: str, as_type: type[T], default: T | None = None) -> T | None:
        """
        Gets the key `items` from the dict, or `default` if it does not exist.

        Args:
            items: The key hierarchy, with a dot (.) as a separator
            as_type: The type, which will be checked using `isinstance`
            default: Returned, without a type check, if the key is not found

        Returns:
            The value in the required type, or `default`

        Raises:
            TypeError: If the key exists and not `isinstance(value, as_type)`
        """
        try:
            z = self[items]
        except KeyError:
            return default
        if not isinstance(z, as_type):
            msg = f"Value {z} from {items} is a {type(z)}, not {as_type}"
            raise TypeError(msg)
        return z

    def req_as(self: Self, items: str, as_type: type[T]) -> T:
        """
        Gets the key `items` from the dict.

        Args:
            items: The key hierarchy, with a dot (.) as a separator
            as_type: The type, which will be checked using `isinstance`

        Returns:
            The value in the required type

        Raises:
            KeyError: If the key does not exist
            TypeError: If not `isinstance(value, as_type)`
        """
        z = self[items]
        if not isinstance(z, as_type):
            msg = f"Value {z} from {items} is a {type(z)}, not {as_type}"
            raise TypeError(msg)
        return z

    def get_list(self: Self, items: str, default: list[T] | None = None) -> list[T]:
        """
        Gets the value of an *optional* key, without checking that it is a list.

        Returns:
            The value; or, if the key is not found, `default` or `[]` if `default` is `None`
        """
        try:
            return self[items]
        except KeyError:
            return [] if default is None else default

    def get_list_as(self: Self, items: str, as_type: type[T], default: list[T] | None = None) -> list[T]:
        """
        Gets list values from an *optional* key.

        Raises:
            TypeError: If the value is not a list, or if an element is not an `as_type`
        """
        try:
            x = self[items]
        except KeyError:
            return [] if default is None else default
        if not isinstance(x, list):
            msg = f"Value {x} is not a list for lookup {items}"
            raise TypeError(msg)
        bad = [y for y in x if not isinstance(y, as_type)]
        if len(bad) > 0:
            msg = f"Value(s) from {items} are not {as_type}: {bad}"
            raise TypeError(msg)
        return x

    def req_list_as(self: Self, items: str, as_type: type[T]) -> list[T]:
        """
        Gets list values from a *required* key.

        Raises:
            KeyError: If the key does not exist
            TypeError: If the value is not a list, or if an element is not an `as_type`
        """
        x = self[items]
        if not isinstance(x, list):
            msg = f"Value {x} is not a list for lookup {items}"
            raise TypeError(msg)
        bad = [y for y in x if not isinstance(y, as_type)]
        if len(bad) > 0:
            msg = f"Value(s) from {items} are not {as_type}: {bad}"
            raise TypeError(msg)
        return x

    def req(self: Self, items: str) -> TomlLeaf | dict:
        """
        Gets a value from a required key.
        Equivalent to `self[items]`.

        Raises:
            KeyError: If the key does not exist
        """
        return self[items]

    def get(self: Self, items: str, default: TomlLeaf | dict = None) -> TomlLeaf | dict:
        """
        Gets a value from an optional key.
        Also see `__getitem__`.
        """
        try:
            return self[items]
        except KeyError:
            return default

    def __getitem__(self: Self, items: str) -> TomlLeaf | dict:
        """
        Gets a value from a required key, operating on `.`-joined strings.

        Example:

            d = NestedDotDict(dict(a=dict(b=1)))
            assert d["a.b"] == 1

        Raises:
            KeyError: If the key does not exist, or if the path continues past a leaf
        """
        if "." in items:
            i0, _, i_ = items.partition(".")
            z = self[i0]
            if not isinstance(z, dict):
                msg = f"No key {items} (ends at {i0})"
                raise KeyError(msg)
            # Wrap the sub-dict so that the rest of the path is resolved the same way.
            return self.__class__(z)[i_]
        return super().__getitem__(items)

    def __rich_repr__(self: Self) -> str:
        """
        Pretty-prints this dict using `json.dumps`.

        Returns:
            A multi-line string
        """
        # NOTE(review): rich's repr protocol expects an iterable of arguments, not a str -- confirm the intent.
        return json.dumps(self, ensure_ascii=True, indent=2)

    def _to_date(self: Self, s: str | date) -> date:
        """
        Converts an ISO 8601 date string to a `date`; a `date` (or `datetime`) is returned unchanged.

        Raises:
            TypeError: If `s` is neither a str nor a date
        """
        if isinstance(s, date):
            return s
        elif isinstance(s, str):
            # This is MUCH faster than tomlkit's
            return date.fromisoformat(s)
        else:
            msg = f"Invalid type {type(s)} for {s}"
            raise TypeError(msg)

    def _to_datetime(self: Self, s: str | datetime) -> datetime:
        """
        Converts an ISO 8601 datetime string to a `datetime`; a `datetime` is returned unchanged.

        Raises:
            ValueError: If the string has fewer than two colons
            TypeError: If `s` is neither a str nor a datetime
        """
        if isinstance(s, datetime):
            return s
        elif isinstance(s, str):
            # This is MUCH faster than tomlkit's
            if s.count(":") < 2:
                msg = f"Datetime {s} does not contain hours, minutes, and seconds"
                raise ValueError(msg)
            # Uppercase so that a lowercase 'z' suffix is also rewritten as a UTC offset.
            return datetime.fromisoformat(s.upper().replace("Z", "+00:00"))
        else:
            msg = f"Invalid type {type(s)} for {s}"
            raise TypeError(msg)
466
467
468
# Choose a backend per format, according to which optional packages are installed.
try:
    import orjson  # noqa: F401

    _Json = OrjsonJsonMixin
except ImportError:
    _Json = BuiltinJsonMixin

try:
    # Probe for tomlkit itself, because that is what `TomlkitTomlMixin` imports.
    import tomlkit  # noqa: F401

    _Toml = TomlkitTomlMixin
except ImportError:
    _Toml = TomllibTomlMixin

try:
    # Probe for `ruamel.yaml` specifically; `ruamel` alone is a namespace shared by other packages.
    import ruamel.yaml  # noqa: F401

    _Yaml = RuamelYamlMixin
except ImportError:
    _Yaml = None

if _Yaml is None:

    class NestedDotDict(AbstractDotDict, _Json, _Toml, BuiltinIniMixin, PickleMixin):
        """
        A nested dict with dotted-key access plus JSON, TOML, INI, and pickle conversion.
        YAML methods are absent because `ruamel.yaml` is not installed.
        """
else:

    class NestedDotDict(AbstractDotDict, _Json, _Toml, _Yaml, BuiltinIniMixin, PickleMixin):
        """
        A nested dict with dotted-key access plus JSON, TOML, YAML, INI, and pickle conversion.
        """
497
498
499
__all__ = ["NestedDotDict", "TomlLeaf", "TomlBranch"]
500