Completed
Push — master ( 5ea888...aa565f )
by Ramon
13s queued 11s
created

_add_dumped_elem()   A

Complexity

Conditions 4

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 10
rs 9.9
c 0
b 0
f 0
cc 4
nop 5
1
from datetime import datetime, timezone
2
from inspect import isfunction
3
from typing import Optional, Callable, Union, MutableSequence, Tuple, Dict
4
5
from typish import get_type
6
7
from jsons._cache import cached
8
from jsons._common_impl import get_class_name, META_ATTR, StateHolder
9
from jsons._compatibility_impl import get_type_hints
10
from jsons._datetime_impl import to_str
11
from jsons._dump_impl import dump
12
from jsons.classes import JsonSerializable
13
from jsons.classes.verbosity import Verbosity
14
from jsons.exceptions import SerializationError, RecursionDetectedError
15
16
17
def default_object_serializer(
        obj: object,
        cls: Optional[type] = None,
        *,
        key_transformer: Optional[Callable[[str], str]] = None,
        strip_nulls: bool = False,
        strip_privates: bool = False,
        strip_properties: bool = False,
        strip_class_variables: bool = False,
        strip_attr: Union[str, MutableSequence[str], Tuple[str]] = None,
        verbose: Union[Verbosity, bool] = False,
        strict: bool = False,
        fork_inst: Optional[type] = StateHolder,
        **kwargs) -> Optional[dict]:
    """
    Dump ``obj`` into a dict, dumping each of its attribute values as well.

    The attributes are taken from ``cls`` when both ``cls`` and ``strict``
    are given; otherwise they are taken from ``obj`` itself and ``cls`` is
    replaced by ``obj.__class__``.
    :param obj: the object that is to be serialized.
    :param cls: the type of the object that is to be dumped.
    :param key_transformer: a function that will be applied to all keys in the
    resulting dict (e.g. to turn snake_case into camelCase).
    :param strip_nulls: if ``True`` the resulting dict will not contain null
    values.
    :param strip_privates: if ``True`` the resulting dict will not contain
    private attributes (i.e. attributes that start with an underscore).
    :param strip_properties: if ``True`` the resulting dict will not contain
    values from @properties.
    :param strip_class_variables: if ``True`` the resulting dict will not
    contain values from class variables.
    :param strip_attr: can be a name or a collection of names of attributes
    that are not to be included in the dump.
    :param verbose: if ``True`` the resulting dict will contain meta
    information (e.g. on how to deserialize).
    :param strict: a bool to determine if the serializer should be strict
    (i.e. only dumping stuff that is known to ``cls``).
    :param fork_inst: if given, it uses this fork of ``JsonSerializable``.
    :param kwargs: any keyword arguments that are to be passed to the
    serializer functions.
    :return: a Python dict holding the values of ``obj``, or ``None`` if
    ``obj`` is ``None``.
    """
    if obj is None:
        return None

    strip_attr = _normalize_strip_attr(strip_attr)
    selection_args = (strip_privates, strip_properties,
                      strip_class_variables, strip_attr, strict)
    if cls and strict:
        attributes = _get_attributes_from_class(cls, *selection_args)
    else:
        attributes = _get_attributes_from_object(obj, *selection_args)
        cls = obj.__class__

    verbose = Verbosity.from_value(verbose)

    # The kwargs handed down to the nested dumps: the caller's kwargs,
    # overridden by the values that are fixed for this dump.
    nested_kwargs = dict(kwargs)
    nested_kwargs.update({
        'fork_inst': fork_inst,
        'verbose': verbose,
        'strict': strict,
        # Flag that tells the nested dumps to temporarily store -cls:
        '_store_cls': Verbosity.WITH_CLASS_INFO in verbose,
    })

    result = _do_serialize(obj=obj,
                           cls=cls,
                           attributes=attributes,
                           kwargs=nested_kwargs,
                           key_transformer=key_transformer,
                           strip_nulls=strip_nulls,
                           strip_privates=strip_privates,
                           strip_properties=strip_properties,
                           strip_class_variables=strip_class_variables,
                           strip_attr=strip_attr,
                           strict=strict,
                           fork_inst=fork_inst)

    cls_name = get_class_name(cls, fully_qualified=True)
    # The meta section is only added when this call did not itself receive
    # the '_store_cls' flag through its own kwargs.
    if kwargs.get('_store_cls'):
        return result
    return _get_dict_with_meta(result, cls_name, verbose, fork_inst)
