Completed
Push — master ( 8f3758...257d3c )
by Ramon
29s queued 11s
created

jsons.serializers.default_object   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 313
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 44
eloc 230
dl 0
loc 313
rs 8.8798
c 0
b 0
f 0

13 Functions

Rating   Name   Duplication   Size   Complexity  
B _do_serialize() 0 47 8
A _get_complete_class_dict() 0 6 2
A _get_attributes_from_object() 0 15 1
A _normalize_strip_attr() 0 7 3
A _get_attributes_from_class() 0 13 1
A _get_attributes_and_types() 0 19 5
A _get_dict_with_meta() 0 20 4
A _store_cls_info() 0 9 4
B default_object_serializer() 0 81 4
A _filter_attributes() 0 21 1
A _get_class_props() 0 7 3
A _fill_collection_of_types() 0 16 4
A _get_class_name_and_strip_cls() 0 7 4

How to fix   Complexity   

Complexity

Complex classes like jsons.serializers.default_object often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from datetime import datetime, timezone
2
from inspect import isfunction
3
from typing import Optional, Callable, Union, MutableSequence, Tuple, Dict
4
5
from typish import get_type
6
7
from jsons._cache import cached
8
from jsons._common_impl import get_class_name, META_ATTR, StateHolder
9
from jsons._compatibility_impl import get_type_hints
10
from jsons._datetime_impl import to_str
11
from jsons._dump_impl import dump
12
from jsons.classes import JsonSerializable
13
from jsons.classes.verbosity import Verbosity
14
from jsons.exceptions import SerializationError, RecursionDetectedError
15
16
17
def default_object_serializer(
18
        obj: object,
19
        cls: Optional[type] = None,
20
        *,
21
        key_transformer: Optional[Callable[[str], str]] = None,
22
        strip_nulls: bool = False,
23
        strip_privates: bool = False,
24
        strip_properties: bool = False,
25
        strip_class_variables: bool = False,
26
        strip_attr: Union[str, MutableSequence[str], Tuple[str]] = None,
27
        verbose: Union[Verbosity, bool] = False,
28
        strict: bool = False,
29
        fork_inst: Optional[type] = StateHolder,
30
        **kwargs) -> Optional[dict]:
31
    """
32
    Serialize the given ``obj`` to a dict. All values within ``obj`` are also
33
    serialized. If ``key_transformer`` is given, it will be used to transform
34
    the casing (e.g. snake_case) to a different format (e.g. camelCase).
35
    :param obj: the object that is to be serialized.
36
    :param cls: the type of the object that is to be dumped.
37
    :param key_transformer: a function that will be applied to all keys in the
38
    resulting dict.
39
    :param strip_nulls: if ``True`` the resulting dict will not contain null
40
    values.
41
    :param strip_privates: if ``True`` the resulting dict will not contain
42
    private attributes (i.e. attributes that start with an underscore).
43
    :param strip_properties: if ``True`` the resulting dict will not contain
44
    values from @properties.
45
    :param strip_class_variables: if ``True`` the resulting dict will not
46
    contain values from class variables.
47
    :param strip_attr: can be a name or a collection of names of attributes
48
    that are not to be included in the dump.
49
    dict will not contain attributes with
50
    :param verbose: if ``True`` the resulting dict will contain meta
51
    information (e.g. on how to deserialize).
52
    :param strict: a bool to determine if the serializer should be strict
53
    (i.e. only dumping stuff that is known to ``cls``).
54
    :param fork_inst: if given, it uses this fork of ``JsonSerializable``.
55
    :param kwargs: any keyword arguments that are to be passed to the
56
    serializer functions.
57
    :return: a Python dict holding the values of ``obj``.
58
    """
59
    if obj is None:
60
        return obj
61
    strip_attr = _normalize_strip_attr(strip_attr)
62
    if cls:
63
        attributes = _get_attributes_from_class(
64
            cls, strip_privates, strip_properties, strip_class_variables,
65
            strip_attr, strict)
66
    else:
67
        attributes = _get_attributes_from_object(
68
            obj, strip_privates, strip_properties, strip_class_variables,
69
            strip_attr, strict)
70
        cls = obj.__class__
71
    verbose = Verbosity.from_value(verbose)
72
    kwargs_ = {
73
        **kwargs,
74
        'fork_inst': fork_inst,
75
        'verbose': verbose,
76
        'strict': strict,
77
        # Set a flag in kwargs to temporarily store -cls:
78
        '_store_cls': Verbosity.WITH_CLASS_INFO in verbose
79
    }
