Passed
Push — master ( fcc3bd...c60240 )
by Max
53s
created

structured_data.adt   B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 339
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 175
dl 0
loc 339
rs 8.48
c 0
b 0
f 0
wmc 49

14 Functions

Rating   Name   Duplication   Size   Complexity  
A _name() 0 3 1
A _add_methods() 0 5 2
A _nillable_write() 0 5 2
A _set_new_functions() 0 13 4
A _make_nested_new() 0 7 2
A _sum_super() 0 5 1
A _sum_new() 0 3 1
A _process_class() 0 31 4
A _sum_args_from_annotations() 0 8 3
A _product_new() 0 22 1
A _set_hash() 0 3 2
A _add_order() 0 14 4
A _product_args_from_annotations() 0 10 4
A _tuple_getter() 0 7 1

3 Methods

Rating   Name   Duplication   Size   Complexity  
B Product.__new__() 0 14 6
A Sum.__init_subclass__() 0 4 2
C Product.__init_subclass__() 0 41 9

How to fix   Complexity   

Complexity

Complex classes like structured_data.adt often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Class decorator for defining abstract data types.
2
3
This module provides two public members, which are used together.
4
5
Given a structure, possibly a choice of different structures, that you'd like
6
to associate with a type:
7
8
- First, create a class, that subclasses the Sum class.
9
- Then, for each possible structure, add an attribute annotation to the class
10
  with the desired name of the constructor, and a type of ``Ctor``, with the
11
  types within the constructor as arguments.
12
13
To look inside an ADT instance, use the functions from the
14
:mod:`structured_data.match` module.
15
16
Putting it together:
17
18
>>> from structured_data import match
19
>>> class Example(Sum):
20
...     FirstConstructor: Ctor[int, str]
21
...     SecondConstructor: Ctor[bytes]
22
...     ThirdConstructor: Ctor
23
...     def __iter__(self):
24
...         matchable = match.Matchable(self)
25
...         if matchable(Example.FirstConstructor(match.pat.count, match.pat.string)):
26
...             count, string = matchable[match.pat.count, match.pat.string]
27
...             for _ in range(count):
28
...                 yield string
29
...         elif matchable(Example.SecondConstructor(match.pat.bytes)):
30
...             bytes_ = matchable[match.pat.bytes]
31
...             for byte in bytes_:
32
...                 yield chr(byte)
33
...         elif matchable(Example.ThirdConstructor()):
34
...             yield "Third"
35
...             yield "Constructor"
36
>>> list(Example.FirstConstructor(5, "abc"))
37
['abc', 'abc', 'abc', 'abc', 'abc']
38
>>> list(Example.SecondConstructor(b"abc"))
39
['a', 'b', 'c']
40
>>> list(Example.ThirdConstructor())
41
['Third', 'Constructor']
42
"""
43
44
import inspect
45
import sys
46
import typing
47
48
from ._adt_constructor import ADTConstructor
49
from ._adt_constructor import make_constructor
50
from ._ctor import get_args
51
from ._prewritten_methods import SUBCLASS_ORDER
52
from ._prewritten_methods import PrewrittenProductMethods
53
from ._prewritten_methods import PrewrittenSumMethods
54
55
_T = typing.TypeVar("_T")
56
57
58
if typing.TYPE_CHECKING:  # pragma: nocover
59
60
    class Ctor:
61
        """Dummy class for type-checking purposes."""
62
63
    class ConcreteCtor(typing.Generic[_T]):
64
        """Wrapper class for type-checking purposes.
65
66
        The type parameter should be a Tuple type of fixed size.
67
        Classes containing this annotation (meaning they haven't been
68
        processed by the ``adt`` decorator) should not be instantiated.
69
        """
70
71
72
else:
73
    from ._ctor import Ctor
74
75
76
def _name(cls: typing.Type[_T], function) -> str:
77
    """Return the name of a function accessed through a descriptor."""
78
    return function.__get__(None, cls).__name__
79
80
81
def _set_new_functions(cls: typing.Type[_T], *functions) -> typing.Optional[str]:
82
    """Attempt to set the attributes corresponding to the functions on cls.
83
84
    If any attributes are already defined, fail *before* setting any, and
85
    return the already-defined name.
