Passed
Push — main ( ed7d21...87238c )
by Douglas
01:43
created

pocketutils.tools.json_tools   B

Complexity

Total Complexity 48

Size/Duplication

Total Lines 255
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 48
eloc 151
dl 0
loc 255
rs 8.5599
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A JsonEncoder.as_str() 0 5 2
F JsonTools.preserve_inf() 0 59 16
A JsonTools.decoder() 0 3 1
A JsonDecoder.from_str() 0 2 1
F MiscTypesJsonDefault.__call__() 0 47 14
A JsonTools.encoder() 0 38 4
A JsonDecoder.from_bytes() 0 6 3
A JsonTools.misc_types_default() 0 3 1
A JsonTools.new_default() 0 30 4
A JsonEncoder.as_bytes() 0 4 2

How to fix   Complexity   

Complexity

Complex classes like pocketutils.tools.json_tools often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import base64
2
import enum
3
import inspect
4
from collections.abc import (
5
    ByteString,
6
    Callable,
7
    ItemsView,
8
    KeysView,
9
    Mapping,
10
    Sequence,
11
    Set,
12
    ValuesView,
13
)
14
from dataclasses import dataclass
15
from datetime import date, datetime, tzinfo
16
from datetime import time as _time
17
from decimal import Decimal
18
from typing import Any, Self
19
from uuid import UUID
20
21
import orjson
22
23
# Sentinel float values shared by the helpers in this module.
# NAN never compares equal to anything, including itself.
NAN = float("NaN")
# Negative and positive infinity; every finite float lies strictly between them.
NEG_INF = float("-Inf")
INF = float("Inf")
26
27
28
class MiscTypesJsonDefault(Callable[[Any], Any]):
29
    def __call__(self: Self, obj: Any) -> Any:
30
        """
31
        Tries to return a serializable result for `obj`.
32
        Meant to be passed as `default=` in `orjson.dumps`.
33
        Only encodes types that can always be represented exactly,
34
        without any loss of information. For that reason, it does not
35
        fall back to calling `str` or `repr` for unknown types.
36
        Handles, at least:
37
38
        - `decimal.Decimal` → str (scientific notation)
39
        - `complex` or `np.complexfloating` → str (e.g. "(3+1j)")
40
        - `typing.Mapping` → dict
41
        - `typing.ItemsView` → dict
42
        - `collections.abc.{Set,Sequence,...}` → list
43
        - `enum.Enum` → str (name)
44
        - `typing.ByteString` →  str (base-64)
45
        - `datetime.tzinfo` →  str (timezone name)
46
        - `typing.NamedTuple` →  dict
47
        - type or module →  str (name)
48
49
        Raise:
50
            TypeError: If none of those options worked
51
        """
52
        if obj is None:
53
            return obj  # we should never get here, but this seems safer
54
        elif isinstance(obj, str | int | float | datetime | date | _time | UUID):
55
            return obj  # we should never get here, but let's be safe
56
        elif isinstance(obj, Decimal | complex):
57
            return str(obj)
58
        elif isinstance(obj, enum.Enum):
59
            return obj.name
60
        elif isinstance(obj, bytes):
61
            return base64.b64decode(obj)
62
        elif isinstance(obj, ByteString):
63
            return base64.b64decode(bytes(obj))
64
        elif isinstance(obj, tzinfo):
65
            return obj.tzname(datetime.now(tz=obj))
66
        elif isinstance(obj, Set | Sequence | KeysView | ValuesView):
67
            return list(obj)
68
        elif isinstance(obj, Mapping | ItemsView):
69
            return dict(obj)
70
        elif isinstance(obj, tuple) and hasattr(obj, "_asdict"):
71
            # namedtuple
72
            return obj._asdict()
73
        elif inspect.isclass(obj) or inspect.ismodule(obj):
74
            return obj.Self
75
        raise TypeError
76
77
78
# Shared module-level instance of MiscTypesJsonDefault.
_misc_types_default = MiscTypesJsonDefault()
79
80
81
@dataclass(frozen=True, slots=True, kw_only=True)
82
class JsonEncoder:
83
    bytes_options: int
84
    str_options: int
85
    default: Callable[[Any], Any]
86
    prep: Callable[[Any], Any] | None
87
88
    def as_bytes(self: Self, data: Any) -> ByteString:
89
        if self.prep is not None:
90
            data = self.prep(data)
91
        return orjson.dumps(data, default=self.default, option=self.bytes_options)
92
93
    def as_str(self: Self, data: Any) -> str:
94
        if self.prep is not None:
95
            data = self.prep(data)
96
        x = orjson.dumps(data, default=self.default, option=self.str_options)
97
        return x.decode(encoding="utf8") + "\n"
98
99
100
@dataclass(frozen=True, slots=True)
101
class JsonDecoder:
102
    def from_bytes(self: Self, data: ByteString) -> Any:
103
        if not isinstance(data, ByteString):
104
            raise TypeError(str(type(data)))
105
        if not isinstance(data, bytes):
106
            data = bytes(data)
107
        return orjson.loads(data)
108
109
    def from_str(self: Self, data: str) -> Any:
110
        return orjson.loads(data)
