Passed
Pull Request — master (#189)
by
unknown
01:45
created

jsons._lizers_impl._get_parents()   B

Complexity

Conditions 7

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 20
rs 8
c 0
b 0
f 0
cc 7
nop 2
1
"""
2
PRIVATE MODULE: do not import (from) it directly.
3
4
This module contains functionality for setting and getting serializers and
5
deserializers.
6
"""
7
import types
8
from typing import Optional, Dict, Sequence, Union
9
10
from jsons._cache import cached
11
from jsons._common_impl import StateHolder, get_class_name
12
from jsons._compatibility_impl import get_naked_class
13
14
15
def set_serializer(
16
        func: callable,
17
        cls: Union[type, Sequence[type]],
18
        high_prio: bool = True,
19
        fork_inst: type = StateHolder) -> None:
20
    """
21
    Set a serializer function for the given type. You may override the default
22
    behavior of ``jsons.load`` by setting a custom serializer.
23
24
    The ``func`` argument must take one argument (i.e. the object that is to be
25
    serialized) and also a ``kwargs`` parameter. For example:
26
27
    >>> def func(obj, **kwargs):
28
    ...    return dict()
29
30
    You may ask additional arguments between ``cls`` and ``kwargs``.
31
32
    :param func: the serializer function.
33
    :param cls: the type or sequence of types this serializer can handle.
34
    :param high_prio: determines the order in which is looked for the callable.
35
    :param fork_inst: if given, it uses this fork of ``JsonSerializable``.
36
    :return: None.
37
    """
38
    if isinstance(cls, Sequence):
39
        for cls_ in cls:
40
            set_serializer(func, cls_, high_prio, fork_inst)
41
    elif cls:
42
        index = 0 if high_prio else len(fork_inst._classes_serializers)
43
        fork_inst._classes_serializers.insert(index, cls)
44
        cls_name = get_class_name(cls, fully_qualified=True)
45
        fork_inst._serializers[cls_name.lower()] = func
46
    else:
47
        fork_inst._serializers['nonetype'] = func
48
49
50
def set_deserializer(
51
        func: callable,
52
        cls: Union[type, Sequence[type]],
53
        high_prio: bool = True,
54
        fork_inst: type = StateHolder) -> None:
55
    """
56
    Set a deserializer function for the given type. You may override the
57
    default behavior of ``jsons.dump`` by setting a custom deserializer.
58
59
    The ``func`` argument must take two arguments (i.e. the dict containing the
60
    serialized values and the type that the values should be deserialized into)
61
    and also a ``kwargs`` parameter. For example:
62
63
    >>> def func(dict_, cls, **kwargs):
64
    ...    return cls()
65
66
    You may ask additional arguments between ``cls`` and ``kwargs``.
67
68
    :param func: the deserializer function.
69
    :param cls: the type or sequence of types this serializer can handle.
70
    :param high_prio: determines the order in which is looked for the callable.
71
    :param fork_inst: if given, it uses this fork of ``JsonSerializable``.
72
    :return: None.
73
    """
74
    if isinstance(cls, Sequence):
75
        for cls_ in cls:
76
            set_deserializer(func, cls_, high_prio, fork_inst)
77
    elif cls:
78
        index = 0 if high_prio else len(fork_inst._classes_deserializers)
79
        fork_inst._classes_deserializers.insert(index, cls)
80
        cls_name = get_class_name(cls, fully_qualified=True)
81
        fork_inst._deserializers[cls_name.lower()] = func
82
    else:
83
        fork_inst._deserializers['nonetype'] = func
84
85
86
@cached
87
def get_serializer(
88
        cls: type,
89
        fork_inst: Optional[type] = StateHolder) -> callable:
90
    """
91
    Return the serializer function that would be used for the given ``cls``.
92
    :param cls: the type for which a serializer is to be returned.
93
    :param fork_inst: if given, it uses this fork of ``JsonSerializable``.
94
    :return: a serializer function.
95
    """
96
    serializer = _get_lizer(cls, fork_inst._serializers,
97
                            fork_inst._classes_serializers, fork_inst)
98
    return serializer
99
100
101
@cached
102
def get_deserializer(
103
        cls: type,
104
        fork_inst: Optional[type] = StateHolder) -> callable:
105
    """
106
    Return the deserializer function that would be used for the given ``cls``.
107
    :param cls: the type for which a deserializer is to be returned.
108
    :param fork_inst: if given, it uses this fork of ``JsonSerializable``.
109
    :return: a deserializer function.
110
    """
111
    deserializer = _get_lizer(cls, fork_inst._deserializers,
112
                              fork_inst._classes_deserializers, fork_inst)
113
    return deserializer
114
115
116
def _get_lizer(
117
        cls: type,
118
        lizers: Dict[str, callable],
119
        classes_lizers: list,
120
        fork_inst: type,
121
        recursive: bool = False) -> callable:
122
    cls_name = get_class_name(cls, str.lower, fully_qualified=True)
123
    lizer = (lizers.get(cls_name, None)
124
             or _get_lizer_by_parents(cls, lizers, classes_lizers, fork_inst))
125
    if not lizer and not recursive and hasattr(cls, '__supertype__'):
126
        return _get_lizer(cls.__supertype__, lizers,
127
                          classes_lizers, fork_inst, True)
128
    return lizer
129
130
131
def _get_lizer_by_parents(
132
        cls: type,
133
        lizers: Dict[str, callable],
134
        classes_lizers: list,
135
        fork_inst: type) -> callable:
136
    result = None
137
    parents = _get_parents(cls, classes_lizers)
138
    if parents:
139
        pname = get_class_name(parents[0], str.lower, fully_qualified=True)
140
        result = lizers[pname]
141
    return result
142
143
144
def _get_parents(cls: type, lizers: list) -> list:
145
    """
146
    Return a list of serializers or deserializers that can handle a parent
147
    of ``cls``.
148
    :param cls: the type that
149
    :param lizers: a list of serializers or deserializers.
150
    :return: a list of serializers or deserializers.
151
    """
152
    parents = []
153
    naked_cls = get_naked_class(cls)
154
    for cls_ in lizers:
155
        try:
156
            if issubclass(naked_cls, cls_):
157
                parents.append(cls_)
158
        except (TypeError, AttributeError):
159
            pass  # Some types do not support `issubclass` (e.g. Union).
160
    if not parents and isinstance(naked_cls, types.UnionType) and \
161
            Union in lizers:
162
        parents = [Union]
163
    return parents
164