pocketutils.tools.json_tools   F
last analyzed

Complexity

Total Complexity 69

Size/Duplication

Total Lines 305
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 69
eloc 190
dl 0
loc 305
rs 2.88
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A JsonEncoder.as_str() 0 4 1
A JsonUtils.misc_types_default() 0 2 1
A JsonDecoder.from_str() 0 4 2
F MiscTypesJsonDefault.__call__() 0 47 14
A JsonDecoder.from_bytes() 0 8 4
A JsonUtils.new_default() 0 29 4
A JsonUtils.decoder() 0 2 1
A JsonEncoder.as_bytes() 0 3 1
F JsonUtils.prepare() 0 95 38
A JsonUtils.encoder() 0 36 3

How to fix   Complexity   

Complexity

Complex classes like pocketutils.tools.json_tools often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
3
# SPDX-License-Identifier: Apache-2.0
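"""
JSON encoding and decoding utilities.

Encoding is done with orjson. Decoding uses orjson when it is installed
and otherwise falls back to the standard-library `json` module.
"""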
7
8
import base64
9
import enum
10
import inspect
11
import json
12
from collections.abc import (
13
    Callable,
14
    ItemsView,
15
    KeysView,
16
    Mapping,
17
    Sequence,
18
    ValuesView,
19
)
20
from dataclasses import dataclass
21
from datetime import date, datetime, tzinfo
22
from datetime import time as _time
23
from decimal import Decimal
24
from typing import Any, Self
25
from uuid import UUID
26
27
try:
28
    import orjson
29
except ImportError:
30
    orjson = None
31
32
# Names exported by `from ... import *`; `JsonTools` is the ready-made `JsonUtils` instance defined at the end of the module.
__all__ = ["NanInfHandling", "JsonEncoder", "JsonDecoder", "JsonUtils", "JsonTools"]
33
34
# Float sentinels used to detect non-finite values by plain comparison,
# which lets `JsonUtils.prepare` work on numpy floats without importing numpy.
INF = float("Inf")
NEG_INF = float("-Inf")
# NaN never compares equal to anything, itself included: `x == NAN` is always False.
NAN = float("NaN")
37
38
39
class NanInfHandling(enum.StrEnum):
40
    convert_to_str = enum.auto()
41
    convert_to_null = enum.auto()
42
    raise_error = enum.auto()
43
44
45
class MiscTypesJsonDefault(Callable[[Any], Any]):
46
    def __call__(self: Self, obj: Any) -> Any:
47
        """
48
        Tries to return a serializable result for `obj`.
49
        Meant to be passed as `default=` in `orjson.dumps`.
50
        Only encodes types that can always be represented exactly,
51
        without any loss of information. For that reason, it does not
52
        fall back to calling `str` or `repr` for unknown types.
53
        Handles, at least:
54
55
        - `decimal.Decimal` → str (scientific notation)
56
        - `complex` or `np.complexfloating` → str (e.g. "(3+1j)")
57
        - `typing.Mapping` → dict
58
        - `typing.ItemsView` → dict
59
        - `collections.abc.{Set,Sequence,...}` → list
60
        - `enum.Enum` → str (name)
61
        - `bytes | bytearray | memoryview` →  str (base-64)
62
        - `datetime.tzinfo` →  str (timezone name)
63
        - `typing.NamedTuple` →  dict
64
        - type or module →  str (name)
65
66
        Raise:
67
            TypeError: If none of those options worked
68
        """
69
        if obj is None:
70
            return obj  # we should never get here, but this seems safer
71
        elif isinstance(obj, str | int | float | datetime | date | _time | UUID):
72
            return obj  # we should never get here, but let's be safe
73
        elif isinstance(obj, Decimal | complex):
74
            return str(obj)
75
        elif isinstance(obj, enum.Enum):
76
            return obj.name
77
        elif isinstance(obj, bytes):
78
            return base64.b64decode(obj)
79
        elif isinstance(obj, bytes | bytearray | memoryview):
80
            return base64.b64decode(bytes(obj))
81
        elif isinstance(obj, tzinfo):
82
            return obj.tzname(datetime.now(tz=obj))
83
        elif isinstance(obj, set | frozenset | Sequence | KeysView | ValuesView):
84
            return list(obj)
85
        elif isinstance(obj, Mapping | ItemsView):
86
            return dict(obj)
87
        elif isinstance(obj, tuple) and hasattr(obj, "_asdict"):
88
            # namedtuple
89
            return obj._asdict()
90
        elif inspect.isclass(obj) or inspect.ismodule(obj):
91
            return obj.Self
92
        raise TypeError()
93
94
95
# Shared instance; `JsonUtils.misc_types_default` returns it and `JsonUtils.new_default` uses it as the default `first` handler.
_misc_types_default = MiscTypesJsonDefault()
96
97
98
@dataclass(frozen=True, slots=True, kw_only=True)
99
class JsonEncoder:
100
    bytes_options: int
101
    str_options: int
102
    default: Callable[[Any], Any]
103
    prep: Callable[[Any], Any]
104
105
    def as_str(self: Self, data: Any) -> str:
106
        data = self.prep(data)
107
        x = orjson.dumps(data, default=self.default, option=self.str_options)
108
        return x.decode(encoding="utf-8") + "\n"
109
110
    def as_bytes(self: Self, data: Any) -> bytes | bytearray | memoryview:
111
        data = self.prep(data)
112
        return orjson.dumps(data, default=self.default, option=self.bytes_options)
113
114
115
@dataclass(frozen=True, slots=True)
116
class JsonDecoder:
117
    def from_bytes(self: Self, data: bytes | bytearray | memoryview) -> Any:
118
        if not isinstance(data, bytes | bytearray | memoryview):
119
            raise TypeError(str(type(data)))
120
        if not isinstance(data, bytes):
121
            data = bytes(data)
122
        if orjson:
123
            return orjson.loads(data)
124
        return json.loads(data.decode(encoding="utf-8"))
125
126
    def from_str(self: Self, data: str) -> Any:
127
        if orjson:
128
            return orjson.loads(data)
129
        json.loads(data)
130
131
132
@dataclass(slots=True, frozen=True)
class JsonUtils:
    """
    Stateless factory for JSON encoders, decoders, and `default=` callbacks.
    """

    def misc_types_default(self: Self) -> Callable[[Any], Any]:
        """
        Returns the module's shared `MiscTypesJsonDefault` instance.
        """
        return _misc_types_default

    def new_default(
        self: Self,
        *fallbacks: Callable[[Any], Any] | None,
        first: Callable[[Any], Any] | None = _misc_types_default,
        last: Callable[[Any], Any] | None = str,
    ) -> Callable[[Any], Any]:
        """
        Creates a new function to be passed as `default=` to `orjson.dumps`.
        Tries, in order: `first`, each of `fallbacks`, then `last`.
        A handler signals that it cannot encode a value by raising `TypeError`;
        the next handler is then tried.

        Args:
            first: Try this first
            fallbacks: Tries these, in order, after `first`, skipping any None
            last: Use this as the last resort; consider `str` or `repr`.
                  If None, the returned function raises `TypeError` once every handler has failed.

        Returns:
            The combined function
        """
        then = [f for f in (first, *fallbacks) if f is not None]

        def _default(obj: Any) -> Any:
            for t in then:
                try:
                    return t(obj)
                except TypeError:  # noqa: S110
                    pass
            # only give up after *every* handler has been tried
            if last is None:
                raise TypeError(f"No handler could encode an object of type {type(obj).__name__}")
            return last(obj)

        _default.__name__ = f"default({', '.join([str(t) for t in then])})"
        return _default

    def decoder(self: Self) -> JsonDecoder:
        """
        Returns a new `JsonDecoder`.
        """
        return JsonDecoder()

    def encoder(
        self: Self,
        *fallbacks: Callable[[Any], Any] | None,
        indent: bool = True,
        sort: bool = False,
        inf_handling: NanInfHandling = NanInfHandling.raise_error,
        nan_handling: NanInfHandling = NanInfHandling.raise_error,
        last: Callable[[Any], Any] | None = str,
    ) -> JsonEncoder:
        """
        Builds a `JsonEncoder` backed by orjson.
        Its `default=` callback comes from :meth:`new_default`, with the shared
        `MiscTypesJsonDefault` instance tried first, and its data is passed through
        :meth:`prepare` before serialization.
        Both outputs use `orjson.OPT_UTC_Z`; the bytes output also uses `orjson.OPT_NON_STR_KEYS`.

        Args:
            fallbacks: Extra `default=` handlers, tried in order after the first one
            indent: Indent by 2 spaces (`orjson.OPT_INDENT_2`);
                    only for :meth:`pocketutils.tools.json_tools.JsonEncoder.as_str`
            inf_handling: How to handle Inf and -Inf values (see :meth:`prepare`)
            nan_handling: How to handle NaN values (see :meth:`prepare`)
            sort: Sort keys with `orjson.OPT_SORT_KEYS`; applies to both the str and bytes output
            last: Last resort option to encode a value

        Raise:
            ImportError: If orjson is not installed
        """
        import orjson

        bytes_option = orjson.OPT_UTC_Z | orjson.OPT_NON_STR_KEYS
        str_option = orjson.OPT_UTC_Z
        if sort:
            bytes_option |= orjson.OPT_SORT_KEYS
            str_option |= orjson.OPT_SORT_KEYS
        if indent:
            str_option |= orjson.OPT_INDENT_2
        default = self.new_default(*fallbacks, first=_misc_types_default, last=last)

        def prep_fn(d: Any) -> Any:
            return self.prepare(d, inf_handling=inf_handling, nan_handling=nan_handling)

        return JsonEncoder(default=default, bytes_options=bytes_option, str_options=str_option, prep=prep_fn)

    @staticmethod
    def _is_float_scalar(value: Any) -> bool:
        """Tells whether `value` is a `float` or a numpy float scalar (recognized by type name)."""
        return isinstance(value, float) or str(type(value)).startswith("<class 'numpy.float")

    @staticmethod
    def _is_inf(value: Any) -> bool:
        """Tells whether a float is Inf or -Inf; finite values and NaN give False."""
        # finite: both comparisons are True; NaN: both are False; ±Inf: exactly one is True
        return bool((value > NEG_INF) != (value < INF))

    @staticmethod
    def _is_nan(value: Any) -> bool:
        """Tells whether a float is NaN."""
        # NaN is the only float that is not equal to itself
        return bool(value != value)  # noqa: PLR0124

    def prepare(
        self: Self,
        data: Any,
        *,
        inf_handling: NanInfHandling,
        nan_handling: NanInfHandling,
    ) -> Any:
        """
        Recursively applies the Inf and NaN policies to `data` before serialization.
        Orjson encodes NaN, Inf, and -Inf as JSON null, so `convert_to_null` mostly
        leaves such values in place, while `convert_to_str` replaces them with
        strings and `raise_error` rejects them.

        Handling, by type of `data`:

        - Anything with `items`, `keys`, and `values`: a new dict with `str` keys and prepared values.
        - A list containing only floats, or an ndarray with a float dtype, that holds a non-finite value:
          if both policies are `convert_to_str`, or if both kinds of value are present and
          exactly one policy is `convert_to_str`, *every* element is converted with `str`
          (elements under `convert_to_null` become None).
        - Any other ndarray: converted with `tolist()`, then prepared as a list.
        - Any other list: each element is prepared.
        - A `float` or numpy float scalar: converted with `str` under `convert_to_str`, else unchanged.
        - Anything else: returned unchanged.

        The returned result may still not be serializable with orjson.
        Trying to serialize it is the best way to test for serializability.

        Args:
            data: The value to prepare
            inf_handling: What to do with Inf and -Inf
            nan_handling: What to do with NaN

        Returns:
            The prepared value; containers are rebuilt rather than modified in place

        Raise:
            ValueError: If a non-finite value is found and its policy is `raise_error`
        """
        # we go to great lengths to avoid importing numpy
        # no np.isinf, np.isneginf, or np.isnan allowed
        # Numpy float types compare to float, including to -inf and +inf,
        # so the comparison-based helpers work for them too
        raise_error = NanInfHandling.raise_error
        to_str = NanInfHandling.convert_to_str
        to_null = NanInfHandling.convert_to_null

        def recurse(value: Any) -> Any:
            return self.prepare(value, inf_handling=inf_handling, nan_handling=nan_handling)

        if hasattr(data, "items") and hasattr(data, "keys") and hasattr(data, "values"):
            return {str(k): recurse(v) for k, v in data.items()}

        is_list = isinstance(data, list)
        is_np_array = type(data).__name__ == "ndarray" and hasattr(data, "dtype")
        is_float_array = (is_list and all(isinstance(e, float) for e in data)) or (
            is_np_array and str(data.dtype).startswith("float")
        )
        if is_float_array:
            # NOTE(review): elements are compared one by one, which presumably only
            # works for 1-dimensional ndarrays -- confirm before passing N-d arrays
            has_inf = any(self._is_inf(v) for v in data)
            has_nan = any(self._is_nan(v) for v in data)
            if has_inf and inf_handling is raise_error:
                raise ValueError(f"Array '{data}' contains Inf or -Inf")
            if has_nan and nan_handling is raise_error:
                raise ValueError(f"Array '{data}' contains NaN")
            if (has_inf or has_nan) and inf_handling is to_str and nan_handling is to_str:
                return [str(v) for v in data]
            if has_inf and has_nan and inf_handling is to_str and nan_handling is to_null:
                return [None if self._is_nan(v) else str(v) for v in data]
            if has_inf and has_nan and inf_handling is to_null and nan_handling is to_str:
                return [None if self._is_inf(v) else str(v) for v in data]
        if is_np_array:
            # prepare the resulting list too, so the policies also reach array elements
            return recurse(data.tolist())
        if is_list:
            return [recurse(e) for e in data]
        if self._is_float_scalar(data):
            if self._is_inf(data):
                if inf_handling is raise_error:
                    raise ValueError(f"Value '{data}' is Inf or -Inf")
                if inf_handling is to_str:
                    return str(data)
            elif self._is_nan(data):
                if nan_handling is raise_error:
                    raise ValueError(f"Value '{data}' is NaN")
                if nan_handling is to_str:
                    return str(data)
        return data
302
303
304
# Module-level `JsonUtils` instance, exported through `__all__`.
JsonTools = JsonUtils()
305