86
    """
87
    for function in functions:
88
        name = _name(cls, function)
89
        if getattr(object, name, None) is not getattr(cls, name, None):
90
            return name
91
    for function in functions:
92
        setattr(cls, _name(cls, function), function)
93
    return None
94
95
96
def _sum_super(_cls: typing.Type[_T]):
97
    def base(cls, args):
98
        return super(_cls, cls).__new__(cls, args)
99
100
    return staticmethod(base)
101
102
103
def _make_nested_new(_cls: typing.Type[_T], subclasses, base__new__):
104
    def __new__(cls, args):
105
        if cls not in subclasses:
106
            raise TypeError
107
        return base__new__.__get__(None, cls)(cls, args)
108
109
    return staticmethod(__new__)
110
111
112
_K = typing.TypeVar("_K")
113
_V = typing.TypeVar("_V")
114
115
116
def _nillable_write(dct: typing.Dict[_K, _V], key: _K, value: typing.Optional[_V]):
117
    if value is None:
118
        dct.pop(key, typing.cast(_V, None))
119
    else:
120
        dct[key] = value
121
122
123
def _add_methods(cls: typing.Type[_T], do_set, *methods):
124
    methods_were_set = False
125
    if do_set:
126
        methods_were_set = not _set_new_functions(cls, *methods)
127
    return methods_were_set
128
129
130
def _set_hash(cls: typing.Type[_T], set_hash, src):
131
    if set_hash:
132
        cls.__hash__ = src.__hash__  # type: ignore
133
134
135
def _add_order(cls: typing.Type[_T], set_order, equality_methods_were_set, src):
136
    if set_order:
137
        if not equality_methods_were_set:
138
            raise ValueError(
139
                "Can't add ordering methods if equality methods are provided."
140
            )
141
        collision = _set_new_functions(
142
            cls, src.__lt__, src.__le__, src.__gt__, src.__ge__
143
        )
144
        if collision:
145
            raise TypeError(
146
                "Cannot overwrite attribute {collision} in class "
147
                "{name}. Consider using functools.total_ordering".format(
148
                    collision=collision, name=cls.__name__
149
                )
150
            )
151
152
153
def _sum_new(cls: typing.Type[_T], subclasses):
154
    new = cls.__dict__.get("__new__", _sum_super(cls))
155
    cls.__new__ = _make_nested_new(cls, subclasses, new)  # type: ignore
156
157
158
_SENTINEL = object()
159
160
161
def _product_new(
162
    _cls: typing.Type[_T],
163
    annotations: typing.Dict[str, typing.Any],
164
    defaults: typing.Dict[str, typing.Any],
165
):
166
    def __new__(*args, **kwargs):
167
        cls, *args = args
168
        return super(_cls, cls).__new__(cls, *args, **kwargs)
169
170
    __new__.__signature__ = inspect.signature(__new__).replace(
171
        parameters=[inspect.Parameter("cls", inspect.Parameter.POSITIONAL_ONLY)]
172
        + [
173
            inspect.Parameter(
174
                field,
175
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
176
                annotation=annotation,
177
                default=defaults.get(field, inspect.Parameter.empty),
178
            )
179
            for (field, annotation) in annotations.items()
180
        ]
181
    )
182
    _cls.__new__ = __new__
183
184
185
def _sum_args_from_annotations(cls: typing.Type[_T]) -> typing.Dict[str, typing.Tuple]:
186
    args: typing.Dict[str, typing.Tuple] = {}
187
    for superclass in reversed(cls.__mro__):
188
        for key, value in vars(superclass).get("__annotations__", {}).items():
189
            _nillable_write(
190
                args, key, get_args(value, vars(sys.modules[superclass.__module__]))
191
            )
192
    return args
193
194
195
def _product_args_from_annotations(
196
    cls: typing.Type[_T]
197
) -> typing.Dict[str, typing.Any]:
198
    args: typing.Dict[str, typing.Any] = {}
199
    for superclass in reversed(cls.__mro__):
200
        for key, value in vars(superclass).get("__annotations__", {}).items():
201
            if value == "None":
202
                value = None
203
            _nillable_write(args, key, value)
204
    return args
205
206
207
def _tuple_getter(index: int):
208
    # TODO: __name__ and __qualname__
209
    @property
210
    def getter(self):
211
        return tuple.__getitem__(self, index)
212
213
    return getter
214
215
216
def _process_class(_cls: typing.Type[_T], _repr, eq, order) -> typing.Type[_T]:
217
    if order and not eq:
218
        raise ValueError("eq must be true if order is true")
219
220
    subclass_order: typing.List[typing.Type[_T]] = []
221
222
    for name, args in _sum_args_from_annotations(_cls).items():
223
        make_constructor(_cls, name, args, subclass_order)
224
225
    SUBCLASS_ORDER[_cls] = tuple(subclass_order)
226
227
    _cls.__init_subclass__ = PrewrittenSumMethods.__init_subclass__  # type: ignore
228
229
    _sum_new(_cls, frozenset(subclass_order))
230
231
    _set_new_functions(
232
        _cls, PrewrittenSumMethods.__setattr__, PrewrittenSumMethods.__delattr__
233
    )
234
    _set_new_functions(_cls, PrewrittenSumMethods.__bool__)
235
236
    _add_methods(_cls, _repr, PrewrittenSumMethods.__repr__)
237
238
    equality_methods_were_set = _add_methods(
239
        _cls, eq, PrewrittenSumMethods.__eq__, PrewrittenSumMethods.__ne__
240
    )
241
242
    _set_hash(_cls, equality_methods_were_set, PrewrittenSumMethods)
243
244
    _add_order(_cls, order, equality_methods_were_set, PrewrittenSumMethods)
245
246
    return _cls
247
248
249
class Sum:
250
    """Base class of classes with disjoint constructors.