80
81
    result = _do_serialize(obj=obj,
82
                           cls=cls,
83
                           attributes=attributes,
84
                           kwargs=kwargs_,
85
                           key_transformer=key_transformer,
86
                           strip_nulls=strip_nulls,
87
                           strip_privates=strip_privates,
88
                           strip_properties=strip_properties,
89
                           strip_class_variables=strip_class_variables,
90
                           strip_attr=strip_attr,
91
                           strict=strict,
92
                           fork_inst=fork_inst)
93
94
    cls_name = get_class_name(cls, fully_qualified=True, fork_inst=fork_inst)
95
    if not kwargs.get('_store_cls'):
96
        result = _get_dict_with_meta(result, cls_name, verbose, fork_inst)
97
    return result
98
99
100
def _do_serialize(
101
        obj: object,
102
        cls: type,
103
        attributes: Dict[str, Optional[type]],
104
        kwargs: dict,
105
        key_transformer: Optional[Callable[[str], str]] = None,
106
        strip_nulls: bool = False,
107
        strip_privates: bool = False,
108
        strip_properties: bool = False,
109
        strip_class_variables: bool = False,
110
        strip_attr: Union[str, MutableSequence[str], Tuple[str]] = None,
111
        strict: bool = False,
112
        fork_inst: Optional[type] = StateHolder) -> Dict[str, object]:
113
    result = dict()
114
    for attr_name, cls_ in attributes.items():
115
        attr = obj.__getattribute__(attr_name)
116
        dumped_elem = None
117
        try:
118
            dumped_elem = dump(attr,
119
                               cls=cls_,
120
                               key_transformer=key_transformer,
121
                               strip_nulls=strip_nulls,
122
                               strip_privates=strip_privates,
123
                               strip_properties=strip_properties,
124
                               strip_class_variables=strip_class_variables,
125
                               strip_attr=strip_attr,
126
                               **kwargs)
127
128
            _store_cls_info(dumped_elem, attr, kwargs)
129
        except RecursionDetectedError:
130
            fork_inst._warn('Recursive structure detected in attribute "{}" '
131
                            'of object of type "{}", ignoring the attribute.'
132
                            .format(attr_name, get_class_name(cls)))
133
        except SerializationError as err:
134
            if strict:
135
                raise
136
            else:
137
                fork_inst._warn('Failed to dump attribute "{}" of object of '
138
                                'type "{}". Reason: {}. Ignoring the '
139
                                'attribute.'
140
                                .format(attr, get_class_name(cls), err.message))
141
                break
142
        if not (strip_nulls and dumped_elem is None):
143
            if key_transformer:
144
                attr_name = key_transformer(attr_name)
145
            result[attr_name] = dumped_elem
146
    return result
147
148
149
def _normalize_strip_attr(strip_attr) -> tuple:
150
    # Make sure that strip_attr is always a tuple.
151
    strip_attr = strip_attr or tuple()
152
    if (not isinstance(strip_attr, MutableSequence)
153
            and not isinstance(strip_attr, tuple)):
154
        strip_attr = (strip_attr,)
155
    return strip_attr
156
157
158
@cached
159
def _get_attributes_from_class(
160
        cls: type,
161
        strip_privates: bool,
162
        strip_properties: bool,
163
        strip_class_variables: bool,
164
        strip_attr: tuple,
165
        strict: bool) -> Dict[str, Optional[type]]:
166
    # Get the attributes that are known in the class.
167
    attributes_and_types = _get_attributes_and_types(cls, strict)
168
    return _filter_attributes(cls, attributes_and_types, strip_privates,
169
                              strip_properties, strip_class_variables,
170
                              strip_attr)
171
172
173
def _get_attributes_from_object(
174
        obj: object,
175
        strip_privates: bool,
176
        strip_properties: bool,
177
        strip_class_variables: bool,
178
        strip_attr: tuple,
179
        strict: bool) -> Dict[str, Optional[type]]:
180
    # Get the attributes that are known in the object.
181
    cls = obj.__class__
182
    attributes_and_types = _get_attributes_and_types(cls, strict)
183
    attributes = {attr: attributes_and_types.get(attr, None)
184
                  for attr in dir(obj)}
185
    return _filter_attributes(cls, attributes, strip_privates,
186
                              strip_properties, strip_class_variables,
187
                              strip_attr)
188
189
190
@cached
191
def _get_attributes_and_types(cls: type,
192
                              strict: bool) -> Dict[str, Optional[type]]:
193
    if '__slots__' in cls.__dict__:
194
        attributes = {attr: None for attr in cls.__slots__}
195
    elif hasattr(cls, '__annotations__'):
196
        attributes = cls.__annotations__
197
    elif strict:
198
        hints = get_type_hints(cls.__init__)