111
112
113
class JsonTools:
    """
    Factory methods for JSON encoders, decoders, and `default=` callables.
    """

    @classmethod
    def misc_types_default(cls: type[Self]) -> Callable[[Any], Any]:
        """
        Returns the shared module-level `_misc_types_default` callable.
        """
        return _misc_types_default

    @classmethod
    def new_default(
        cls: type[Self],
        *fallbacks: Callable[[Any], Any] | None,
        first: Callable[[Any], Any] | None = _misc_types_default,
        last: Callable[[Any], Any] | None = str,
    ) -> Callable[[Any], Any]:
        """
        Creates a new function to be passed as `default=` to `orjson.dumps`.
        Tries, in order: `first`, each of `fallbacks`, then `last`.
        A handler signals "cannot encode" by raising `TypeError`,
        in which case the next handler is tried.

        Args:
            first: Try this first
            fallbacks: Tries these, in order, after `first`, skipping any None
            last: Use this as the last resort; consider `str` or `repr`.
                  If None, the returned function raises `TypeError`
                  once every other handler has failed.

        Returns:
            A function that accepts one object and returns an encodable value
        """
        then = [f for f in [first, *fallbacks] if f is not None]

        def _default(obj):
            for t in then:
                try:
                    return t(obj)
                except TypeError:
                    continue  # this handler cannot encode obj; try the next one
            # Only give up after *every* handler has had its chance.
            if last is None:
                raise TypeError(f"Cannot encode object of type {type(obj).__name__}")
            return last(obj)

        _default.__name__ = f"default({', '.join([str(t) for t in then])})"
        return _default

    @classmethod
    def decoder(cls: type[Self]) -> JsonDecoder:
        """
        Returns a new `JsonDecoder`.
        """
        return JsonDecoder()

    @classmethod
    def encoder(
        cls: type[Self],
        *fallbacks: Callable[[Any], Any] | None,
        indent: bool = True,
        sort: bool = False,
        preserve_inf: bool = True,
        last: Callable[[Any], Any] | None = str,
    ) -> JsonEncoder:
        """
        Builds a `JsonEncoder` whose `default=` is made by :meth:`new_default`
        with the module's misc-types handler tried first.

        Args:
            fallbacks: Extra `default=` handlers, tried in order after the first
            indent: Indent by 2 spaces (`orjson.OPT_INDENT_2`);
                    only affects `JsonEncoder.as_str`
            sort: Sort keys with `orjson.OPT_SORT_KEYS` (bytes and str output)
            preserve_inf: Preserve infinite values with :meth:`preserve_inf`
            last: Last resort option to encode a value

        Returns:
            The configured encoder
        """
        # NOTE(review): OPT_NON_STR_KEYS is set for bytes output only;
        # confirm whether str output was meant to get it too.
        bytes_option = orjson.OPT_UTC_Z | orjson.OPT_NON_STR_KEYS
        str_option = orjson.OPT_UTC_Z
        if sort:
            bytes_option |= orjson.OPT_SORT_KEYS
            str_option |= orjson.OPT_SORT_KEYS
        if indent:
            str_option |= orjson.OPT_INDENT_2
        default = cls.new_default(
            *fallbacks,
            first=_misc_types_default,
            last=last,
        )
        prep = cls.preserve_inf if preserve_inf else None
        return JsonEncoder(
            default=default,
            bytes_options=bytes_option,
            str_options=str_option,
            prep=prep,
        )

    @classmethod
    def preserve_inf(cls: type[Self], data: Any) -> Any:
        """
        Recursively replaces infinite floats with their string form.
        Orjson encodes NaN, +inf, and -inf as JSON null.
        This function converts +inf and -inf to `"inf"` and `"-inf"` so they survive;
        NaN and all finite values are left untouched.

        - Mappings become dicts with `str` keys and processed values.
        - `str` and byte strings are returned as-is.
        - Any `ndarray` is converted to (nested) lists via `tolist()`.
        - Named tuples are rebuilt with processed fields, keeping their type.
        - Any other sequence becomes a list of processed items.
        - Infinite float scalars (including NumPy floating scalars) become str.
        - Everything else is returned unchanged.

        The returned result may still not be serializable with orjson;
        trying to serialize is the best way to test for serializability.

        Args:
            data: Arbitrarily nested data

        Returns:
            A copy of `data` with infinite floats replaced by strings
        """
        # We go to great lengths to avoid importing numpy:
        # no np.isinf, np.isneginf, or np.isnan allowed.
        # NumPy float types compare to float, including to -inf and +inf,
        # and every comparison involving NaN is False.
        # So our logic is is_infinite := (data > NEG_INF) != (data < INF)
        if isinstance(data, Mapping):
            return {str(k): cls.preserve_inf(v) for k, v in data.items()}
        if isinstance(data, str | ByteString):
            return data
        # `dtype.kind` is "f" for floating dtypes and "O" for object dtypes;
        # str(dtype) is e.g. "float64", so prefix-matching "dtype(float" never works.
        dtype_kind = getattr(getattr(data, "dtype", None), "kind", None)
        if type(data).__name__ == "ndarray" and dtype_kind is not None:
            as_list = data.tolist()
            # only float and object arrays can hold infinite floats
            return cls.preserve_inf(as_list) if dtype_kind in ("f", "O") else as_list
        if isinstance(data, tuple) and hasattr(data, "_fields"):
            # named tuple: keep the type so a later `default=` can still see it
            return type(data)(*(cls.preserve_inf(v) for v in data))
        if isinstance(data, Sequence):
            return [cls.preserve_inf(v) for v in data]
        is_float_scalar = isinstance(data, float) or (
            # NumPy scalars report ndim == 0; this excludes array-likes that have a dtype
            dtype_kind == "f" and getattr(data, "ndim", None) == 0
        )
        if is_float_scalar and (data > NEG_INF) != (data < INF):
            return str(data)
        return data
252
253
254
# Names exported by `from ... import *`; MiscTypesJsonDefault is not listed.
__all__ = ["JsonEncoder", "JsonDecoder", "JsonTools"]
255