98
99
100
def _do_serialize(
        obj: object,
        cls: type,
        attributes: Dict[str, Optional[type]],
        kwargs: dict,
        key_transformer: Optional[Callable[[str], str]] = None,
        strip_nulls: bool = False,
        strip_privates: bool = False,
        strip_properties: bool = False,
        strip_class_variables: bool = False,
        strip_attr: Union[str, MutableSequence[str], Tuple[str]] = None,
        strict: bool = False,
        fork_inst: Optional[type] = StateHolder) -> Dict[str, object]:
    """
    Dump every attribute listed in ``attributes`` and collect the results.
    :param obj: the object whose attributes are read.
    :param cls: the class of ``obj``; only used in warning messages here.
    :param attributes: maps attribute names to the type that is passed to
    ``dump`` as ``cls`` (may be ``None``).
    :param kwargs: extra keyword arguments that are passed to ``dump`` and
    to ``_store_cls_info``.
    :param key_transformer: passed to ``dump`` and applied to the keys of
    the resulting dict.
    :param strip_nulls: passed to ``dump``; if ``True``, attributes that
    dumped to ``None`` are left out of the result.
    :param strip_privates: passed to ``dump``.
    :param strip_properties: passed to ``dump``.
    :param strip_class_variables: passed to ``dump``.
    :param strip_attr: passed to ``dump``.
    :param strict: if ``True``, a ``SerializationError`` is re-raised
    instead of being reported as a warning.
    :param fork_inst: the object whose ``_warn`` is used for warnings.
    :return: a dict with the dumped attributes.
    :raises SerializationError: if an attribute fails to dump and ``strict``
    is ``True``.
    """
    result = {}
    for attr_name, cls_ in attributes.items():
        attr = getattr(obj, attr_name)
        dumped_elem = None
        try:
            dumped_elem = dump(attr,
                               cls=cls_,
                               key_transformer=key_transformer,
                               strip_nulls=strip_nulls,
                               strip_privates=strip_privates,
                               strip_properties=strip_properties,
                               strip_class_variables=strip_class_variables,
                               strip_attr=strip_attr,
                               **kwargs)
            _store_cls_info(dumped_elem, attr, kwargs)
        except RecursionDetectedError:
            # dumped_elem is still None here, so the attribute ends up in the
            # result as None (or is left out when strip_nulls is set).
            fork_inst._warn('Recursive structure detected in attribute "{}" '
                            'of object of type "{}", ignoring the attribute.'
                            .format(attr_name, get_class_name(cls)))
        except SerializationError as err:
            if strict:
                raise
            fork_inst._warn('Failed to dump attribute "{}" of object of '
                            'type "{}". Reason: {}. Ignoring the '
                            'attribute.'
                            .format(attr_name, get_class_name(cls),
                                    err.message))
            # Skip only the failing attribute; the remaining attributes must
            # still be dumped.
            continue
        _add_dumped_elem(result, attr_name, dumped_elem,
                         strip_nulls, key_transformer)
    return result
144
145
146
def _add_dumped_elem(
147
        result: dict,
148
        attr_name: str,
149
        dumped_elem: object,
150
        strip_nulls: bool,
151
        key_transformer: Optional[Callable[[str], str]]):
152
    if not (strip_nulls and dumped_elem is None):
153
        if key_transformer:
154
            attr_name = key_transformer(attr_name)
155
        result[attr_name] = dumped_elem
156
157
158
def _normalize_strip_attr(strip_attr) -> tuple:
159
    # Make sure that strip_attr is always a tuple.
160
    strip_attr = strip_attr or tuple()
161
    if (not isinstance(strip_attr, MutableSequence)
162
            and not isinstance(strip_attr, tuple)):
163
        strip_attr = (strip_attr,)
164
    return strip_attr
165
166
167
@cached
def _get_attributes_from_class(
        cls: type,
        strip_privates: bool,
        strip_properties: bool,
        strip_class_variables: bool,
        strip_attr: tuple,
        strict: bool) -> Dict[str, Optional[type]]:
    """
    Collect the attributes that are known to ``cls`` and filter them.
    :param cls: the class whose attributes are collected.
    :param strip_privates: passed on to ``_filter_attributes``.
    :param strip_properties: passed on to ``_filter_attributes``.
    :param strip_class_variables: passed on to ``_filter_attributes``.
    :param strip_attr: names that are to be left out.
    :param strict: passed on to ``_get_attributes_and_types``.
    :return: a dict that maps the remaining attribute names to their types
    (or ``None``).
    """
    known_attributes = _get_attributes_and_types(cls, strict)
    return _filter_attributes(
        cls=cls,
        attributes=known_attributes,
        strip_privates=strip_privates,
        strip_properties=strip_properties,
        strip_class_variables=strip_class_variables,
        strip_attr=strip_attr)