199
        attributes = {k: hints[k] for k in hints if k != 'self'}
200
    else:
201
        attributes = {}
202
203
    # Add properties and class variables.
204
    props, class_vars = _get_class_props(cls)
205
    for elem in props + class_vars:
206
        attributes[elem] = None
207
208
    return attributes
209
210
211
def _filter_attributes(
212
        cls: type,
213
        attributes: Dict[str, Optional[type]],
214
        strip_privates: bool,
215
        strip_properties: bool,
216
        strip_class_variables: bool,
217
        strip_attr: tuple) -> Dict[str, Optional[type]]:
218
    # Filter the given attributes with the given preferences.
219
    strip_attr = strip_attr + _ABC_ATTRS
220
    excluded_elems = dir(JsonSerializable)
221
    props, other_cls_vars = _get_class_props(cls)
222
223
    return {attr: type_ for attr, type_ in attributes.items()
224
            if not attr.startswith('__')
225
            and not (strip_privates and attr.startswith('_'))
226
            and not (strip_properties and attr in props)
227
            and not (strip_class_variables and attr in other_cls_vars)
228
            and attr not in strip_attr
229
            and attr != 'json'
230
            and not isfunction(getattr(cls, attr, None))
231
            and attr not in excluded_elems}
232
233
234
def _get_class_props(cls: type) -> Tuple[list, list]:
235
    props = []
236
    other_cls_vars = []
237
    for n, v in _get_complete_class_dict(cls).items():
238
        list_to_append = props if isinstance(v, property) else other_cls_vars
239
        list_to_append.append(n)
240
    return props, other_cls_vars
241
242
243
def _get_complete_class_dict(cls: type) -> dict:
244
    cls_dict = {}
245
    # Loop reversed so values of sub-classes override those of super-classes.
246
    for cls_or_elder in reversed(cls.mro()):
247
        cls_dict.update(cls_or_elder.__dict__)
248
    return cls_dict
249
250
251
def _get_dict_with_meta(
252
        obj: dict,
253
        cls_name: str,
254
        verbose: Verbosity,
255
        fork_inst: type) -> dict:
256
    # This function will add a -meta section to the given obj (provided that
257
    # the given obj has -cls attributes for all children).
258
    if verbose is Verbosity.WITH_NOTHING:
259
        return obj
260
261
    obj[META_ATTR] = {}
262
    if Verbosity.WITH_CLASS_INFO in verbose:
263
        collection_of_types = {}
264
        _fill_collection_of_types(obj, cls_name, '/', collection_of_types)
265
        collection_of_types['/'] = cls_name
266
        obj[META_ATTR]['classes'] = collection_of_types
267
    if Verbosity.WITH_DUMP_TIME in verbose:
268
        dump_time = to_str(datetime.now(tz=timezone.utc), True, fork_inst)
269
        obj[META_ATTR]['dump_time'] = dump_time
270
    return obj
271
272
273
def _fill_collection_of_types(
274
        obj_: dict,
275
        cls_name_: Optional[str],
276
        prefix: str,
277
        collection_of_types_: dict) -> str:
278
    # This function loops through obj_ to fill collection_of_types_ with the
279
    # class names. All of the -cls attributes are removed in the process.
280
    cls_name_ = _get_class_name_and_strip_cls(cls_name_, obj_)
281
    for attr in obj_:
282
        if attr != META_ATTR and isinstance(obj_[attr], dict):
283
            attr_class = _fill_collection_of_types(obj_[attr],
284
                                                   None,
285
                                                   prefix + attr + '/',
286
                                                   collection_of_types_)
287
            collection_of_types_[prefix + attr] = attr_class
288
    return cls_name_
289
290
291
def _get_class_name_and_strip_cls(cls_name: Optional[str], obj: dict) -> str:
292
    result = cls_name
293
    if not cls_name and '-cls' in obj:
294
        result = obj['-cls']
295
    if '-cls' in obj:
296
        del obj['-cls']
297
    return result
298
299
300
def _store_cls_info(result: object, original_obj: dict, kwargs):
301
    if kwargs.get('_store_cls', None) and isinstance(result, dict):
302
        cls = get_type(original_obj)
303
        if cls.__module__ == 'typing':
304
            cls_name = repr(cls)
305
        else:
306
            cls_name = get_class_name(cls, fully_qualified=True,
307
                                      fork_inst=kwargs['fork_inst'])
308
        result['-cls'] = cls_name
309
310
311
_ABC_ATTRS = ('_abc_registry', '_abc_cache', '_abc_negative_cache',
312
              '_abc_negative_cache_version', '_abc_impl')
313