|
1
|
|
|
""" |
|
2
|
|
|
This module contains default deserializers. You can override the |
|
3
|
|
|
deserialization process of a particular type as follows: |
|
4
|
|
|
|
|
5
|
|
|
``jsons.set_deserializer(custom_deserializer, SomeClass)`` |
|
6
|
|
|
""" |
|
7
|
|
|
import inspect |
|
8
|
|
|
import re |
|
9
|
|
|
from datetime import datetime |
|
10
|
|
|
from enum import EnumMeta |
|
11
|
|
|
from typing import List, Callable, Union, Optional |
|
12
|
|
|
from jsons import _main_impl |
|
13
|
|
|
from jsons._common_impl import get_class_name |
|
14
|
|
|
from jsons._datetime_impl import get_datetime_inst |
|
15
|
|
|
from jsons._main_impl import ( |
|
16
|
|
|
RFC3339_DATETIME_PATTERN, |
|
17
|
|
|
snakecase, |
|
18
|
|
|
camelcase, |
|
19
|
|
|
pascalcase, |
|
20
|
|
|
lispcase |
|
21
|
|
|
) |
|
22
|
|
|
from jsons.exceptions import ( |
|
23
|
|
|
UnfulfilledArgumentError, |
|
24
|
|
|
SignatureMismatchError, |
|
25
|
|
|
JsonsError, |
|
26
|
|
|
DeserializationError |
|
27
|
|
|
) |
|
28
|
|
|
|
|
29
|
|
|
|
|
30
|
|
|
def default_datetime_deserializer(obj: str, |
|
31
|
|
|
cls: type = datetime, |
|
32
|
|
|
**kwargs) -> datetime: |
|
33
|
|
|
""" |
|
34
|
|
|
Deserialize a string with an RFC3339 pattern to a datetime instance. |
|
35
|
|
|
:param obj: the string that is to be deserialized. |
|
36
|
|
|
:param cls: not used. |
|
37
|
|
|
:param kwargs: not used. |
|
38
|
|
|
:return: a ``datetime.datetime`` instance. |
|
39
|
|
|
""" |
|
40
|
|
|
pattern = RFC3339_DATETIME_PATTERN |
|
41
|
|
|
if '.' in obj: |
|
42
|
|
|
pattern += '.%f' |
|
43
|
|
|
# strptime allows a fraction of length 6, so trip the rest (if exists). |
|
44
|
|
|
regex_pattern = re.compile(r'(\.[0-9]+)') |
|
45
|
|
|
frac = regex_pattern.search(obj).group() |
|
46
|
|
|
obj = obj.replace(frac, frac[0:7]) |
|
47
|
|
|
return get_datetime_inst(obj, pattern) |
|
48
|
|
|
|
|
49
|
|
|
|
|
50
|
|
|
def default_list_deserializer(obj: list, cls: type = None, **kwargs) -> list: |
|
51
|
|
|
""" |
|
52
|
|
|
Deserialize a list by deserializing all items of that list. |
|
53
|
|
|
:param obj: the list that needs deserializing. |
|
54
|
|
|
:param cls: the type optionally with a generic (e.g. List[str]). |
|
55
|
|
|
:param kwargs: any keyword arguments. |
|
56
|
|
|
:return: a deserialized list instance. |
|
57
|
|
|
""" |
|
58
|
|
|
cls_ = None |
|
59
|
|
|
if cls and hasattr(cls, '__args__'): |
|
60
|
|
|
cls_ = cls.__args__[0] |
|
61
|
|
|
return [_main_impl.load(x, cls_, **kwargs) for x in obj] |
|
62
|
|
|
|
|
63
|
|
|
|
|
64
|
|
|
def default_tuple_deserializer(obj: list, |
|
65
|
|
|
cls: type = None, |
|
66
|
|
|
**kwargs) -> object: |
|
67
|
|
|
""" |
|
68
|
|
|
Deserialize a (JSON) list into a tuple by deserializing all items of that |
|
69
|
|
|
list. |
|
70
|
|
|
:param obj: the tuple that needs deserializing. |
|
71
|
|
|
:param cls: the type optionally with a generic (e.g. Tuple[str, int]). |
|
72
|
|
|
:param kwargs: any keyword arguments. |
|
73
|
|
|
:return: a deserialized tuple instance. |
|
74
|
|
|
""" |
|
75
|
|
|
if hasattr(cls, '_fields'): |
|
76
|
|
|
return default_namedtuple_deserializer(obj, cls, **kwargs) |
|
77
|
|
|
tuple_types = getattr(cls, '__tuple_params__', cls.__args__) |
|
78
|
|
|
if len(tuple_types) > 1 and tuple_types[1] is ...: |
|
79
|
|
|
tuple_types = [tuple_types[0]] * len(obj) |
|
80
|
|
|
list_ = [_main_impl.load(value, tuple_types[i], **kwargs) |
|
81
|
|
|
for i, value in enumerate(obj)] |
|
82
|
|
|
return tuple(list_) |
|
83
|
|
|
|
|
84
|
|
|
|
|
85
|
|
|
def default_namedtuple_deserializer(obj: list, cls: type, **kwargs) -> object: |
|
86
|
|
|
""" |
|
87
|
|
|
Deserialize a (JSON) list into a named tuple by deserializing all items of |
|
88
|
|
|
that list. |
|
89
|
|
|
:param obj: the tuple that needs deserializing. |
|
90
|
|
|
:param cls: the NamedTuple. |
|
91
|
|
|
:param kwargs: any keyword arguments. |
|
92
|
|
|
:return: a deserialized named tuple (i.e. an instance of a class). |
|
93
|
|
|
""" |
|
94
|
|
|
args = [] |
|
95
|
|
|
for index, field_name in enumerate(cls._fields): |
|
96
|
|
|
if index < len(obj): |
|
97
|
|
|
field = obj[index] |
|
98
|
|
|
else: |
|
99
|
|
|
field = cls._field_defaults.get(field_name, None) |
|
100
|
|
|
if not field: |
|
101
|
|
|
msg = ('No value present in {} for argument "{}"' |
|
102
|
|
|
.format(obj, field_name)) |
|
103
|
|
|
raise UnfulfilledArgumentError(msg, field_name, obj, cls) |
|
104
|
|
|
cls_ = cls._field_types.get(field_name, None) |
|
105
|
|
|
loaded_field = _main_impl.load(field, cls_, **kwargs) |
|
106
|
|
|
args.append(loaded_field) |
|
107
|
|
|
inst = cls(*args) |
|
108
|
|
|
return inst |
|
109
|
|
|
|
|
110
|
|
|
|
|
111
|
|
|
def default_union_deserializer(obj: object, cls: Union, **kwargs) -> object: |
|
112
|
|
|
""" |
|
113
|
|
|
Deserialize an object to any matching type of the given union. The first |
|
114
|
|
|
successful deserialization is returned. |
|
115
|
|
|
:param obj: The object that needs deserializing. |
|
116
|
|
|
:param cls: The Union type with a generic (e.g. Union[str, int]). |
|
117
|
|
|
:param kwargs: Any keyword arguments that are passed through the |
|
118
|
|
|
deserialization process. |
|
119
|
|
|
:return: An object of the first type of the Union that could be |
|
120
|
|
|
deserialized successfully. |
|
121
|
|
|
""" |
|
122
|
|
|
for sub_type in cls.__args__: |
|
123
|
|
|
try: |
|
124
|
|
|
return _main_impl.load(obj, sub_type, **kwargs) |
|
125
|
|
|
except JsonsError: |
|
126
|
|
|
pass # Try the next one. |
|
127
|
|
|
else: |
|
128
|
|
|
args_msg = ', '.join([get_class_name(cls_) for cls_ in cls.__args__]) |
|
129
|
|
|
err_msg = ('Could not match the object of type "{}" to any type of ' |
|
130
|
|
|
'the Union: {}'.format(str(cls), args_msg)) |
|
131
|
|
|
raise DeserializationError(err_msg, obj, cls) |
|
132
|
|
|
|
|
133
|
|
|
|
|
134
|
|
|
def default_set_deserializer(obj: list, cls: type, **kwargs) -> set: |
|
135
|
|
|
""" |
|
136
|
|
|
Deserialize a (JSON) list into a set by deserializing all items of that |
|
137
|
|
|
list. If the list as a generic type (e.g. Set[datetime]) then it is |
|
138
|
|
|
assumed that all elements can be deserialized to that type. |
|
139
|
|
|
:param obj: the list that needs deserializing. |
|
140
|
|
|
:param cls: the type, optionally with a generic (e.g. Set[str]). |
|
141
|
|
|
:param kwargs: any keyword arguments. |
|
142
|
|
|
:return: a deserialized set instance. |
|
143
|
|
|
""" |
|
144
|
|
|
cls_ = list |
|
145
|
|
|
if hasattr(cls, '__args__'): |
|
146
|
|
|
cls_ = List[cls.__args__[0]] |
|
147
|
|
|
list_ = default_list_deserializer(obj, cls_, **kwargs) |
|
148
|
|
|
return set(list_) |
|
149
|
|
|
|
|
150
|
|
|
|
|
151
|
|
|
def default_dict_deserializer( |
|
152
|
|
|
obj: dict, |
|
153
|
|
|
cls: type, |
|
154
|
|
|
key_transformer: Optional[Callable[[str], str]] = None, |
|
155
|
|
|
**kwargs) -> dict: |
|
156
|
|
|
""" |
|
157
|
|
|
Deserialize a dict by deserializing all instances of that dict. |
|
158
|
|
|
:param obj: the dict that needs deserializing. |
|
159
|
|
|
:param key_transformer: a function that transforms the keys to a different |
|
160
|
|
|
style (e.g. PascalCase). |
|
161
|
|
|
:param cls: not used. |
|
162
|
|
|
:param kwargs: any keyword arguments. |
|
163
|
|
|
:return: a deserialized dict instance. |
|
164
|
|
|
""" |
|
165
|
|
|
key_transformer = key_transformer or (lambda key: key) |
|
166
|
|
|
kwargs_ = {**{'key_transformer': key_transformer}, **kwargs} |
|
167
|
|
|
if hasattr(cls, '__args__') and len(cls.__args__) > 1: |
|
168
|
|
|
sub_cls = cls.__args__[1] |
|
169
|
|
|
kwargs_['cls'] = sub_cls |
|
170
|
|
|
return {key_transformer(key): _main_impl.load(obj[key], **kwargs_) |
|
171
|
|
|
for key in obj} |
|
172
|
|
|
|
|
173
|
|
|
|
|
174
|
|
|
def default_enum_deserializer(obj: str, |
|
175
|
|
|
cls: EnumMeta, |
|
176
|
|
|
use_enum_name: bool = True, |
|
177
|
|
|
**kwargs) -> object: |
|
178
|
|
|
""" |
|
179
|
|
|
Deserialize an enum value to an enum instance. The serialized value must |
|
180
|
|
|
can be the name of the enum element or the value; dependent on |
|
181
|
|
|
``use_enum_name``. |
|
182
|
|
|
:param obj: the serialized enum. |
|
183
|
|
|
:param cls: the enum class. |
|
184
|
|
|
:param use_enum_name: determines whether the name or the value of an enum |
|
185
|
|
|
element should be used. |
|
186
|
|
|
:param kwargs: not used. |
|
187
|
|
|
:return: the corresponding enum element instance. |
|
188
|
|
|
""" |
|
189
|
|
|
if use_enum_name: |
|
190
|
|
|
result = cls[obj] |
|
191
|
|
|
else: |
|
192
|
|
|
for elem in cls: |
|
193
|
|
|
if elem.value == obj: |
|
194
|
|
|
result = elem |
|
195
|
|
|
break |
|
196
|
|
|
return result |
|
|
|
|
|
|
197
|
|
|
|
|
198
|
|
|
|
|
199
|
|
|
def default_string_deserializer(obj: str, |
|
200
|
|
|
cls: Optional[type] = None, |
|
201
|
|
|
**kwargs) -> object: |
|
202
|
|
|
""" |
|
203
|
|
|
Deserialize a string. If the given ``obj`` can be parsed to a date, a |
|
204
|
|
|
``datetime`` instance is returned. |
|
205
|
|
|
:param obj: the string that is to be deserialized. |
|
206
|
|
|
:param cls: not used. |
|
207
|
|
|
:param kwargs: any keyword arguments. |
|
208
|
|
|
:return: the deserialized obj. |
|
209
|
|
|
""" |
|
210
|
|
|
try: |
|
211
|
|
|
# Use load instead of default_datetime_deserializer to allow the |
|
212
|
|
|
# datetime deserializer to be overridden. |
|
213
|
|
|
return _main_impl.load(obj, datetime, **kwargs) |
|
214
|
|
|
except: |
|
215
|
|
|
return obj |
|
216
|
|
|
|
|
217
|
|
|
|
|
218
|
|
|
def default_primitive_deserializer(obj: object, |
|
219
|
|
|
cls: Optional[type] = None, |
|
220
|
|
|
**kwargs) -> object: |
|
221
|
|
|
""" |
|
222
|
|
|
Deserialize a primitive: it simply returns the given primitive. |
|
223
|
|
|
:param obj: the value that is to be deserialized. |
|
224
|
|
|
:param cls: not used. |
|
225
|
|
|
:param kwargs: not used. |
|
226
|
|
|
:return: ``obj``. |
|
227
|
|
|
""" |
|
228
|
|
|
return obj |
|
229
|
|
|
|
|
230
|
|
|
|
|
231
|
|
|
def default_object_deserializer( |
|
232
|
|
|
obj: dict, |
|
233
|
|
|
cls: type, |
|
234
|
|
|
key_transformer: Optional[Callable[[str], str]] = None, |
|
235
|
|
|
strict: bool = False, |
|
236
|
|
|
**kwargs) -> object: |
|
237
|
|
|
""" |
|
238
|
|
|
Deserialize ``obj`` into an instance of type ``cls``. If ``obj`` contains |
|
239
|
|
|
keys with a certain case style (e.g. camelCase) that do not match the style |
|
240
|
|
|
of ``cls`` (e.g. snake_case), a key_transformer should be used (e.g. |
|
241
|
|
|
KEY_TRANSFORMER_SNAKECASE). |
|
242
|
|
|
:param obj: a serialized instance of ``cls``. |
|
243
|
|
|
:param cls: the type to which ``obj`` should be deserialized. |
|
244
|
|
|
:param key_transformer: a function that transforms the keys in order to |
|
245
|
|
|
match the attribute names of ``cls``. |
|
246
|
|
|
:param strict: deserialize in strict mode. |
|
247
|
|
|
:param kwargs: any keyword arguments that may be passed to the |
|
248
|
|
|
deserializers. |
|
249
|
|
|
:return: an instance of type ``cls``. |
|
250
|
|
|
""" |
|
251
|
|
|
concat_kwargs = kwargs |
|
252
|
|
|
if key_transformer: |
|
253
|
|
|
obj = {key_transformer(key): obj[key] for key in obj} |
|
254
|
|
|
concat_kwargs = { |
|
255
|
|
|
**kwargs, |
|
256
|
|
|
'key_transformer': key_transformer |
|
257
|
|
|
} |
|
258
|
|
|
concat_kwargs['strict'] = strict |
|
259
|
|
|
signature_parameters = inspect.signature(cls.__init__).parameters |
|
260
|
|
|
constructor_args, getters = _get_constructor_args(obj, |
|
261
|
|
|
cls, |
|
262
|
|
|
signature_parameters, |
|
263
|
|
|
**concat_kwargs) |
|
264
|
|
|
remaining_attrs = {attr_name: obj[attr_name] for attr_name in obj |
|
265
|
|
|
if attr_name not in constructor_args} |
|
266
|
|
|
if strict and remaining_attrs: |
|
267
|
|
|
unexpected_arg = list(remaining_attrs.keys())[0] |
|
268
|
|
|
err_msg = 'Type "{}" does not expect "{}"'.format(get_class_name(cls), |
|
269
|
|
|
unexpected_arg) |
|
270
|
|
|
raise SignatureMismatchError(err_msg, unexpected_arg, obj, cls) |
|
271
|
|
|
instance = cls(**constructor_args) |
|
272
|
|
|
_set_remaining_attrs(instance, remaining_attrs, **kwargs) |
|
273
|
|
|
return instance |
|
274
|
|
|
|
|
275
|
|
|
|
|
276
|
|
|
def _get_constructor_args(obj, |
|
277
|
|
|
cls, |
|
278
|
|
|
signature_parameters, |
|
279
|
|
|
attr_getters=None, |
|
280
|
|
|
**kwargs): |
|
281
|
|
|
# Loop through the signature of cls: the type we try to deserialize to. For |
|
282
|
|
|
# every required parameter, we try to get the corresponding value from |
|
283
|
|
|
# json_obj. |
|
284
|
|
|
attr_getters = dict(**(attr_getters or {})) |
|
285
|
|
|
constructor_args_in_obj = dict() |
|
286
|
|
|
signatures = ((sig_key, sig) for sig_key, sig in |
|
|
|
|
|
|
287
|
|
|
signature_parameters.items() if sig_key != 'self') |
|
288
|
|
|
for sig_key, sig in signatures: |
|
289
|
|
|
if obj and sig_key in obj: |
|
290
|
|
|
# This argument is in obj. |
|
291
|
|
|
arg_cls = None |
|
292
|
|
|
if sig.annotation != inspect.Parameter.empty: |
|
293
|
|
|
arg_cls = sig.annotation |
|
294
|
|
|
value = _main_impl.load(obj[sig_key], arg_cls, **kwargs) |
|
295
|
|
|
constructor_args_in_obj[sig_key] = value |
|
296
|
|
|
elif sig_key in attr_getters: |
|
297
|
|
|
# There exists an attr_getter for this argument. |
|
298
|
|
|
attr_getter = attr_getters.pop(sig_key) |
|
299
|
|
|
constructor_args_in_obj[sig_key] = attr_getter() |
|
300
|
|
|
elif sig.default != inspect.Parameter.empty: |
|
301
|
|
|
# There is a default value for this argument. |
|
302
|
|
|
constructor_args_in_obj[sig_key] = sig.default |
|
303
|
|
|
elif sig.kind not in (inspect.Parameter.VAR_POSITIONAL, |
|
304
|
|
|
inspect.Parameter.VAR_KEYWORD): |
|
305
|
|
|
# This argument is no *args or **kwargs and has no value. |
|
306
|
|
|
raise UnfulfilledArgumentError( |
|
307
|
|
|
'No value found for "{}"'.format(sig_key), |
|
308
|
|
|
sig_key, obj, cls) |
|
309
|
|
|
|
|
310
|
|
|
return constructor_args_in_obj, attr_getters |
|
311
|
|
|
|
|
312
|
|
|
|
|
313
|
|
|
def _set_remaining_attrs(instance, |
|
314
|
|
|
remaining_attrs, |
|
315
|
|
|
attr_getters=None, |
|
316
|
|
|
**kwargs): |
|
317
|
|
|
# Set any remaining attributes on the newly created instance. |
|
318
|
|
|
attr_getters = attr_getters or {} |
|
319
|
|
|
for attr_name in remaining_attrs: |
|
320
|
|
|
loaded_attr = _main_impl.load(remaining_attrs[attr_name], |
|
321
|
|
|
type(remaining_attrs[attr_name]), |
|
322
|
|
|
**kwargs) |
|
323
|
|
|
try: |
|
324
|
|
|
setattr(instance, attr_name, loaded_attr) |
|
325
|
|
|
except AttributeError: |
|
326
|
|
|
pass # This is raised when a @property does not have a setter. |
|
327
|
|
|
for attr_name, getter in attr_getters.items(): |
|
328
|
|
|
setattr(instance, attr_name, getter()) |
|
329
|
|
|
|
|
330
|
|
|
|
|
331
|
|
|
# The following default key transformers can be used with the |
|
332
|
|
|
# default_object_serializer. |
|
333
|
|
|
KEY_TRANSFORMER_SNAKECASE = snakecase |
|
334
|
|
|
KEY_TRANSFORMER_CAMELCASE = camelcase |
|
335
|
|
|
KEY_TRANSFORMER_PASCALCASE = pascalcase |
|
336
|
|
|
KEY_TRANSFORMER_LISPCASE = lispcase |
|
337
|
|
|
|