Passed
Push — main ( ed7d21...87238c )
by Douglas
01:43
created

pocketutils.core.dot_dict   F

Complexity

Total Complexity 65

Size/Duplication

Total Lines 326
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 65
eloc 181
dl 0
loc 326
rs 3.2
c 0
b 0
f 0

29 Methods

Rating   Name   Duplication   Size   Complexity  
A NestedDotDict.transform_leaves() 0 3 1
A NestedDotDict.__rich_repr__() 0 9 1
A NestedDotDict.req() 0 2 1
A NestedDotDict.from_json() 0 3 1
A NestedDotDict._to_datetime() 0 12 4
A NestedDotDict.__getitem__() 0 16 3
A NestedDotDict.req_list_as() 0 12 4
A NestedDotDict.parse_pickle() 0 5 2
A NestedDotDict.get_list() 0 5 3
A NestedDotDict.branches() 0 15 2
A NestedDotDict.from_yaml() 0 3 1
A NestedDotDict.to_yaml() 0 5 1
A NestedDotDict.get_as() 0 20 2
A NestedDotDict.to_toml() 0 5 1
A NestedDotDict.get() 0 9 2
A NestedDotDict.leaves() 0 14 3
A NestedDotDict.to_ini() 0 9 1
A NestedDotDict.n_bytes_total() 0 2 1
A NestedDotDict.from_toml() 0 3 1
A NestedDotDict.n_elements_total() 0 5 2
A NestedDotDict.req_as() 0 19 2
A NestedDotDict.from_ini() 0 5 1
A NestedDotDict.to_json() 0 7 2
A NestedDotDict.__init__() 0 12 2
A NestedDotDict.to_pickle() 0 5 1
B NestedDotDict.get_list_as() 0 16 6
A NestedDotDict.walk() 0 8 4
A NestedDotDict.sub() 0 6 1
A NestedDotDict._to_date() 0 9 3

2 Functions

Rating   Name   Duplication   Size   Complexity  
A _check() 0 8 4
A _json_encode_default() 0 4 2

How to fix   Complexity   

Complexity

Complex classes like pocketutils.core.dot_dict often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import annotations
2
3
import io
4
import pickle
5
import sys
6
from collections import UserDict, defaultdict
7
from configparser import ConfigParser
8
from datetime import date, datetime
9
from typing import TYPE_CHECKING, Any, ClassVar, Self, TypeVar, Unpack
10
11
import tomlkit
12
from orjson import orjson
13
from ruamel.yaml import YAML
14
15
if TYPE_CHECKING:
16
    from collections.abc import Callable, Generator, Mapping
17
18
# Module-wide ruamel.yaml handler created with typ="safe"; used by `from_yaml` and `to_yaml`.
yaml = YAML(typ="safe")
19
# Types accepted as a terminal (leaf) value in the tree.
TomlLeaf = None | list | int | str | float | date | datetime
20
# A mapping from (non-dotted) string keys to leaf values.
TomlBranch = dict[str, TomlLeaf]
21
# Generic element/value type used by the `*_as` accessors.
T = TypeVar("T")
22
23
24
def _json_encode_default(obj: Any) -> Any:
    """
    Fallback for `orjson.dumps(..., default=...)`, called for values orjson cannot serialize itself.

    Args:
        obj: The value that orjson could not serialize natively

    Returns:
        A plain `dict` copy of `obj` when it is a `NestedDotDict`

    Raises:
        TypeError: For any other type. Raising is required: an implicit `None` return
            would be serialized as JSON `null` instead of reporting the problem.
    """
    if isinstance(obj, NestedDotDict):
        return dict(obj)
    msg = f"Type {type(obj)} is not JSON serializable"
    raise TypeError(msg)
28
29
30
def _check(dct: TomlBranch | TomlLeaf) -> None:
31
    if isinstance(dct, dict):
32
        bad = [k for k in dct if "." in k]
33
        if len(bad) > 0:
34
            msg = f"Key(s) contain '.': {bad}"
35
            raise ValueError(msg)
36
        for v in dct.values():
37
            _check(v)
38
39
40
class NestedDotDict(UserDict):
41
    """
42
    A thin wrapper around a nested dict, a wrapper for TOML.
43
44
    Keys must be strings that do not contain a dot (.).
45
    A dot is reserved for splitting values to traverse the tree.
46
    For example, `wrapped["pet.species.name"]`.
47
    """
48
49
    PICKLE_PROTOCOL: ClassVar[int] = 5
50
51
    def __init__(self: Self, x: dict[str, TomlLeaf | TomlBranch] | Self) -> None:
52
        """
53
        Constructor.
54
55
        Raises:
56
            ValueError: If a key (in this dict or a sub-dict) is not a str or contains a dot
57
        """
58
        if not isinstance(x, NestedDotDict | dict):
