1
|
|
|
from typing import Callable, Optional, Union |
2
|
|
|
|
3
|
|
|
from typish import get_args |
4
|
|
|
|
5
|
|
|
from jsons._common_impl import NoneType |
6
|
|
|
from jsons._compatibility_impl import (get_type_hints, get_union_params, |
7
|
|
|
tuple_with_ellipsis) |
8
|
|
|
from jsons._load_impl import load |
9
|
|
|
from jsons.exceptions import UnfulfilledArgumentError |
10
|
|
|
|
11
|
|
|
|
12
|
|
|
def default_tuple_deserializer(obj: list,
                               cls: type = None,
                               *,
                               key_transformer: Optional[Callable[[str], str]] = None,
                               **kwargs) -> object:
    """
    Deserialize a (JSON) list into a tuple by deserializing all items of that
    list.
    :param obj: the list that needs deserializing.
    :param cls: the type optionally with a generic (e.g. Tuple[str, int]).
    If it has a ``_fields`` attribute it is handled as a named tuple.
    :param key_transformer: an optional function that transforms dict keys;
    it is forwarded to every nested ``load`` call.
    :param kwargs: any keyword arguments; they are passed on to ``load``.
    :return: a deserialized tuple instance.
    """
    if hasattr(cls, '_fields'):
        # Named tuples carry field names and are handled separately.
        return default_namedtuple_deserializer(
            obj, cls, key_transformer=key_transformer, **kwargs)

    cls_args = get_args(cls)
    if not cls_args:
        # No generic arguments: let load work without a target type.
        return tuple(load(value, key_transformer=key_transformer, **kwargs)
                     for value in obj)

    tuple_types = getattr(cls, '__tuple_params__', cls_args)
    if tuple_with_ellipsis(cls):
        # Tuple[T, ...]: every item is loaded with the first type argument.
        tuple_types = [tuple_types[0]] * len(obj)

    # key_transformer is a named parameter here and therefore absent from
    # kwargs; it has to be passed on explicitly or nested objects would be
    # loaded without it.
    return tuple(load(value, tuple_types[i],
                      key_transformer=key_transformer, **kwargs)
                 for i, value in enumerate(obj))
37
|
|
|
|
38
|
|
|
|
39
|
|
|
def default_namedtuple_deserializer(
        obj: Union[list, dict],
        cls: type,
        *,
        key_transformer: Optional[Callable[[str], str]] = None,
        **kwargs) -> object:
    """
    Deserialize a (JSON) list or dict into a named tuple by deserializing all
    items of that list/dict.
    :param obj: the list or dict that needs deserializing.
    :param cls: the NamedTuple.
    :param key_transformer: an optional function that is applied to the keys
    of ``obj`` when it is a dict; it is also forwarded to ``load``.
    :param kwargs: any keyword arguments; they are passed on to ``load``.
    :return: a deserialized named tuple (i.e. an instance of a class).
    :raises UnfulfilledArgumentError: when a field ends up with the value
    None while its type hint is not a union that includes NoneType.
    """
    is_dict = isinstance(obj, dict)
    if is_dict:
        key_tfr = key_transformer or (lambda key: key)
        tfm_obj = {key_tfr(k): v for k, v in obj.items()}

    # The type hints do not depend on the field, so resolve them once.
    # _field_types has been deprecated in favor of __annotations__ in Python 3.8
    if hasattr(cls, '__annotations__'):
        # It is important to use get_type_hints so that forward references get
        # resolved, rather than access __annotations__ directly
        field_types = get_type_hints(cls)
    else:
        field_types = getattr(cls, '_field_types', {})
    field_defaults = getattr(cls, '_field_defaults', {})

    args = []
    for index, field_name in enumerate(cls._fields):
        if is_dict:
            # Dict values are looked up by name, so presence must be decided
            # by key; comparing the position with len(obj) raised a KeyError
            # for dicts with a missing earlier field or with extra keys.
            if field_name in tfm_obj:
                field = tfm_obj[field_name]
            else:
                field = field_defaults.get(field_name, None)
        elif index < len(obj):
            field = obj[index]
        else:
            field = field_defaults.get(field_name, None)

        hint = field_types.get(field_name) if field_types else None
        if field is None and NoneType not in (get_union_params(hint) or []):
            # The value 'None' is not permitted here.
            msg = ('No value present in {} for argument "{}"'
                   .format(obj, field_name))
            raise UnfulfilledArgumentError(msg, field_name, obj, cls)

        args.append(
            load(field, hint, key_transformer=key_transformer, **kwargs))
    return cls(*args)
89
|
|
|
|