Passed
Pull Request — master (#17)
by Ramon
01:00
created

jsons.deserializers   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 329
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 178
dl 0
loc 329
rs 8.8798
c 0
b 0
f 0
wmc 44

14 Functions

Rating   Name   Duplication   Size   Complexity  
A default_list_deserializer() 0 12 3
A default_set_deserializer() 0 15 2
A default_object_deserializer() 0 39 4
A default_primitive_deserializer() 0 11 1
A default_dict_deserializer() 0 21 4
A _set_remaining_attrs() 0 16 4
A default_union_deserializer() 0 21 4
A default_namedtuple_deserializer() 0 24 4
B _get_value_for_attr() 0 23 7
A default_enum_deserializer() 0 16 2
A default_tuple_deserializer() 0 19 4
A default_string_deserializer() 0 17 2
A _get_constructor_args() 0 12 1
A default_datetime_deserializer() 0 18 2

How to fix   Complexity   

Complexity

Complex classes like jsons.deserializers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
This module contains default deserializers. You can override the
3
deserialization process of a particular type as follows:
4
5
``jsons.set_deserializer(custom_deserializer, SomeClass)``
6
"""
7
import inspect
8
import re
9
from datetime import datetime
10
from enum import EnumMeta
11
from functools import partial
12
from typing import List, Callable, Union, Optional
13
from jsons import _main_impl
14
from jsons._common_impl import get_class_name
15
from jsons._datetime_impl import get_datetime_inst
16
from jsons._main_impl import (
17
    RFC3339_DATETIME_PATTERN,
18
    snakecase,
19
    camelcase,
20
    pascalcase,
21
    lispcase
22
)
23
from jsons.exceptions import (
24
    UnfulfilledArgumentError,
25
    SignatureMismatchError,
26
    JsonsError,
27
    DeserializationError
28
)
29
30
31
def default_datetime_deserializer(obj: str,
32
                                  cls: type = datetime,
33
                                  **kwargs) -> datetime:
34
    """
35
    Deserialize a string with an RFC3339 pattern to a datetime instance.
36
    :param obj: the string that is to be deserialized.
37
    :param cls: not used.
38
    :param kwargs: not used.
39
    :return: a ``datetime.datetime`` instance.
40
    """
41
    pattern = RFC3339_DATETIME_PATTERN
42
    if '.' in obj:
43
        pattern += '.%f'
44
        # strptime allows a fraction of length 6, so trip the rest (if exists).
45
        regex_pattern = re.compile(r'(\.[0-9]+)')
46
        frac = regex_pattern.search(obj).group()
47
        obj = obj.replace(frac, frac[0:7])
48
    return get_datetime_inst(obj, pattern)
49
50
51
def default_list_deserializer(obj: list, cls: type = None, **kwargs) -> list:
52
    """
53
    Deserialize a list by deserializing all items of that list.
54
    :param obj: the list that needs deserializing.
55
    :param cls: the type optionally with a generic (e.g. List[str]).
56
    :param kwargs: any keyword arguments.
57
    :return: a deserialized list instance.
58
    """
59
    cls_ = None
60
    if cls and hasattr(cls, '__args__'):
61
        cls_ = cls.__args__[0]
62
    return [_main_impl.load(x, cls_, **kwargs) for x in obj]
63
64
65
def default_tuple_deserializer(obj: list,
66
                               cls: type = None,
67
                               **kwargs) -> object:
68
    """
69
    Deserialize a (JSON) list into a tuple by deserializing all items of that
70
    list.
71
    :param obj: the tuple that needs deserializing.
72
    :param cls: the type optionally with a generic (e.g. Tuple[str, int]).
73
    :param kwargs: any keyword arguments.
74
    :return: a deserialized tuple instance.
75
    """
76
    if hasattr(cls, '_fields'):
77
        return default_namedtuple_deserializer(obj, cls, **kwargs)
78
    tuple_types = getattr(cls, '__tuple_params__', cls.__args__)
79
    if len(tuple_types) > 1 and tuple_types[1] is ...:
80
        tuple_types = [tuple_types[0]] * len(obj)
81
    list_ = [_main_impl.load(value, tuple_types[i], **kwargs)
82
             for i, value in enumerate(obj)]
83
    return tuple(list_)
84
85
86
def default_namedtuple_deserializer(obj: list, cls: type, **kwargs) -> object:
87
    """
88
    Deserialize a (JSON) list into a named tuple by deserializing all items of
89
    that list.
90
    :param obj: the tuple that needs deserializing.
91
    :param cls: the NamedTuple.
92
    :param kwargs: any keyword arguments.
93
    :return: a deserialized named tuple (i.e. an instance of a class).
94
    """
95
    args = []
96
    for index, field_name in enumerate(cls._fields):
97
        if index < len(obj):
98
            field = obj[index]
99
        else:
100
            field = cls._field_defaults.get(field_name, None)
101
        if not field:
102
            msg = ('No value present in {} for argument "{}"'
103
                   .format(obj, field_name))
104
            raise UnfulfilledArgumentError(msg, field_name, obj, cls)
105
        cls_ = cls._field_types.get(field_name, None)
106
        loaded_field = _main_impl.load(field, cls_, **kwargs)
107
        args.append(loaded_field)
108
    inst = cls(*args)
109
    return inst
110
111
112
def default_union_deserializer(obj: object, cls: Union, **kwargs) -> object:
113
    """
114
    Deserialize an object to any matching type of the given union. The first
115
    successful deserialization is returned.
116
    :param obj: The object that needs deserializing.
117
    :param cls: The Union type with a generic (e.g. Union[str, int]).
118
    :param kwargs: Any keyword arguments that are passed through the
119
    deserialization process.
120
    :return: An object of the first type of the Union that could be
121
    deserialized successfully.
122
    """
123
    for sub_type in cls.__args__:
124
        try:
125
            return _main_impl.load(obj, sub_type, **kwargs)
126
        except JsonsError:
127
            pass  # Try the next one.
128
    else:
129
        args_msg = ', '.join([get_class_name(cls_) for cls_ in cls.__args__])
130
        err_msg = ('Could not match the object of type "{}" to any type of '
131
                   'the Union: {}'.format(str(cls), args_msg))
132
        raise DeserializationError(err_msg, obj, cls)
133
134
135
def default_set_deserializer(obj: list, cls: type, **kwargs) -> set:
136
    """
137
    Deserialize a (JSON) list into a set by deserializing all items of that
138
    list. If the list as a generic type (e.g. Set[datetime]) then it is
139
    assumed that all elements can be deserialized to that type.
140
    :param obj: the list that needs deserializing.
141
    :param cls: the type, optionally with a generic (e.g. Set[str]).
142
    :param kwargs: any keyword arguments.
143
    :return: a deserialized set instance.
144
    """
145
    cls_ = list
146
    if hasattr(cls, '__args__'):
147
        cls_ = List[cls.__args__[0]]
148
    list_ = default_list_deserializer(obj, cls_, **kwargs)
149
    return set(list_)
150
151
152
def default_dict_deserializer(
153
        obj: dict,
154
        cls: type,
155
        key_transformer: Optional[Callable[[str], str]] = None,
156
        **kwargs) -> dict:
157
    """
158
    Deserialize a dict by deserializing all instances of that dict.
159
    :param obj: the dict that needs deserializing.
160
    :param key_transformer: a function that transforms the keys to a different
161
    style (e.g. PascalCase).
162
    :param cls: not used.
163
    :param kwargs: any keyword arguments.
164
    :return: a deserialized dict instance.
165
    """
166
    key_transformer = key_transformer or (lambda key: key)
167
    kwargs_ = {**{'key_transformer': key_transformer}, **kwargs}
168
    if hasattr(cls, '__args__') and len(cls.__args__) > 1:
169
        sub_cls = cls.__args__[1]
170
        kwargs_['cls'] = sub_cls
171
    return {key_transformer(key): _main_impl.load(obj[key], **kwargs_)
172
            for key in obj}
173
174
175
def default_enum_deserializer(obj: str,
176
                              cls: EnumMeta,
177
                              use_enum_name: bool = True,
178
                              **kwargs) -> object:
179
    """
180
    Deserialize an enum value to an enum instance. The serialized value must
181
    can be the name of the enum element or the value; dependent on
182
    ``use_enum_name``.
183
    :param obj: the serialized enum.
184
    :param cls: the enum class.
185
    :param use_enum_name: determines whether the name or the value of an enum
186
    element should be used.
187
    :param kwargs: not used.
188
    :return: the corresponding enum element instance.
189
    """
190
    return cls[obj] if use_enum_name else cls(obj)
191
192
193
def default_string_deserializer(obj: str,
194
                                cls: Optional[type] = None,
195
                                **kwargs) -> object:
196
    """
197
    Deserialize a string. If the given ``obj`` can be parsed to a date, a
198
    ``datetime`` instance is returned.
199
    :param obj: the string that is to be deserialized.
200
    :param cls: not used.
201
    :param kwargs: any keyword arguments.
202
    :return: the deserialized obj.
203
    """
204
    try:
205
        # Use load instead of default_datetime_deserializer to allow the
206
        # datetime deserializer to be overridden.
207
        return _main_impl.load(obj, datetime, **kwargs)
208
    except:
209
        return obj
210
211
212
def default_primitive_deserializer(obj: object,
213
                                   cls: Optional[type] = None,
214
                                   **kwargs) -> object:
215
    """
216
    Deserialize a primitive: it simply returns the given primitive.
217
    :param obj: the value that is to be deserialized.
218
    :param cls: not used.
219
    :param kwargs: not used.
220
    :return: ``obj``.
221
    """
222
    return obj
223
224
225
def default_object_deserializer(
226
        obj: dict,
227
        cls: type,
228
        key_transformer: Optional[Callable[[str], str]] = None,
229
        strict: bool = False,
230
        **kwargs) -> object:
231
    """
232
    Deserialize ``obj`` into an instance of type ``cls``. If ``obj`` contains
233
    keys with a certain case style (e.g. camelCase) that do not match the style
234
    of ``cls`` (e.g. snake_case), a key_transformer should be used (e.g.
235
    KEY_TRANSFORMER_SNAKECASE).
236
    :param obj: a serialized instance of ``cls``.
237
    :param cls: the type to which ``obj`` should be deserialized.
238
    :param key_transformer: a function that transforms the keys in order to
239
    match the attribute names of ``cls``.
240
    :param strict: deserialize in strict mode.
241
    :param kwargs: any keyword arguments that may be passed to the
242
    deserializers.
243
    :return: an instance of type ``cls``.
244
    """
245
    concat_kwargs = kwargs
246
    if key_transformer:
247
        obj = {key_transformer(key): obj[key] for key in obj}
248
        concat_kwargs = {
249
            **kwargs,
250
            'key_transformer': key_transformer
251
        }
252
    concat_kwargs['strict'] = strict
253
    constructor_args = _get_constructor_args(obj, cls, **concat_kwargs)
254
    remaining_attrs = {attr_name: obj[attr_name] for attr_name in obj
255
                       if attr_name not in constructor_args}
256
    if strict and remaining_attrs:
257
        unexpected_arg = list(remaining_attrs.keys())[0]
258
        err_msg = 'Type "{}" does not expect "{}"'.format(get_class_name(cls),
259
                                                          unexpected_arg)
260
        raise SignatureMismatchError(err_msg, unexpected_arg, obj, cls)
261
    instance = cls(**constructor_args)
262
    _set_remaining_attrs(instance, remaining_attrs, **kwargs)
263
    return instance
264
265
266
def _get_constructor_args(obj, cls, attr_getters=None, **kwargs):
267
    # Loop through the signature of cls: the type we try to deserialize to. For
268
    # every required parameter, we try to get the corresponding value from
269
    # json_obj.
270
    signature_parameters = inspect.signature(cls.__init__).parameters
271
    attr_getters = dict(**(attr_getters or {}))
272
    value_for_attr_part = partial(_get_value_for_attr, obj=obj, cls=cls,
273
                                  attr_getters=attr_getters, **kwargs)
274
    args_gen = (value_for_attr_part(sig_key=sig_key, sig=sig) for sig_key, sig
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable sig_key does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable sig does not seem to be defined.
Loading history...
275
                in signature_parameters.items() if sig_key != 'self')
276
    constructor_args_in_obj = {key: value for key, value in args_gen if key}
277
    return constructor_args_in_obj
278
279
280
def _get_value_for_attr(obj, cls, sig_key, sig, attr_getters, **kwargs):
281
    result = None, None
282
    if obj and sig_key in obj:
283
        # This argument is in obj.
284
        arg_cls = None
285
        if sig.annotation != inspect.Parameter.empty:
286
            arg_cls = sig.annotation
287
        value = _main_impl.load(obj[sig_key], arg_cls, **kwargs)
288
        result = sig_key, value
289
    elif sig_key in attr_getters:
290
        # There exists an attr_getter for this argument.
291
        attr_getter = attr_getters.pop(sig_key)
292
        result = sig_key, attr_getter()
293
    elif sig.default != inspect.Parameter.empty:
294
        # There is a default value for this argument.
295
        result = sig_key, sig.default
296
    elif sig.kind not in (inspect.Parameter.VAR_POSITIONAL,
297
                          inspect.Parameter.VAR_KEYWORD):
298
        # This argument is no *args or **kwargs and has no value.
299
        raise UnfulfilledArgumentError(
300
            'No value found for "{}"'.format(sig_key),
301
            sig_key, obj, cls)
302
    return result
303
304
305
def _set_remaining_attrs(instance,
306
                         remaining_attrs,
307
                         attr_getters=None,
308
                         **kwargs):
309
    # Set any remaining attributes on the newly created instance.
310
    attr_getters = attr_getters or {}
311
    for attr_name in remaining_attrs:
312
        loaded_attr = _main_impl.load(remaining_attrs[attr_name],
313
                                      type(remaining_attrs[attr_name]),
314
                                      **kwargs)
315
        try:
316
            setattr(instance, attr_name, loaded_attr)
317
        except AttributeError:
318
            pass  # This is raised when a @property does not have a setter.
319
    for attr_name, getter in attr_getters.items():
320
        setattr(instance, attr_name, getter())
321
322
323
# The following default key transformers can be used with the
324
# default_object_serializer.
325
KEY_TRANSFORMER_SNAKECASE = snakecase
326
KEY_TRANSFORMER_CAMELCASE = camelcase
327
KEY_TRANSFORMER_PASCALCASE = pascalcase
328
KEY_TRANSFORMER_LISPCASE = lispcase
329