59
            msg = f"Not a dict; actually {type(x)} (value: '{x}')"
60
            raise TypeError(msg)
61
        _check(x)
62
        super().__init__(x)
63
64
    @classmethod
65
    def from_toml(cls: type[Self], data: str) -> Self:
66
        return cls(tomlkit.loads(data))
67
68
    @classmethod
69
    def from_yaml(cls: type[Self], data: str) -> Self:
70
        return cls(yaml.load(data))
71
72
    @classmethod
73
    def from_ini(cls: type[Self], data: str) -> Self:
74
        parser = ConfigParser()
75
        parser.read_string(data)
76
        return cls(parser)
77
78
    @classmethod
79
    def from_json(cls: type[Self], data: str) -> Self:
80
        return cls(orjson.loads(data))
81
82
    @classmethod
83
    def parse_pickle(cls: type[Self], data: bytes) -> Self:
84
        if not isinstance(data, bytes):
85
            data = bytes(data)
86
        return cls(pickle.loads(data))
87
88
    def to_json(self: Self, *, indent: bool = False) -> str:
89
        """
90
        Returns JSON text.
91
        """
92
        kwargs = {"option": orjson.OPT_INDENT_2} if indent else {}
93
        encoded = orjson.dumps(self, default=_json_encode_default, **kwargs)
94
        return encoded.decode(encoding="utf-8")
95
96
    def to_yaml(self: Self, **kwargs: Unpack[Mapping[str, Any]]) -> str:
97
        """
98
        Returns JSON text.
99
        """
100
        return yaml.dump(self, **kwargs)
101
102
    def to_ini(self: Self) -> str:
103
        """
104
        Returns TOML text.
105
        """
106
        config = ConfigParser()
107
        config.read_dict(self)
108
        writer = io.StringIO()
109
        config.write(writer)
110
        return writer.getvalue()
111
112
    def to_toml(self: Self) -> str:
113
        """
114
        Returns TOML text.
115
        """
116
        return tomlkit.dumps(self)
117
118
    def to_pickle(self: Self) -> bytes:
119
        """
120
        Writes to a pickle file.
121
        """
122
        return pickle.dumps(self, protocol=self.PICKLE_PROTOCOL)
123
124
    def n_elements_total(self: Self) -> int:
125
        i = 0
126
        for _ in self.walk():
127
            i += 1
128
        return i
129
130
    def n_bytes_total(self: Self) -> int:
131
        return sum([sys.getsizeof(x) for x in self.walk()])
132
133
    def transform_leaves(self: Self, fn: Callable[[str, TomlLeaf], TomlLeaf]) -> Self:
134
        x = {k: fn(k, v) for k, v in self.leaves()}
135
        return self.__class__(x)
136
137
    def walk(self: Self) -> Generator[TomlLeaf | TomlBranch, None, None]:
138
        for value in self.values():
139
            if isinstance(value, dict):
140
                yield from self.__class__(value).walk()
141
            elif isinstance(value, list):
142
                yield from value
143
            else:
144
                yield value
145
146
    def branches(self: Self) -> dict[str, TomlBranch]:
147
        """
148
        Maps each lowest-level branch to a dict of its values.
149
150
        Note:
151
            Leaves directly under the root are assigned to key `''`.
152
153
        Returns:
154
            `dotted-key:str -> (non-dotted-key:str -> value)`
155
        """
156
        dicts = defaultdict()
157
        for k, v in self.leaves():
158
            k0, _, k1 = str(k).rpartition(".")
159
            dicts[k0][k1] = v
160
        return dicts
161
162
    def leaves(self: Self) -> dict[str, TomlLeaf]:
163
        """
164
        Gets the leaves in this tree.
165
166
        Returns:
167
            `dotted-key:str -> value`
168
        """
169
        dct = {}
170
        for key, value in self.items():
171
            if isinstance(value, dict):
172
                dct.update({key + "." + k: v for k, v in self.__class__(value).leaves().items()})
173
            else:
174
                dct[key] = value
175
        return dct
176
177
    def sub(self: Self, items: str) -> Self:
178
        """
179
        Returns the dictionary under (dotted) keys `items`.
180
        """
181
        # noinspection PyTypeChecker
182
        return self.__class__(self[items])
183
184
    def get_as(self: Self, items: str, as_type: type[T], default: T | None = None) -> T:
185
        """
186
        Gets the key `items` from the dict, or `default` if it does not exist
187
188
        Args:
189
            items: The key hierarchy, with a dot (.) as a separator
190
            as_type: The type, which will be checked using `isinstance`
191
            default: Default to return the key is not found
192
193
        Returns:
194
            The value in the required type
195
196
        Raises:
197
            XTypeError: If not `isinstance(value, as_type)`
198
        """
