Passed
Push — master ( 3ccb78...243c32 )
by Max
53s
created

structured_data.enum   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 232
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 152
dl 0
loc 232
rs 7.92
c 0
b 0
f 0
wmc 51

3 Methods

Rating   Name   Duplication   Size   Complexity  
A Ctor.__new__() 0 6 2
A Ctor.__init_subclass__() 0 2 1
A Ctor.__class_getitem__() 0 4 2

18 Functions

Rating   Name   Duplication   Size   Complexity  
A _parse_constructor() 0 5 2
A _set_new_functions() 0 12 4
A _name() 0 3 1
A _make_nested_new() 0 7 2
A _get_args_from_index() 0 4 3
A _extract_tuple_ast() 0 13 5
A _interpret_args_from_non_string() 0 5 2
A _checked_eval() 0 5 2
A _args() 0 7 3
A _enum_super() 0 4 1
A _nillable_write() 0 5 2
A enum() 0 11 2
A _process_class() 0 31 4
A _add_methods() 0 5 2
A _set_hash() 0 3 2
A _args_from_annotations() 0 7 3
A _custom_new() 0 5 2
A _add_order() 0 17 4

How to fix   Complexity   

Complexity

Complex classes like structured_data.enum often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Class decorator for defining abstract data types."""
2
3
import ast
4
import sys
5
import typing
6
import weakref
7
8
import astor
9
10
from ._enum_constructor import make_constructor
11
from ._prewritten_methods import SUBCLASS_ORDER
12
from ._prewritten_methods import PrewrittenMethods
13
14
_CTOR_CACHE = {}
15
16
17
ARGS = weakref.WeakKeyDictionary()
18
19
20
class Ctor:
21
    """Marker class for enum constructors.
22
23
    To use, index with a sequence of types, and annotate a variable in an
24
    enum-decorated class with it.
25
    """
26
27
    def __new__(cls, args):
28
        if args == ():
29
            return cls
30
        self = object.__new__(cls)
31
        ARGS[self] = args
32
        return _CTOR_CACHE.setdefault(args, self)
33
34
    def __init_subclass__(cls, **kwargs):
35
        raise TypeError
36
37
    def __class_getitem__(cls, args):
38
        if not isinstance(args, tuple):
39
            args = (args,)
40
        return cls(args)
41
42
43
ARGS[Ctor] = ()
44
45
46
def _interpret_args_from_non_string(constructor):
47
    try:
48
        return ARGS.get(constructor)
49
    except TypeError:
50
        return None
51
52
53
def _parse_constructor(constructor):
54
    try:
55
        return ast.parse(constructor, mode='eval')
56
    except Exception:
57
        raise ValueError('parsing annotation failed')
58
59
60
def _get_args_from_index(index):
61
    if isinstance(index, ast.Tuple):
62
        return tuple(astor.to_source(elt) for elt in index.elts)
63
    return (astor.to_source(index),)
64
65
66
def _checked_eval(source, global_ns):
67
    try:
68
        return eval(source, global_ns)
69
    except Exception:
70
        return None
71
72
73
def _extract_tuple_ast(constructor, global_ns):
74
    ctor_ast = _parse_constructor(constructor)
75
    if (
76
            isinstance(ctor_ast.body, ast.Subscript)
77
            and isinstance(ctor_ast.body.slice, ast.Index)):
78
        index = ctor_ast.body.slice.value
79
        ctor_ast.body = ctor_ast.body.value
80
        value = _checked_eval(compile(ctor_ast, '<annotation>', 'eval'), global_ns)
81
        if value is Ctor:
82
            return _get_args_from_index(index)
83
        if value is None:
84
            return None
85
    return _interpret_args_from_non_string(_checked_eval(constructor, global_ns))
86
87
88
def _args(constructor, global_ns):
89
    if isinstance(constructor, str):
90
        try:
91
            return _extract_tuple_ast(constructor, global_ns)
92
        except ValueError:
93
            return None
94
    return _interpret_args_from_non_string(constructor)
95
96
97
def _name(cls, function) -> str:
98
    """Return the name of a function accessed through a descriptor."""
99
    return function.__get__(None, cls).__name__
100
101
102
def _set_new_functions(cls, *functions) -> typing.Optional[str]:
103
    """Attempt to set the attributes corresponding to the functions on cls.