180
181
182
def _get_attributes_from_object(
        obj: object,
        strip_privates: bool,
        strip_properties: bool,
        strip_class_variables: bool,
        strip_attr: tuple,
        strict: bool) -> Dict[str, Optional[type]]:
    """
    Collect the attributes that ``dir(obj)`` reports and filter them.

    Each name is paired with the type that is known for it in the class of
    ``obj``, or with ``None`` if the class has no type for that name.
    :param obj: the object whose attributes are collected.
    :param strip_privates: passed on to ``_filter_attributes``.
    :param strip_properties: passed on to ``_filter_attributes``.
    :param strip_class_variables: passed on to ``_filter_attributes``.
    :param strip_attr: names that are to be left out.
    :param strict: passed on to ``_get_attributes_and_types``.
    :return: a dict that maps the remaining attribute names to their types
    (or ``None``).
    """
    cls = obj.__class__
    known_types = _get_attributes_and_types(cls, strict)
    candidates = {name: known_types.get(name) for name in dir(obj)}
    return _filter_attributes(
        cls=cls,
        attributes=candidates,
        strip_privates=strip_privates,
        strip_properties=strip_properties,
        strip_class_variables=strip_class_variables,
        strip_attr=strip_attr)
197
198
199
@cached
def _get_attributes_and_types(cls: type,
                              strict: bool) -> Dict[str, Optional[type]]:
    """
    Map the attribute names that are known to ``cls`` to their types.

    The names come from ``__slots__`` if ``cls`` defines it itself, else
    from ``__annotations__`` if present, else (only if ``strict``) from the
    type hints of ``cls.__init__``. Every property and class variable of
    ``cls`` is then registered with type ``None``, replacing a type that
    was found for the same name.
    :param cls: the class that is inspected.
    :param strict: if ``True``, the hints of ``__init__`` are used as a
    fallback when there are no slots and no annotations.
    :return: a new dict of attribute names and their types (or ``None``).
    """
    if '__slots__' in cls.__dict__:
        slots = cls.__slots__
        if isinstance(slots, str):
            # A string value for __slots__ denotes one single slot name.
            slots = (slots,)
        attributes = {attr: None for attr in slots}
    elif hasattr(cls, '__annotations__'):
        # Work on a copy: the additions below must not end up in the
        # annotations of the class itself.
        attributes = dict(cls.__annotations__)
    elif strict:
        hints = get_type_hints(cls.__init__)
        attributes = {k: hints[k] for k in hints if k != 'self'}
    else:
        attributes = {}

    # Add properties and class variables.
    props, class_vars = _get_class_props(cls)
    for elem in props + class_vars:
        attributes[elem] = None

    return attributes
218
219
220
def _filter_attributes(
        cls: type,
        attributes: Dict[str, Optional[type]],
        strip_privates: bool,
        strip_properties: bool,
        strip_class_variables: bool,
        strip_attr: tuple) -> Dict[str, Optional[type]]:
    """
    Return the entries of ``attributes`` that pass the given preferences.
    :param cls: the class that the attributes belong to.
    :param attributes: the candidate attribute names and their types.
    :param strip_privates: if ``True``, names that start with an underscore
    are left out.
    :param strip_properties: if ``True``, properties of ``cls`` are left
    out.
    :param strip_class_variables: if ``True``, the other class variables of
    ``cls`` are left out.
    :param strip_attr: names that are to be left out; the names in
    ``_ABC_ATTRS`` are always added to these.
    :return: a new dict with the remaining attribute names and types.
    """
    ignored_names = strip_attr + _ABC_ATTRS
    serializable_members = dir(JsonSerializable)
    props, other_cls_vars = _get_class_props(cls)

    def _is_kept(name: str) -> bool:
        # Dunder names are never dumped.
        if name.startswith('__'):
            return False
        if strip_privates and name.startswith('_'):
            return False
        if strip_properties and name in props:
            return False
        if strip_class_variables and name in other_cls_vars:
            return False
        if name in ignored_names or name == 'json':
            return False
        # Plain functions on the class are not data.
        if isfunction(getattr(cls, name, None)):
            return False
        # Anything that JsonSerializable itself exposes is left out too.
        return name not in serializable_members

    return {name: type_ for name, type_ in attributes.items()
            if _is_kept(name)}
