|
1
|
|
|
""" |
|
2
|
|
|
PRIVATE MODULE: do not import (from) it directly. |
|
3
|
|
|
|
|
4
|
|
|
This module contains implementations of common functionality that can be used |
|
5
|
|
|
throughout `jsons`. |
|
6
|
|
|
""" |
|
7
|
|
|
import warnings |
|
8
|
|
|
from importlib import import_module |
|
9
|
|
|
from typing import Callable, Optional |
|
10
|
|
|
from jsons.exceptions import UnknownClassError |
|
11
|
|
|
|
|
12
|
|
|
META_ATTR = '-meta' # The name of the attribute holding meta info. |
|
13
|
|
|
|
|
14
|
|
|
|
|
15
|
|
|
class StateHolder: |
|
16
|
|
|
""" |
|
17
|
|
|
This class holds the registered serializers and deserializers. |
|
18
|
|
|
""" |
|
19
|
|
|
_classes_serializers = list() |
|
20
|
|
|
_classes_deserializers = list() |
|
21
|
|
|
_serializers = dict() |
|
22
|
|
|
_deserializers = dict() |
|
23
|
|
|
_announced_classes = dict() |
|
24
|
|
|
_suppress_warnings = False |
|
25
|
|
|
|
|
26
|
|
|
@classmethod |
|
27
|
|
|
def _warn(cls, msg, *args, **kwargs): |
|
28
|
|
|
if not cls._suppress_warnings: |
|
29
|
|
|
msg_ = ('{} You can suppress warnings like this using ' |
|
30
|
|
|
'jsons.suppress_warnings().'.format(msg)) |
|
31
|
|
|
warnings.warn(msg_, *args, **kwargs) |
|
32
|
|
|
|
|
33
|
|
|
|
|
34
|
|
|
def get_class_name(cls: type, |
|
35
|
|
|
transformer: Optional[Callable[[str], str]] = None, |
|
36
|
|
|
fully_qualified: bool = False, |
|
37
|
|
|
fork_inst: Optional[type] = StateHolder) -> Optional[str]: |
|
38
|
|
|
""" |
|
39
|
|
|
Return the name of a class. |
|
40
|
|
|
:param cls: the class of which the name if to be returned. |
|
41
|
|
|
:param transformer: any string transformer, e.g. ``str.lower``. |
|
42
|
|
|
:param fully_qualified: if ``True`` return the fully qualified name (i.e. |
|
43
|
|
|
complete with module name). |
|
44
|
|
|
:param fork_inst if given, it uses this fork of ``JsonSerializable`` for |
|
45
|
|
|
finding the class name. |
|
46
|
|
|
:return: the name of ``cls``, transformed if a transformer is given. |
|
47
|
|
|
""" |
|
48
|
|
|
if cls in fork_inst._announced_classes: |
|
49
|
|
|
return fork_inst._announced_classes[cls] |
|
50
|
|
|
cls_name = _get_simple_name(cls) |
|
51
|
|
|
module = _get_module(cls) |
|
52
|
|
|
transformer = transformer or (lambda x: x) |
|
53
|
|
|
if not cls_name and hasattr(cls, '__origin__'): |
|
54
|
|
|
origin = cls.__origin__ |
|
55
|
|
|
cls_name = get_class_name(origin, transformer, |
|
56
|
|
|
fully_qualified, fork_inst) |
|
57
|
|
|
if not cls_name: |
|
58
|
|
|
cls_name = str(cls) |
|
59
|
|
|
if fully_qualified and module: |
|
60
|
|
|
cls_name = '{}.{}'.format(module, cls_name) |
|
61
|
|
|
cls_name = transformer(cls_name) |
|
62
|
|
|
return cls_name |
|
63
|
|
|
|
|
64
|
|
|
|
|
65
|
|
|
def get_cls_from_str(cls_str: str, source: object, fork_inst) -> type: |
|
66
|
|
|
try: |
|
67
|
|
|
splitted = cls_str.split('.') |
|
68
|
|
|
module_name = '.'.join(splitted[:-1]) |
|
69
|
|
|
cls_name = splitted[-1] |
|
70
|
|
|
cls_module = import_module(module_name) |
|
71
|
|
|
cls = getattr(cls_module, cls_name) |
|
72
|
|
|
if not cls or not isinstance(cls, type): |
|
73
|
|
|
cls = _lookup_announced_class(cls_str, source, fork_inst) |
|
74
|
|
|
except (ImportError, AttributeError, ValueError): |
|
75
|
|
|
cls = _lookup_announced_class(cls_str, source, fork_inst) |
|
76
|
|
|
return cls |
|
77
|
|
|
|
|
78
|
|
|
|
|
79
|
|
|
def _lookup_announced_class( |
|
80
|
|
|
cls_str: str, |
|
81
|
|
|
source: object, |
|
82
|
|
|
fork_inst: type) -> type: |
|
83
|
|
|
cls = fork_inst._announced_classes.get(cls_str) |
|
84
|
|
|
if not cls: |
|
85
|
|
|
msg = ('Could not find a suitable type for "{}". Make sure it can be ' |
|
86
|
|
|
'imported or that is has been announced.'.format(cls_str)) |
|
87
|
|
|
raise UnknownClassError(msg, source, cls_str) |
|
88
|
|
|
return cls |
|
89
|
|
|
|
|
90
|
|
|
|
|
91
|
|
|
def _get_simple_name(cls: type) -> str: |
|
92
|
|
|
cls_name = getattr(cls, '__name__', None) |
|
93
|
|
|
if not cls_name: |
|
94
|
|
|
cls_name = getattr(cls, '_name', None) |
|
95
|
|
|
if not cls_name: |
|
96
|
|
|
cls_name = repr(cls) |
|
97
|
|
|
cls_name = cls_name.split('[')[0] # Remove generic types. |
|
98
|
|
|
cls_name = cls_name.split('.')[-1] # Remove any . caused by repr. |
|
99
|
|
|
return cls_name |
|
100
|
|
|
|
|
101
|
|
|
|
|
102
|
|
|
def _get_module(cls: type) -> Optional[str]: |
|
103
|
|
|
builtin_module = str.__class__.__module__ |
|
104
|
|
|
module = cls.__module__ |
|
105
|
|
|
if module and module != builtin_module: |
|
106
|
|
|
return module |
|
107
|
|
|
|
|
108
|
|
|
|
|
109
|
|
|
def get_parents(cls: type, lizers: list) -> list: |
|
110
|
|
|
""" |
|
111
|
|
|
Return a list of serializers or deserializers that can handle a parent |
|
112
|
|
|
of ``cls``. |
|
113
|
|
|
:param cls: the type that |
|
114
|
|
|
:param lizers: a list of serializers or deserializers. |
|
115
|
|
|
:return: a list of serializers or deserializers. |
|
116
|
|
|
""" |
|
117
|
|
|
parents = [] |
|
118
|
|
|
for cls_ in lizers: |
|
119
|
|
|
try: |
|
120
|
|
|
if issubclass(cls, cls_): |
|
121
|
|
|
parents.append(cls_) |
|
122
|
|
|
except TypeError: |
|
123
|
|
|
pass # Some types do not support `issubclass` (e.g. Union). |
|
124
|
|
|
return parents |
|
125
|
|
|
|