199
        z = self.get(items, default)
200
        if not isinstance(z, as_type):
201
            msg = f"Value {z} from {items} is a {type(z)}, not {as_type}"
202
            raise TypeError(msg)
203
        return z
204
205
    def req_as(self: Self, items: str, as_type: type[T]) -> T | None:
206
        """
207
        Gets the key `items` from the dict.
208
209
        Args:
210
            items: The key hierarchy, with a dot (.) as a separator
211
            as_type: The type, which will be checked using `isinstance`
212
213
        Returns:
214
            The value in the required type
215
216
        Raises:
217
            XTypeError: If not `isinstance(value, as_type)`
218
        """
219
        z = self[items]
220
        if not isinstance(z, as_type):
221
            msg = f"Value {z} from {items} is a {type(z)}, not {as_type}"
222
            raise TypeError(msg)
223
        return z
224
225
    def get_list(self: Self, items: str, default: list[T] | None = None) -> list[T]:
226
        try:
227
            return self[items]
228
        except KeyError:
229
            return [] if default is None else default
230
231
    def get_list_as(self: Self, items: str, as_type: type[T], default: list[T] | None = None) -> list[T]:
232
        """
233
        Gets list values from an *optional* key.
234
        """
235
        try:
236
            x = self[items]
237
        except KeyError:
238
            return [] if default is None else default
239
        if not isinstance(x, list) or isinstance(x, str):
240
            msg = f"Value {x} is not a list for lookup {items}"
241
            raise TypeError(msg)
242
        bad = [y for y in x if not isinstance(y, as_type)]
243
        if len(bad) > 0:
244
            msg = f"Value(s) from {items} are not {as_type}: {bad}"
245
            raise TypeError(msg)
246
        return x
247
248
    def req_list_as(self: Self, items: str, as_type: type[T]) -> list[T]:
249
        """
250
        Gets list values from a *required* key.
251
        """
252
        x = self[items]
253
        if not isinstance(x, list) or isinstance(x, str):
254
            msg = f"Value {x} is not a list for lookup {items}"
255
            raise TypeError(msg)
256
        if not all(isinstance(y, as_type) for y in x):
257
            msg = f"Value {x} from {items} is a {type(x)}, not {as_type}"
258
            raise TypeError(msg)
259
        return x
260
261
    def req(self: Self, items: str) -> TomlLeaf | dict:
262
        return self[items]
263
264
    def get(self: Self, items: str, default: TomlLeaf | dict = None) -> TomlLeaf | dict:
265
        """
266
        Gets a value from an optional key.
267
        Also see `__getitem__`.
268
        """
269
        try:
270
            return self[items]
271
        except KeyError:
272
            return default
273
274
    def __getitem__(self: Self, items: str) -> TomlLeaf | dict:
275
        """
276
        Gets a value from a required key, operating on `.`-joined strings.
277
278
        Example:
279
            d = WrappedToml(dict(a=dict(b=1)))
280
            assert d["a.b"] == 1
281
        """
282
        if "." in items:
283
            i0, _, i_ = items.partition(".")
284
            z = self[i0]
285
            if not isinstance(z, dict | NestedDotDict):
286
                msg = f"No key {items} (ends at {i0})"
287
                raise KeyError(msg)
288
            return self.__class__(z)[i_]
289
        return super().__getitem__(items)
290
291
    def __rich_repr__(self: Self) -> str:
292
        """
293
        Pretty-prints the leaves of this dict using `json.dumps`.
294
295
        Returns:
296
            A multi-line string
297
        """
298
        option = orjson.OPT_SORT_KEYS | orjson.OPT_INDENT_2 | orjson.OPT_UTC_Z
299
        return orjson.dumps(self.leaves(), option=option).decode(encoding="utf-8")
300
301
    def _to_date(self: Self, s) -> date:
302
        if isinstance(s, date):
303
            return s
304
        elif isinstance(s, str):
305
            # This is MUCH faster than tomlkit's
306
            return date.fromisoformat(s)
307
        else:
308
            msg = f"Invalid type {type(s)} for {s}"
309
            raise TypeError(msg)
310
311
    def _to_datetime(self: Self, s: str | datetime) -> datetime:
312
        if isinstance(s, datetime):
313
            return s
314
        elif isinstance(s, str):
315
            # This is MUCH faster than tomlkit's
316
            if s.count(":") < 2:
317
                msg = f"Datetime {s} does not contain hours, minutes, and seconds"
318
                raise ValueError(msg)
319
            return datetime.fromisoformat(s.upper().replace("Z", "+00:00"))
320
        else:
321
            msg = f"Invalid type {type(s)} for {s}"
322
            raise TypeError(msg)
323
324
325
# Public API of this module.
__all__ = ["NestedDotDict", "TomlLeaf", "TomlBranch"]
326