241
242
243
def _get_class_props(cls: type) -> Tuple[list, list]:
    """
    Split the names in the complete class dict of ``cls`` into two lists.
    :param cls: the class that is inspected.
    :return: a tuple of two lists: the names whose value is a ``property``
    and the names of all other entries.
    """
    props, other_cls_vars = [], []
    for name, value in _get_complete_class_dict(cls).items():
        if isinstance(value, property):
            props.append(name)
        else:
            other_cls_vars.append(name)
    return props, other_cls_vars
250
251
252
def _get_complete_class_dict(cls: type) -> dict:
253
    cls_dict = {}
254
    # Loop reversed so values of sub-classes override those of super-classes.
255
    for cls_or_elder in reversed(cls.mro()):
256
        cls_dict.update(cls_or_elder.__dict__)
257
    return cls_dict
258
259
260
def _get_dict_with_meta(
        obj: dict,
        cls_name: str,
        verbose: Verbosity,
        fork_inst: type) -> dict:
    """
    Add a meta section to ``obj`` (in place) under the key ``META_ATTR``.

    Nothing is added if ``verbose`` is ``Verbosity.WITH_NOTHING``. The class
    information is gathered from the ``-cls`` entries of ``obj`` and its
    nested dicts, which are removed in the process.
    :param obj: the dumped dict that is to receive the meta section.
    :param cls_name: the class name that is stored for the root (``'/'``).
    :param verbose: determines which parts of the meta section are added.
    :param fork_inst: passed on to ``to_str`` for the dump time.
    :return: the given ``obj``.
    """
    if verbose is Verbosity.WITH_NOTHING:
        return obj

    # The section must be attached before the class info is collected.
    meta = {}
    obj[META_ATTR] = meta

    if Verbosity.WITH_CLASS_INFO in verbose:
        classes = {}
        _fill_collection_of_types(obj, cls_name, '/', classes)
        classes['/'] = cls_name
        meta['classes'] = classes

    if Verbosity.WITH_DUMP_TIME in verbose:
        now = datetime.now(tz=timezone.utc)
        meta['dump_time'] = to_str(now, True, fork_inst)

    return obj
280
281
282
def _fill_collection_of_types(
        obj_: dict,
        cls_name_: Optional[str],
        prefix: str,
        collection_of_types_: dict) -> str:
    """
    Fill ``collection_of_types_`` with the class names of all nested dicts.

    The ``-cls`` entries of ``obj_`` and of its nested dicts are removed in
    the process.
    :param obj_: the dumped dict that is walked through.
    :param cls_name_: the class name of ``obj_``; if it is not given, the
    ``-cls`` entry of ``obj_`` is used instead.
    :param prefix: the path of ``obj_``, ending with a ``'/'``.
    :param collection_of_types_: the dict that receives a class name for
    the path of every nested dict.
    :return: the class name that was resolved for ``obj_``.
    """
    resolved_name = _get_class_name_and_strip_cls(cls_name_, obj_)
    for key, value in obj_.items():
        # The meta section and all non-dict values hold no class info.
        if key == META_ATTR or not isinstance(value, dict):
            continue
        path = prefix + key
        collection_of_types_[path] = _fill_collection_of_types(
            value, None, path + '/', collection_of_types_)
    return resolved_name
298
299
300
def _get_class_name_and_strip_cls(cls_name: Optional[str], obj: dict) -> str:
301
    result = cls_name
302
    if not cls_name and '-cls' in obj:
303
        result = obj['-cls']
304
    if '-cls' in obj:
305
        del obj['-cls']
306
    return result
307
308
309
def _store_cls_info(result: object, original_obj: dict, kwargs):
310
    if kwargs.get('_store_cls', None) and isinstance(result, dict):
311
        cls = get_type(original_obj)
312
        if cls.__module__ == 'typing':
313
            cls_name = repr(cls)
314
        else:
315
            cls_name = get_class_name(cls, fully_qualified=True)
316
        result['-cls'] = cls_name
317
318
319
_ABC_ATTRS = ('_abc_registry', '_abc_cache', '_abc_negative_cache',
320
              '_abc_negative_cache_version', '_abc_impl')
321