Passed
Push — main ( 87238c...9f1476 )
by Douglas
02:33
created

JsonUtils.preserve_inf()   F

Complexity

Conditions 16

Size

Total Lines 56
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 56
rs 2.4
c 0
b 0
f 0
cc 16
nop 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

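Commonly applied refactorings include Extract Method; when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object also apply.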

Complexity

Complex methods like pocketutils.tools.json_tools.JsonUtils.preserve_inf() often do a lot of different things. To break such a method (and the class that contains it) down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
3
# SPDX-License-Identifier: Apache-2.0
4
"""
5
6
"""
7
8
import base64
9
import enum
10
import inspect
11
import json
12
from collections.abc import (
13
    Callable,
14
    ItemsView,
15
    KeysView,
16
    Mapping,
17
    Sequence,
18
    ValuesView,
19
)
20
from dataclasses import dataclass
21
from datetime import date, datetime, tzinfo
22
from datetime import time as _time
23
from decimal import Decimal
24
from typing import Any, Self
25
from uuid import UUID
26
27
try:
28
    import orjson
29
except ImportError:
30
    orjson = None
31
32
__all__ = ["JsonEncoder", "JsonDecoder", "JsonUtils", "JsonTools"]
33
34
# Float sentinels. INF and NEG_INF are compared against in JsonUtils.preserve_inf
# to detect infinite values without importing numpy.
INF = float("Inf")
35
NEG_INF = float("-Inf")
36
NAN = float("NaN")
37
38
39
class MiscTypesJsonDefault(Callable[[Any], Any]):
40
    def __call__(self: Self, obj: Any) -> Any:
41
        """
42
        Tries to return a serializable result for `obj`.
43
        Meant to be passed as `default=` in `orjson.dumps`.
44
        Only encodes types that can always be represented exactly,
45
        without any loss of information. For that reason, it does not
46
        fall back to calling `str` or `repr` for unknown types.
47
        Handles, at least:
48
49
        - `decimal.Decimal` → str (scientific notation)
50
        - `complex` or `np.complexfloating` → str (e.g. "(3+1j)")
51
        - `typing.Mapping` → dict
52
        - `typing.ItemsView` → dict
53
        - `collections.abc.{Set,Sequence,...}` → list
54
        - `enum.Enum` → str (name)
55
        - `bytes | bytearray | memoryview` →  str (base-64)
56
        - `datetime.tzinfo` →  str (timezone name)
57
        - `typing.NamedTuple` →  dict
58
        - type or module →  str (name)
59
60
        Raise:
61
            TypeError: If none of those options worked
62
        """
63
        if obj is None:
64
            return obj  # we should never get here, but this seems safer
65
        elif isinstance(obj, str | int | float | datetime | date | _time | UUID):
66
            return obj  # we should never get here, but let's be safe
67
        elif isinstance(obj, Decimal | complex):
68
            return str(obj)
69
        elif isinstance(obj, enum.Enum):
70
            return obj.name
71
        elif isinstance(obj, bytes):
72
            return base64.b64decode(obj)
73
        elif isinstance(obj, bytes | bytearray | memoryview):
74
            return base64.b64decode(bytes(obj))
75
        elif isinstance(obj, tzinfo):
76
            return obj.tzname(datetime.now(tz=obj))
77
        elif isinstance(obj, set | frozenset | Sequence | KeysView | ValuesView):
78
            return list(obj)
79
        elif isinstance(obj, Mapping | ItemsView):
80
            return dict(obj)
81
        elif isinstance(obj, tuple) and hasattr(obj, "_asdict"):
82
            # namedtuple
83
            return obj._asdict()
84
        elif inspect.isclass(obj) or inspect.ismodule(obj):
85
            return obj.Self
86
        raise TypeError()
87
88
89
# Shared module-level instance; returned by JsonUtils.misc_types_default and used
# as the default `first` handler in JsonUtils.new_default.
_misc_types_default = MiscTypesJsonDefault()
90
91
92
@dataclass(frozen=True, slots=True, kw_only=True)
93
class JsonEncoder:
94
    bytes_options: int
95
    str_options: int
96
    default: Callable[[Any], Any]
97
    prep: Callable[[Any], Any] | None
98
99
    def as_bytes(self: Self, data: Any) -> bytes | bytearray | memoryview:
100
        if self.prep is not None:
101
            data = self.prep(data)
102
        return orjson.dumps(data, default=self.default, option=self.bytes_options)
103
104
    def as_str(self: Self, data: Any) -> str:
105
        if self.prep is not None:
106
            data = self.prep(data)
107
        x = orjson.dumps(data, default=self.default, option=self.str_options)
108
        return x.decode(encoding="utf-8") + "\n"
109
110
111
@dataclass(frozen=True, slots=True)
112
class JsonDecoder:
113
    def from_bytes(self: Self, data: bytes | bytearray | memoryview) -> Any:
114
        if not isinstance(data, bytes | bytearray | memoryview):
115
            raise TypeError(str(type(data)))
116
        if not isinstance(data, bytes):
117
            data = bytes(data)
118
        if orjson:
119
            return orjson.loads(data)
120
        return json.loads(data.decode(encoding="utf-8"))
121
122
    def from_str(self: Self, data: str) -> Any:
123
        if orjson:
124
            return orjson.loads(data)
125
        json.loads(data)
126
127
128
@dataclass(slots=True, frozen=True)
class JsonUtils:
    """
    Factory and helper methods for building JSON encoders and decoders.
    """

    def misc_types_default(self: Self) -> Callable[[Any], Any]:
        """Returns the shared module-level `MiscTypesJsonDefault` instance."""
        return _misc_types_default

    def new_default(
        self: Self,
        *fallbacks: Callable[[Any], Any] | None,
        first: Callable[[Any], Any] | None = _misc_types_default,
        last: Callable[[Any], Any] | None = str,
    ) -> Callable[[Any], Any]:
        """
        Creates a new function to be passed as `default=` to `orjson.dumps`.
        Tries, in order: `first`, each of `fallbacks`, then `last`.
        A handler is considered to have failed if it raises `TypeError`.

        Args:
            first: Try this first
            fallbacks: Tries these, in order, after `first`, skipping any None
            last: Use this as the last resort; consider `str` or `repr`.
                  If None, the returned function raises `TypeError`
                  when every other handler failed.
        """
        then = [f for f in [first, *fallbacks] if f is not None]

        def _default(obj: Any) -> Any:
            for t in then:
                try:
                    return t(obj)
                except TypeError:
                    continue
            # Only give up after *every* handler was tried
            if last is None:
                raise TypeError(f"No default handler could serialize type {type(obj).__name__}")
            return last(obj)

        _default.__name__ = f"default({', '.join([str(t) for t in then])})"
        return _default

    def decoder(self: Self) -> JsonDecoder:
        """Returns a new `JsonDecoder`."""
        return JsonDecoder()

    def encoder(
        self: Self,
        *fallbacks: Callable[[Any], Any] | None,
        indent: bool = True,
        sort: bool = False,
        preserve_inf: bool = True,
        last: Callable[[Any], Any] | None = str,
    ) -> JsonEncoder:
        """
        Builds a `JsonEncoder` backed by orjson.
        Uses :meth:`new_default` (starting with the module's misc-types handler)
        to encode more types than orjson can.

        Args:
            fallbacks: Extra `default=` handlers, tried in order; None entries are skipped
            indent: Indent by 2 spaces; only affects `JsonEncoder.as_str`
            sort: Sort keys with `orjson.OPT_SORT_KEYS`
            preserve_inf: Preserve infinite values with :meth:`preserve_inf`
            last: Last resort option to encode a value

        Raises:
            ImportError: If orjson is not installed
        """
        import orjson

        bytes_option = orjson.OPT_UTC_Z | orjson.OPT_NON_STR_KEYS
        str_option = orjson.OPT_UTC_Z
        if sort:
            bytes_option |= orjson.OPT_SORT_KEYS
            str_option |= orjson.OPT_SORT_KEYS
        if indent:
            str_option |= orjson.OPT_INDENT_2
        default = self.new_default(*fallbacks, first=_misc_types_default, last=last)
        prep = self.preserve_inf if preserve_inf else None
        return JsonEncoder(default=default, bytes_options=bytes_option, str_options=str_option, prep=prep)

    def preserve_inf(self: Self, data: Any) -> Any:
        """
        Recursively replaces infinite float and numpy values with strings.
        Orjson encodes NaN, -inf, and +inf as JSON null.
        This function converts to string as needed to preserve infinite values.

        - Mappings become dicts with `str` keys and recursively processed values.
        - `str`, `bytes`, `bytearray`, and `memoryview` are returned as-is.
        - Numpy arrays are converted with `tolist()`; float and object arrays
          are then processed like (nested) lists.
        - A sequence containing only floats is returned as-is if it has no
          infinite value; otherwise *every* element is converted to `str`,
          so the list stays homogeneous.
        - Any other sequence becomes a list of recursively processed elements.
        - An infinite float scalar (`float` or numpy float) becomes a string.
        - Everything else, including a lone NaN, is returned as-is.

        The returned result may still not be serializable with orjson.
        Trying to serialize is the best way to test for serializability.
        """
        # We go to great lengths to avoid importing numpy:
        # no np.isinf, np.isneginf, or np.isnan allowed.
        if isinstance(data, Mapping):
            return {str(k): self.preserve_inf(v) for k, v in data.items()}
        if isinstance(data, str | bytes | bytearray | memoryview):
            return data
        if type(data).__name__ == "ndarray" and hasattr(data, "dtype"):
            # noinspection PyUnresolvedReferences
            as_list = data.tolist()
            # Only float and object arrays can hold infinite floats;
            # for other dtypes, tolist() alone is enough (faster than recursing).
            if str(data.dtype).startswith(("float", "object")):
                return self.preserve_inf(as_list)
            return as_list
        if isinstance(data, Sequence):
            if all(isinstance(v, float) for v in data):
                if any(self._is_infinite(v) for v in data):
                    # a sequence of floats containing -Inf or +Inf
                    # ==> convert all to str to preserve
                    return [str(v) for v in data]
                return data
            return [self.preserve_inf(v) for v in data]
        if self._is_float_scalar(data) and self._is_infinite(data):
            return str(data)
        return data

    @staticmethod
    def _is_infinite(value: Any) -> bool:
        """Tells whether a float-like scalar is -Inf or +Inf (False for NaN)."""
        # Numpy float types compare to float; all comparisons with NaN are False,
        # so exactly one of these comparisons is True only for -Inf and +Inf.
        return bool((value > NEG_INF) != (value < INF))

    @staticmethod
    def _is_float_scalar(value: Any) -> bool:
        """Tells whether `value` is a `float` or a zero-dimensional object with a float dtype."""
        if isinstance(value, float):
            return True
        # str(dtype) is e.g. "float32"; the ndim check avoids treating array-likes as scalars
        return (
            hasattr(value, "dtype")
            and getattr(value, "ndim", 0) == 0
            and str(value.dtype).startswith("float")
        )
254
255
256
# Ready-to-use module-level JsonUtils instance; exported via __all__.
JsonTools = JsonUtils()
257