251
252
    Examines PEP 526 __annotations__ to determine subclasses.
253
254
    If repr is true, a __repr__() method is added to the class.
255
    If order is true, rich comparison dunder methods are added.
256
257
    The Sum class examines the class to find Ctor annotations.
258
    A Ctor annotation is the adt.Ctor class itself, or the result of indexing
259
    the class, either with a single type hint, or a tuple of type hints.
260
    All other annotations are ignored.
261
262
    The subclass is not subclassable, but has subclasses at each of the
263
    names that had Ctor annotations. Each subclass takes a fixed number of
264
    arguments, corresponding to the type hints given to its annotation, if any.
265
    """
266
267
    __slots__ = ()
268
269
    def __init_subclass__(cls, *, repr=True, eq=True, order=False, **kwargs):
270
        super().__init_subclass__(**kwargs)
271
        if not issubclass(cls, ADTConstructor):
272
            _process_class(cls, repr, eq, order)
273
274
275
class Product(ADTConstructor, tuple):
276
277
    __slots__ = ()
278
279
    def __new__(*args, **kwargs):
280
        cls, *args = args
281
        values = cls.__defaults.copy()
282
        fields_iter = iter(cls.__annotations)
283
        for arg, field in zip(args, fields_iter):
284
            values[field] = arg
285
        for field in fields_iter:
286
            if field in values and field not in kwargs:
287
                continue
288
            values[field] = kwargs.pop(field)
289
        if kwargs:
290
            raise TypeError(kwargs)
291
        return super(Product, cls).__new__(
292
            cls, [values[field] for field in cls.__annotations]
293
        )
294
295
    def __init_subclass__(cls, *, repr=True, eq=True, order=False, **kwargs):
296
        super().__init_subclass__(**kwargs)
297
        if "__annotations__" not in vars(cls):
298
            return
299
        if order and not eq:
300
            raise ValueError("eq must be true if order is true")
301
302
        cls.__annotations = _product_args_from_annotations(cls)
303
304
        cls.__defaults = {}
305
        field_names = iter(reversed(tuple(cls.__annotations)))
306
        for field in field_names:
307
            default = getattr(cls, field, _SENTINEL)
308
            if default is _SENTINEL:
309
                break
310
            cls.__defaults[field] = default
311
        for field in field_names:
312
            if getattr(cls, field, _SENTINEL) is not _SENTINEL:
313
                raise TypeError
314
315
        _product_new(cls, cls.__annotations, cls.__defaults)
316
317
        for index, field in enumerate(cls.__annotations):
318
            setattr(cls, field, _tuple_getter(index))
319
320
        _set_new_functions(
321
            cls,
322
            PrewrittenProductMethods.__setattr__,
323
            PrewrittenProductMethods.__delattr__,
324
        )
325
        _set_new_functions(cls, PrewrittenProductMethods.__bool__)
326
327
        _add_methods(cls, repr, PrewrittenProductMethods.__repr__)
328
329
        equality_methods_were_set = _add_methods(
330
            cls, eq, PrewrittenProductMethods.__eq__, PrewrittenProductMethods.__ne__
331
        )
332
333
        _set_hash(cls, equality_methods_were_set, PrewrittenProductMethods)
334
335
        _add_order(cls, order, equality_methods_were_set, PrewrittenProductMethods)
336
337
338
__all__ = ["Ctor", "Product", "Sum"]
339