Passed
Pull Request — master (#77)
by Ramon
57s
created

jsons.serializers.default_object   B

Complexity

Total Complexity 48

Size/Duplication

Total Lines 312
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 48
eloc 227
dl 0
loc 312
rs 8.5599
c 0
b 0
f 0

14 Functions

Rating   Name   Duplication   Size   Complexity  
A _get_complete_class_dict() 0 6 2
A _get_attributes_from_object() 0 14 1
A _normalize_strip_attr() 0 7 3
A _get_attributes_from_class() 0 12 1
A _get_attributes_and_types() 0 9 3
A _get_dict_with_meta() 0 20 4
B _do_serialize() 0 47 8
A _store_cls_info() 0 9 4
B default_object_serializer() 0 82 5
A _filter_attributes() 0 21 1
A _get_class_props() 0 7 3
A _fill_collection_of_types() 0 16 4
A _get_class_name_and_strip_cls() 0 7 4
A _check_slots() 0 9 5

How to fix   Complexity   

Complexity

Complex classes like jsons.serializers.default_object often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from datetime import datetime, timezone
2
from inspect import isfunction
3
from typing import Optional, Callable, Union, MutableSequence, Tuple, Dict
4
5
from typish import get_type
6
7
from jsons._cache import cached
8
from jsons._common_impl import get_class_name, META_ATTR, StateHolder
9
from jsons._datetime_impl import to_str
10
from jsons._dump_impl import dump
11
from jsons.classes import JsonSerializable
12
from jsons.classes.verbosity import Verbosity
13
from jsons.exceptions import SerializationError, RecursionDetectedError
14
15
16
def default_object_serializer(
17
        obj: object,
18
        cls: Optional[type] = None,
19
        *,
20
        key_transformer: Optional[Callable[[str], str]] = None,
21
        strip_nulls: bool = False,
22
        strip_privates: bool = False,
23
        strip_properties: bool = False,
24
        strip_class_variables: bool = False,
25
        strip_attr: Union[str, MutableSequence[str], Tuple[str]] = None,
26
        verbose: Union[Verbosity, bool] = False,
27
        strict: bool = False,
28
        fork_inst: Optional[type] = StateHolder,
29
        **kwargs) -> Optional[dict]:
30
    """
31
    Serialize the given ``obj`` to a dict. All values within ``obj`` are also
32
    serialized. If ``key_transformer`` is given, it will be used to transform
33
    the casing (e.g. snake_case) to a different format (e.g. camelCase).
34
    :param obj: the object that is to be serialized.
35
    :param cls: the type of the object that is to be dumped.
36
    :param key_transformer: a function that will be applied to all keys in the
37
    resulting dict.
38
    :param strip_nulls: if ``True`` the resulting dict will not contain null
39
    values.
40
    :param strip_privates: if ``True`` the resulting dict will not contain
41
    private attributes (i.e. attributes that start with an underscore).
42
    :param strip_properties: if ``True`` the resulting dict will not contain
43
    values from @properties.
44
    :param strip_class_variables: if ``True`` the resulting dict will not
45
    contain values from class variables.
46
    :param strip_attr: can be a name or a collection of names of attributes
47
    that are not to be included in the dump.
48
    dict will not contain attributes with
49
    :param verbose: if ``True`` the resulting dict will contain meta
50
    information (e.g. on how to deserialize).
51
    :param strict: a bool to determine if the serializer should be strict
52
    (i.e. only dumping stuff that is known to ``cls``).
53
    :param fork_inst: if given, it uses this fork of ``JsonSerializable``.
54
    :param kwargs: any keyword arguments that are to be passed to the
55
    serializer functions.
56
    :return: a Python dict holding the values of ``obj``.
57
    """
58
    if obj is None:
59
        return obj
60
    _check_slots(cls, fork_inst)
61
    strip_attr = _normalize_strip_attr(strip_attr)
62
    if strict and cls:
63
        attributes = _get_attributes_from_class(
64
            cls, strip_privates, strip_properties, strip_class_variables,
65
            strip_attr)
66
    else:
67
        attributes = _get_attributes_from_object(
68
            obj, strip_privates, strip_properties, strip_class_variables,
69
            strip_attr)
70
        cls = obj.__class__
71
    verbose = Verbosity.from_value(verbose)
72
    kwargs_ = {
73
        **kwargs,
74
        'fork_inst': fork_inst,
75
        'verbose': verbose,
76
        'strict': strict,
77
        # Set a flag in kwargs to temporarily store -cls:
78
        '_store_cls': Verbosity.WITH_CLASS_INFO in verbose
79
    }
80
81
    result = _do_serialize(obj=obj,
82
                           cls=cls,
83
                           attributes=attributes,
84
                           kwargs=kwargs_,
85
                           key_transformer=key_transformer,
86
                           strip_nulls=strip_nulls,
87
                           strip_privates=strip_privates,
88
                           strip_properties=strip_properties,
89
                           strip_class_variables=strip_class_variables,
90
                           strip_attr=strip_attr,
91
                           strict=strict,
92
                           fork_inst=fork_inst)
93
94
    cls_name = get_class_name(cls, fully_qualified=True, fork_inst=fork_inst)
95
    if not kwargs.get('_store_cls'):
96
        result = _get_dict_with_meta(result, cls_name, verbose, fork_inst)
97
    return result
98
99
100
def _do_serialize(
101
        obj: object,
102
        cls: type,
103
        attributes: Dict[str, Optional[type]],
104
        kwargs: dict,
105
        key_transformer: Optional[Callable[[str], str]] = None,
106
        strip_nulls: bool = False,
107
        strip_privates: bool = False,
108
        strip_properties: bool = False,
109
        strip_class_variables: bool = False,
110
        strip_attr: Union[str, MutableSequence[str], Tuple[str]] = None,
111
        strict: bool = False,
112
        fork_inst: Optional[type] = StateHolder) -> Dict[str, object]:
113
    result = dict()
114
    for attr_name, cls_ in attributes.items():
115
        attr = obj.__getattribute__(attr_name)
116
        dumped_elem = None
117
        try:
118
            dumped_elem = dump(attr,
119
                               cls=cls_,
120
                               key_transformer=key_transformer,
121
                               strip_nulls=strip_nulls,
122
                               strip_privates=strip_privates,
123
                               strip_properties=strip_properties,
124
                               strip_class_variables=strip_class_variables,
125
                               strip_attr=strip_attr,
126
                               **kwargs)
127
128
            _store_cls_info(dumped_elem, attr, kwargs)
129
        except RecursionDetectedError:
130
            fork_inst._warn('Recursive structure detected in attribute "{}" '
131
                            'of object of type "{}", ignoring the attribute.'
132
                            .format(attr_name, get_class_name(cls)))
133
        except SerializationError as err:
134
            if strict:
135
                raise
136
            else:
137
                fork_inst._warn('Failed to dump attribute "{}" of object of '
138
                                'type "{}". Reason: {}. Ignoring the '
139
                                'attribute.'
140
                                .format(attr, get_class_name(cls), err.message))
141
                break
142
        if not (strip_nulls and dumped_elem is None):
143
            if key_transformer:
144
                attr_name = key_transformer(attr_name)
145
            result[attr_name] = dumped_elem
146
    return result
147
148
149
def _check_slots(cls: type, fork_inst: Optional[type]):
150
    # Check for __slots__ or __dataclass_fields__.
151
    if (cls and not hasattr(cls, '__slots__')
152
            and not hasattr(cls, '__annotations__')
153
            and not hasattr(cls, '__dataclass_fields__')):
154
        raise SerializationError('Invalid type: "{}". Only dataclasses or '
155
                                 'types that have a __slots__ defined are '
156
                                 'allowed when providing "cls".'
157
                                 .format(get_class_name(cls, fork_inst=fork_inst, fully_qualified=True)))
158
159
160
def _normalize_strip_attr(strip_attr) -> tuple:
161
    # Make sure that strip_attr is always a tuple.
162
    strip_attr = strip_attr or tuple()
163
    if (not isinstance(strip_attr, MutableSequence)
164
            and not isinstance(strip_attr, tuple)):
165
        strip_attr = (strip_attr,)
166
    return strip_attr
167
168
169
@cached
170
def _get_attributes_from_class(
171
        cls: type,
172
        strip_privates: bool,
173
        strip_properties: bool,
174
        strip_class_variables: bool,
175
        strip_attr: tuple) -> Dict[str, Optional[type]]:
176
    # Get the attributes that are known in the class.
177
    attributes_and_types = _get_attributes_and_types(cls)
178
    return _filter_attributes(cls, attributes_and_types, strip_privates,
179
                              strip_properties, strip_class_variables,
180
                              strip_attr)
181
182
183
def _get_attributes_from_object(
184
        obj: object,
185
        strip_privates: bool,
186
        strip_properties: bool,
187
        strip_class_variables: bool,
188
        strip_attr: tuple) -> Dict[str, Optional[type]]:
189
    # Get the attributes that are known in the object.
190
    cls = obj.__class__
191
    attributes_and_types = _get_attributes_and_types(cls)
192
    attributes = {attr: attributes_and_types.get(attr, None)
193
                  for attr in dir(obj)}
194
    return _filter_attributes(cls, attributes, strip_privates,
195
                              strip_properties, strip_class_variables,
196
                              strip_attr)
197
198
199
@cached
200
def _get_attributes_and_types(cls) -> Dict[str, Optional[type]]:
201
    if '__slots__' in cls.__dict__:
202
        attributes = {attr: None for attr in cls.__slots__}
203
    elif hasattr(cls, '__annotations__'):
204
        attributes = cls.__annotations__
205
    else:
206
        attributes = {}
207
    return attributes
208
209
210
def _filter_attributes(
211
        cls: type,
212
        attributes: Dict[str, Optional[type]],
213
        strip_privates: bool,
214
        strip_properties: bool,
215
        strip_class_variables: bool,
216
        strip_attr: tuple) -> Dict[str, Optional[type]]:
217
    # Filter the given attributes with the given preferences.
218
    strip_attr = strip_attr + _ABC_ATTRS
219
    excluded_elems = dir(JsonSerializable)
220
    props, other_cls_vars = _get_class_props(cls)
221
222
    return {attr: type_ for attr, type_ in attributes.items()
223
            if not attr.startswith('__')
224
            and not (strip_privates and attr.startswith('_'))
225
            and not (strip_properties and attr in props)
226
            and not (strip_class_variables and attr in other_cls_vars)
227
            and attr not in strip_attr
228
            and attr != 'json'
229
            and not isfunction(getattr(cls, attr, None))
230
            and attr not in excluded_elems}
231
232
233
def _get_class_props(cls: type) -> Tuple[list, list]:
234
    props = []
235
    other_cls_vars = []
236
    for n, v in _get_complete_class_dict(cls).items():
237
        list_to_append = props if isinstance(v, property) else other_cls_vars
238
        list_to_append.append(n)
239
    return props, other_cls_vars
240
241
242
def _get_complete_class_dict(cls: type) -> dict:
243
    cls_dict = {}
244
    # Loop reversed so values of sub-classes override those of super-classes.
245
    for cls_or_elder in reversed(cls.mro()):
246
        cls_dict.update(cls_or_elder.__dict__)
247
    return cls_dict
248
249
250
def _get_dict_with_meta(
251
        obj: dict,
252
        cls_name: str,
253
        verbose: Verbosity,
254
        fork_inst: type) -> dict:
255
    # This function will add a -meta section to the given obj (provided that
256
    # the given obj has -cls attributes for all children).
257
    if verbose is Verbosity.WITH_NOTHING:
258
        return obj
259
260
    obj[META_ATTR] = {}
261
    if Verbosity.WITH_CLASS_INFO in verbose:
262
        collection_of_types = {}
263
        _fill_collection_of_types(obj, cls_name, '/', collection_of_types)
264
        collection_of_types['/'] = cls_name
265
        obj[META_ATTR]['classes'] = collection_of_types
266
    if Verbosity.WITH_DUMP_TIME in verbose:
267
        dump_time = to_str(datetime.now(tz=timezone.utc), True, fork_inst)
268
        obj[META_ATTR]['dump_time'] = dump_time
269
    return obj
270
271
272
def _fill_collection_of_types(
273
        obj_: dict,
274
        cls_name_: Optional[str],
275
        prefix: str,
276
        collection_of_types_: dict) -> str:
277
    # This function loops through obj_ to fill collection_of_types_ with the
278
    # class names. All of the -cls attributes are removed in the process.
279
    cls_name_ = _get_class_name_and_strip_cls(cls_name_, obj_)
280
    for attr in obj_:
281
        if attr != META_ATTR and isinstance(obj_[attr], dict):
282
            attr_class = _fill_collection_of_types(obj_[attr],
283
                                                   None,
284
                                                   prefix + attr + '/',
285
                                                   collection_of_types_)
286
            collection_of_types_[prefix + attr] = attr_class
287
    return cls_name_
288
289
290
def _get_class_name_and_strip_cls(cls_name: Optional[str], obj: dict) -> str:
291
    result = cls_name
292
    if not cls_name and '-cls' in obj:
293
        result = obj['-cls']
294
    if '-cls' in obj:
295
        del obj['-cls']
296
    return result
297
298
299
def _store_cls_info(result: object, original_obj: dict, kwargs):
300
    if kwargs.get('_store_cls', None) and isinstance(result, dict):
301
        cls = get_type(original_obj)
302
        if cls.__module__ == 'typing':
303
            cls_name = repr(cls)
304
        else:
305
            cls_name = get_class_name(cls, fully_qualified=True,
306
                                      fork_inst=kwargs['fork_inst'])
307
        result['-cls'] = cls_name
308
309
310
_ABC_ATTRS = ('_abc_registry', '_abc_cache', '_abc_negative_cache',
311
              '_abc_negative_cache_version', '_abc_impl')
312