104
105
    If any attributes are already defined, fail *before* setting any, and
106
    return the already-defined name.
107
    """
108
    for function in functions:
109
        if _name(cls, function) in cls.__dict__:
110
            return _name(cls, function)
111
    for function in functions:
112
        setattr(cls, _name(cls, function), function)
113
    return None
114
115
116
def _enum_super(_cls):
117
    def base(cls, args):
118
        return super(_cls, cls).__new__(cls, args)
119
    return base
120
121
122
def _make_nested_new(_cls, subclasses, base__new__):
123
    @staticmethod
124
    def __new__(cls, args):
125
        if cls not in subclasses:
126
            raise TypeError
127
        return base__new__(cls, args)
128
    return __new__
129
130
131
def _nillable_write(dct, key, value):
132
    if value is None:
133
        dct.pop(key, None)
134
    else:
135
        dct[key] = value
136
137
138
def _add_methods(cls, do_set, *methods):
139
    methods_were_set = False
140
    if do_set:
141
        methods_were_set = not _set_new_functions(cls, *methods)
142
    return methods_were_set
143
144
145
def _set_hash(cls, set_hash):
146
    if set_hash:
147
        cls.__hash__ = PrewrittenMethods.__hash__
148
149
150
def _add_order(cls, set_order, equality_methods_were_set):
151
    if set_order:
152
        if not equality_methods_were_set:
153
            raise ValueError(
154
                "Can't add ordering methods if equality methods are provided.")
155
        collision = _set_new_functions(
156
            cls,
157
            PrewrittenMethods.__lt__,
158
            PrewrittenMethods.__le__,
159
            PrewrittenMethods.__gt__,
160
            PrewrittenMethods.__ge__
161
            )
162
        if collision:
163
            raise TypeError(
164
                'Cannot overwrite attribute {collision} in class '
165
                '{name}. Consider using functools.total_ordering'.format(
166
                    collision=collision, name=cls.__name__))
167
168
169
def _custom_new(cls, subclasses):
170
    basic_new = _make_nested_new(cls, subclasses, _enum_super(cls))
171
    if _set_new_functions(cls, basic_new):
172
        augmented_new = _make_nested_new(cls, subclasses, cls.__new__)
173
        cls.__new__ = augmented_new
174
175
176
def _args_from_annotations(cls):
177
    args = {}
178
    for superclass in reversed(cls.__mro__):
179
        for key, value in getattr(superclass, '__annotations__', {}).items():
180
            _nillable_write(
181
                args, key, _args(value, vars(sys.modules[superclass.__module__])))
182
    return args
183
184
185
def _process_class(_cls, _repr, eq, order):
186
    if order and not eq:
187
        raise ValueError('eq must be true if order is true')
188
189
    subclasses = set()
190
    subclass_order = []
191
    args = _args_from_annotations(_cls)
192
193
    for name, args_ in args.items():
194
        make_constructor(_cls, name, args_, subclasses, subclass_order)
195
196
    SUBCLASS_ORDER[_cls] = tuple(subclass_order)
197
198
    _cls.__init_subclass__ = PrewrittenMethods.__init_subclass__
199
200
    _custom_new(_cls, subclasses)
201
202
    _set_new_functions(
203
        _cls, PrewrittenMethods.__setattr__, PrewrittenMethods.__delattr__)
204
    _set_new_functions(_cls, PrewrittenMethods.__bool__)
205
206
    _add_methods(_cls, _repr, PrewrittenMethods.__repr__)
207
208
    equality_methods_were_set = _add_methods(
209
        _cls, eq, PrewrittenMethods.__eq__, PrewrittenMethods.__ne__)
210
211
    _set_hash(_cls, equality_methods_were_set)
212
213
    _add_order(_cls, order, equality_methods_were_set)
214
215
    return _cls
216
217
218
def enum(_cls=None, *, repr=True, eq=True, order=False):
219
    """Decorate a class to be an algebraic data type."""
220
221
    def wrap(cls):
222
        """Return the processed class."""
223
        return _process_class(cls, repr, eq, order)
224
225
    if _cls is None:
226
        return wrap
227
228
    return wrap(_cls)
229
230
231
__all__ = ['Ctor', 'enum']
232