Completed
Pull Request — master (#17)
by Ramon
01:13
created

default_namedtuple_deserializer()   A

Complexity

Conditions 4

Size

Total Lines 24
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 15
nop 3
dl 0
loc 24
rs 9.65
c 0
b 0
f 0
1
"""
2
This module contains default deserializers. You can override the
deserialization process of a particular type as follows:

``jsons.set_deserializer(custom_deserializer, SomeClass)``
6
"""
7
import inspect
8
import re
9
from datetime import datetime
10
from enum import EnumMeta
11
from typing import List, Callable, Union, Optional
12
from jsons import _main_impl
13
from jsons._common_impl import get_class_name
14
from jsons._datetime_impl import get_datetime_inst
15
from jsons._main_impl import (
16
    RFC3339_DATETIME_PATTERN,
17
    snakecase,
18
    camelcase,
19
    pascalcase,
20
    lispcase
21
)
22
from jsons.exceptions import (
23
    UnfulfilledArgumentError,
24
    SignatureMismatchError,
25
    JsonsError,
26
    DeserializationError
27
)
28
29
30
def default_datetime_deserializer(obj: str,
                                  cls: type = datetime,
                                  **kwargs) -> datetime:
    """
    Deserialize a string with an RFC3339 pattern to a datetime instance.

    When ``obj`` contains a dot, ``'.%f'`` is appended to the parse pattern
    and the first fractional part found is cut down to at most six digits
    before ``obj`` is handed to ``get_datetime_inst``.
    :param obj: the string that is to be deserialized.
    :param cls: not used.
    :param kwargs: not used.
    :return: a ``datetime.datetime`` instance.
    """
    pattern = RFC3339_DATETIME_PATTERN
    if '.' in obj:
        pattern += '.%f'
        # strptime allows a fraction of length 6, so trim the rest (if any).
        match = re.search(r'(\.[0-9]+)', obj)
        # A dot that is not followed by digits gives no match; leave obj
        # untouched then instead of failing on ``None.group()``.
        if match:
            frac = match.group()
            obj = obj.replace(frac, frac[0:7])
    return get_datetime_inst(obj, pattern)
48
49
50
def default_list_deserializer(obj: list, cls: type = None, **kwargs) -> list:
    """
    Deserialize a list by deserializing all items of that list.
    :param obj: the list that needs deserializing.
    :param cls: the type optionally with a generic (e.g. List[str]). Its
    first generic argument, if any, is used as the type of every item.
    :param kwargs: any keyword arguments; passed on for every item.
    :return: a deserialized list instance.
    """
    # Only a parameterized type carries an item type. A missing or empty
    # ``__args__`` means that the items are loaded without a target type.
    type_args = getattr(cls, '__args__', None)
    item_cls = type_args[0] if type_args else None
    return [_main_impl.load(item, item_cls, **kwargs) for item in obj]
62
63
64
def default_tuple_deserializer(obj: list,
                               cls: type = None,
                               **kwargs) -> object:
    """
    Deserialize a (JSON) list into a tuple by deserializing all items of that
    list. If ``cls`` has a ``_fields`` attribute, the work is delegated to
    ``default_namedtuple_deserializer``.
    :param obj: the list that needs deserializing.
    :param cls: the type optionally with a generic (e.g. Tuple[str, int]).
    Without generic arguments, the items are loaded without a target type.
    :param kwargs: any keyword arguments; passed on for every item.
    :return: a deserialized tuple instance.
    :raises IndexError: if ``obj`` holds more items than ``cls`` has types
    (and ``cls`` is not of the form Tuple[X, ...]).
    """
    if hasattr(cls, '_fields'):
        return default_namedtuple_deserializer(obj, cls, **kwargs)
    # ``__tuple_params__`` takes precedence over ``__args__``. Both are
    # looked up lazily so that a ``cls`` without either does not fail.
    tuple_types = (getattr(cls, '__tuple_params__', None)
                   or getattr(cls, '__args__', None))
    if not tuple_types:
        return tuple(_main_impl.load(value, None, **kwargs) for value in obj)
    if len(tuple_types) > 1 and tuple_types[1] is ...:
        # Tuple[X, ...]: every item is of the one given type.
        tuple_types = [tuple_types[0]] * len(obj)
    return tuple(_main_impl.load(value, tuple_types[i], **kwargs)
                 for i, value in enumerate(obj))
83
84
85
def default_namedtuple_deserializer(obj: list, cls: type, **kwargs) -> object:
    """
    Deserialize a (JSON) list into a named tuple by deserializing all items of
    that list. Fields for which ``obj`` holds no item are filled with the
    default of that field, if there is one.
    :param obj: the list that needs deserializing.
    :param cls: the NamedTuple.
    :param kwargs: any keyword arguments; passed on for every field.
    :return: a deserialized named tuple (i.e. an instance of a class).
    :raises UnfulfilledArgumentError: if a field has neither a value in
    ``obj`` nor a default, or if that value is ``None``.
    """
    # Not every named tuple class offers ``_field_defaults`` and
    # ``_field_types``; fall back to no defaults and to ``__annotations__``.
    field_defaults = getattr(cls, '_field_defaults', {})
    field_types = (getattr(cls, '_field_types', None)
                   or getattr(cls, '__annotations__', {}))
    args = []
    for index, field_name in enumerate(cls._fields):
        if index < len(obj):
            field = obj[index]
        else:
            field = field_defaults.get(field_name)
        # Compare with None explicitly: falsy values such as 0, '' or False
        # are legitimate field values and must not be reported as missing.
        if field is None:
            msg = ('No value present in {} for argument "{}"'
                   .format(obj, field_name))
            raise UnfulfilledArgumentError(msg, field_name, obj, cls)
        field_cls = field_types.get(field_name)
        args.append(_main_impl.load(field, field_cls, **kwargs))
    return cls(*args)
109
110
111
def default_union_deserializer(obj: object, cls: Union, **kwargs) -> object:
    """
    Deserialize an object to any matching type of the given union. The types
    are tried in the order of ``cls.__args__`` and the first successful
    deserialization is returned.
    :param obj: The object that needs deserializing.
    :param cls: The Union type with a generic (e.g. Union[str, int]).
    :param kwargs: Any keyword arguments that are passed through the
    deserialization process.
    :return: An object of the first type of the Union that could be
    deserialized successfully.
    :raises DeserializationError: if every type of the Union failed with a
    ``JsonsError``.
    """
    for sub_type in cls.__args__:
        try:
            return _main_impl.load(obj, sub_type, **kwargs)
        except JsonsError:
            pass  # Try the next one.
    # Only reached when none of the types of the Union could be loaded.
    args_msg = ', '.join(get_class_name(cls_) for cls_ in cls.__args__)
    err_msg = ('Could not match the object of type "{}" to any type of '
               'the Union: {}'.format(str(cls), args_msg))
    raise DeserializationError(err_msg, obj, cls)
132
133
134
def default_set_deserializer(obj: list, cls: type, **kwargs) -> set:
    """
    Deserialize a (JSON) list into a set by deserializing all items of that
    list. If the set has a generic type (e.g. Set[datetime]) then it is
    assumed that all elements can be deserialized to that type.
    :param obj: the list that needs deserializing.
    :param cls: the type, optionally with a generic (e.g. Set[str]).
    :param kwargs: any keyword arguments.
    :return: a deserialized set instance.
    """
    # Translate Set[X] into List[X] so that the list deserializer can do the
    # work. Without a generic argument, a plain ``list`` is used.
    type_args = getattr(cls, '__args__', None)
    list_cls = List[type_args[0]] if type_args else list
    return set(default_list_deserializer(obj, list_cls, **kwargs))
149
150
151
def default_dict_deserializer(
        obj: dict,
        cls: type,
        key_transformer: Optional[Callable[[str], str]] = None,
        **kwargs) -> dict:
    """
    Deserialize a dict by deserializing all values of that dict.
    :param obj: the dict that needs deserializing.
    :param cls: the type, optionally with generics (e.g. Dict[str, int]). If
    it has more than one generic argument, the second one is used as the type
    of every value.
    :param key_transformer: a function that transforms the keys to a different
    style (e.g. PascalCase). It is also passed on for every value.
    :param kwargs: any keyword arguments.
    :return: a deserialized dict instance.
    """
    key_transformer = key_transformer or (lambda key: key)
    kwargs_ = {'key_transformer': key_transformer, **kwargs}
    if hasattr(cls, '__args__') and len(cls.__args__) > 1:
        kwargs_['cls'] = cls.__args__[1]
    return {key_transformer(key): _main_impl.load(value, **kwargs_)
            for key, value in obj.items()}
172
173
174
def default_enum_deserializer(obj: str,
                              cls: EnumMeta,
                              use_enum_name: bool = True,
                              **kwargs) -> object:
    """
    Deserialize an enum value to an enum instance. The serialized value can
    be the name of the enum element or its value; dependent on
    ``use_enum_name``.
    :param obj: the serialized enum.
    :param cls: the enum class.
    :param use_enum_name: determines whether the name or the value of an enum
    element should be used.
    :param kwargs: not used.
    :return: the corresponding enum element instance.
    :raises KeyError: if ``use_enum_name`` is set and ``cls`` has no element
    with the name ``obj``.
    :raises ValueError: if ``use_enum_name`` is not set and no element of
    ``cls`` has the value ``obj``.
    """
    if use_enum_name:
        return cls[obj]
    for elem in cls:
        if elem.value == obj:
            return elem
    # Without this, a value that matches no element would surface as an
    # UnboundLocalError on an unassigned result variable.
    raise ValueError('{!r} is not a valid value for {}'
                     .format(obj, cls.__name__))
0 ignored issues
show
introduced by
The variable result does not seem to be defined for all execution paths.
Loading history...
197
198
199
def default_string_deserializer(obj: str,
                                cls: Optional[type] = None,
                                **kwargs) -> object:
    """
    Deserialize a string. If the given ``obj`` can be loaded as a
    ``datetime``, the result of that is returned. Otherwise ``obj`` itself is
    returned.
    :param obj: the string that is to be deserialized.
    :param cls: not used.
    :param kwargs: any keyword arguments.
    :return: the deserialized obj.
    """
    try:
        # Use load instead of default_datetime_deserializer to allow the
        # datetime deserializer to be overridden.
        return _main_impl.load(obj, datetime, **kwargs)
    except Exception:
        # Best effort: any failure means "not a datetime". Unlike a bare
        # except, this lets KeyboardInterrupt and SystemExit through.
        return obj
216
217
218
def default_primitive_deserializer(obj: object,
                                   cls: Optional[type] = None,
                                   **kwargs) -> object:
    """
    Deserialize a primitive: it simply returns the given primitive.
    :param obj: the value that is to be deserialized.
    :param cls: not used.
    :param kwargs: not used.
    :return: ``obj``, unchanged.
    """
    return obj
229
230
231
def default_object_deserializer(
        obj: dict,
        cls: type,
        key_transformer: Optional[Callable[[str], str]] = None,
        strict: bool = False,
        **kwargs) -> object:
    """
    Deserialize ``obj`` into an instance of type ``cls``. If ``obj`` contains
    keys with a certain case style (e.g. camelCase) that do not match the style
    of ``cls`` (e.g. snake_case), a key_transformer should be used (e.g.
    KEY_TRANSFORMER_SNAKECASE).

    Keys of ``obj`` that match a parameter of ``cls.__init__`` are passed to
    the constructor; all other keys are set as attributes on the new instance
    afterwards.
    :param obj: a serialized instance of ``cls``.
    :param cls: the type to which ``obj`` should be deserialized.
    :param key_transformer: a function that transforms the keys in order to
    match the attribute names of ``cls``.
    :param strict: deserialize in strict mode: keys of ``obj`` that are no
    constructor argument are an error.
    :param kwargs: any keyword arguments that may be passed to the
    deserializers.
    :return: an instance of type ``cls``.
    :raises SignatureMismatchError: if ``strict`` is set and ``obj`` holds a
    key that is not used as a constructor argument.
    """
    # NOTE: without a key_transformer, concat_kwargs is the very same dict as
    # kwargs, so 'strict' then also ends up in the kwargs that are passed to
    # _set_remaining_attrs below.
    concat_kwargs = kwargs
    if key_transformer:
        obj = {key_transformer(key): value for key, value in obj.items()}
        concat_kwargs = {
            **kwargs,
            'key_transformer': key_transformer
        }
    concat_kwargs['strict'] = strict
    signature_parameters = inspect.signature(cls.__init__).parameters
    # The second returned element (the attr_getters that were not consumed)
    # is not used here.
    constructor_args, _ = _get_constructor_args(obj,
                                                cls,
                                                signature_parameters,
                                                **concat_kwargs)
    remaining_attrs = {attr_name: value for attr_name, value in obj.items()
                       if attr_name not in constructor_args}
    if strict and remaining_attrs:
        unexpected_arg = next(iter(remaining_attrs))
        err_msg = 'Type "{}" does not expect "{}"'.format(get_class_name(cls),
                                                          unexpected_arg)
        raise SignatureMismatchError(err_msg, unexpected_arg, obj, cls)
    instance = cls(**constructor_args)
    _set_remaining_attrs(instance, remaining_attrs, **kwargs)
    return instance
274
275
276
def _get_constructor_args(obj,
                          cls,
                          signature_parameters,
                          attr_getters=None,
                          **kwargs):
    # Loop through the signature of cls: the type we try to deserialize to. For
    # every parameter except 'self', we try to get the corresponding value
    # from obj first, then from attr_getters and then from the default of the
    # parameter.
    # Returns a tuple of (1) a dict with the constructor arguments and (2) a
    # dict with the attr_getters that were not used for an argument.
    # Raises UnfulfilledArgumentError for a parameter without any value that
    # is neither *args nor **kwargs.
    attr_getters = dict(attr_getters or {})  # Copy: entries get popped.
    constructor_args_in_obj = {}
    for sig_key, sig in signature_parameters.items():
        if sig_key == 'self':
            continue
        if obj and sig_key in obj:
            # This argument is in obj.
            arg_cls = None
            if sig.annotation is not inspect.Parameter.empty:
                arg_cls = sig.annotation
            value = _main_impl.load(obj[sig_key], arg_cls, **kwargs)
            constructor_args_in_obj[sig_key] = value
        elif sig_key in attr_getters:
            # There exists an attr_getter for this argument.
            attr_getter = attr_getters.pop(sig_key)
            constructor_args_in_obj[sig_key] = attr_getter()
        elif sig.default is not inspect.Parameter.empty:
            # There is a default value for this argument.
            constructor_args_in_obj[sig_key] = sig.default
        elif sig.kind not in (inspect.Parameter.VAR_POSITIONAL,
                              inspect.Parameter.VAR_KEYWORD):
            # This argument is no *args or **kwargs and has no value.
            raise UnfulfilledArgumentError(
                'No value found for "{}"'.format(sig_key),
                sig_key, obj, cls)

    return constructor_args_in_obj, attr_getters
311
312
313
def _set_remaining_attrs(instance,
                         remaining_attrs,
                         attr_getters=None,
                         **kwargs):
    # Set any remaining attributes on the newly created instance. Every value
    # in remaining_attrs is loaded with its own type as the target type.
    # After that, the result of every attr_getter is set under its name.
    attr_getters = attr_getters or {}
    for attr_name, raw_value in remaining_attrs.items():
        loaded_attr = _main_impl.load(raw_value, type(raw_value), **kwargs)
        try:
            setattr(instance, attr_name, loaded_attr)
        except AttributeError:
            pass  # This is raised when a @property does not have a setter.
    for attr_name, getter in attr_getters.items():
        setattr(instance, attr_name, getter())
329
330
331
# The following default key transformers can be passed as ``key_transformer``
# to the default_object_deserializer.
KEY_TRANSFORMER_SNAKECASE = snakecase
KEY_TRANSFORMER_CAMELCASE = camelcase
KEY_TRANSFORMER_PASCALCASE = pascalcase
KEY_TRANSFORMER_LISPCASE = lispcase
337