Passed
Push — master ( 32cd5d...2f7d9b )
by Max
50s
created

structured_data.adt   F

Complexity

Total Complexity 70

Size/Duplication

Total Lines 435
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 245
dl 0
loc 435
rs 2.8
c 0
b 0
f 0
wmc 70

10 Functions

Rating   Name   Duplication   Size   Complexity  
A _name() 0 3 1
A _all_annotations() 0 6 3
A _sum_new() 0 12 2
A _add_methods() 0 5 2
A _cant_set_new_functions() 0 11 3
A _sum_args_from_annotations() 0 7 2
A _nillable_write() 0 5 2
A _product_new() 0 22 1
A _product_args_from_annotations() 0 9 3
A _set_new_functions() 0 12 3

13 Methods

Rating   Name   Duplication   Size   Complexity  
A Product.__getattribute__() 0 8 3
A Product.__gt__() 0 5 2
A Product.__lt__() 0 5 2
A Product.__repr__() 0 5 2
A Product.__le__() 0 5 2
A Product.__hash__() 0 5 2
A Product.__dir__() 0 2 1
B Product.__new__() 0 14 6
C Sum.__init_subclass__() 0 50 9
D Product.__init_subclass__() 0 55 13
A Product.__eq__() 0 5 2
A Product.__ne__() 0 5 2
A Product.__ge__() 0 5 2

How to fix   Complexity   

Complexity

Complex classes like structured_data.adt often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Base classes for defining abstract data types.
2
3
This module provides three public members, which are used together.
4
5
Given a structure, possibly a choice of different structures, that you'd like
6
to associate with a type:
7
8
- First, create a class, that subclasses the Sum class.
9
- Then, for each possible structure, add an attribute annotation to the class
10
  with the desired name of the constructor, and a type of ``Ctor``, with the
11
  types within the constructor as arguments.
12
13
To look inside an ADT instance, use the functions from the
14
:mod:`structured_data.match` module.
15
16
Putting it together:
17
18
>>> from structured_data import match
19
>>> class Example(Sum):
20
...     FirstConstructor: Ctor[int, str]
21
...     SecondConstructor: Ctor[bytes]
22
...     ThirdConstructor: Ctor
23
...     def __iter__(self):
24
...         matchable = match.Matchable(self)
25
...         if matchable(Example.FirstConstructor(match.pat.count, match.pat.string)):
26
...             count, string = matchable[match.pat.count, match.pat.string]
27
...             for _ in range(count):
28
...                 yield string
29
...         elif matchable(Example.SecondConstructor(match.pat.bytes)):
30
...             bytes_ = matchable[match.pat.bytes]
31
...             for byte in bytes_:
32
...                 yield chr(byte)
33
...         elif matchable(Example.ThirdConstructor()):
34
...             yield "Third"
35
...             yield "Constructor"
36
>>> list(Example.FirstConstructor(5, "abc"))
37
['abc', 'abc', 'abc', 'abc', 'abc']
38
>>> list(Example.SecondConstructor(b"abc"))
39
['a', 'b', 'c']
40
>>> list(Example.ThirdConstructor())
41
['Third', 'Constructor']
42
"""
43
44
import inspect
45
import sys
46
import typing
47
48
from ._adt_constructor import ADTConstructor
49
from ._adt_constructor import make_constructor
50
from ._ctor import get_args
51
from ._prewritten_methods import SUBCLASS_ORDER
52
from ._prewritten_methods import PrewrittenProductMethods
53
from ._prewritten_methods import PrewrittenSumMethods
54
55
_T = typing.TypeVar("_T")
56
57
58
if typing.TYPE_CHECKING:  # pragma: nocover
59
60
    class Ctor:
61
        """Dummy class for type-checking purposes."""
62
63
    class ConcreteCtor(typing.Generic[_T]):
64
        """Wrapper class for type-checking purposes.
65
66
        The type parameter should be a Tuple type of fixed size.
67
        Classes containing this annotation (meaning they haven't been
68
        processed by the ``adt`` decorator) should not be instantiated.
69
        """
70
71
72
else:
73
    from ._ctor import Ctor
74
75
76
def _name(cls: typing.Type[_T], function) -> str:
77
    """Return the name of a function accessed through a descriptor."""
78
    return function.__get__(None, cls).__name__
79
80
81
def _cant_set_new_functions(cls: typing.Type[_T], *functions) -> typing.Optional[str]:
82
    for function in functions:
83
        name = _name(cls, function)
84
        existing = getattr(cls, name, None)
85
        if existing not in (
86
            getattr(object, name, None),
87
            getattr(Product, name, None),
88
            None,
89
            function,
90
        ):
91
            return name
92
93
94
def _set_new_functions(cls: typing.Type[_T], *functions) -> typing.Optional[str]:
95
    """Attempt to set the attributes corresponding to the functions on cls.
96
97
    If any attributes are already defined, fail *before* setting any, and
98
    return the already-defined name.
99
    """
100
    cant_set = _cant_set_new_functions(cls, *functions)
101
    if cant_set:
102
        return cant_set
103
    for function in functions:
104
        setattr(cls, _name(cls, function), function)
105
    return None
106
107
108
_K = typing.TypeVar("_K")
109
_V = typing.TypeVar("_V")
110
111
112
def _nillable_write(dct: typing.Dict[_K, _V], key: _K, value: typing.Optional[_V]):
113
    if value is None:
114
        dct.pop(key, typing.cast(_V, None))
115
    else:
116
        dct[key] = value
117
118
119
def _add_methods(cls: typing.Type[_T], do_set, *methods):
120
    methods_were_set = False
121
    if do_set:
122
        methods_were_set = not _set_new_functions(cls, *methods)
123
    return methods_were_set
124
125
126
def _sum_new(_cls: typing.Type[_T], subclasses):
127
    def base(cls, args):
128
        return super(_cls, cls).__new__(cls, args)
129
130
    new = _cls.__dict__.get("__new__", staticmethod(base))
131
132
    def __new__(cls, args):
133
        if cls not in subclasses:
134
            raise TypeError
135
        return new.__get__(None, cls)(cls, args)
136
137
    _cls.__new__ = staticmethod(__new__)  # type: ignore
138
139
140
def _product_new(
141
    _cls: typing.Type[_T],
142
    annotations: typing.Dict[str, typing.Any],
143
    defaults: typing.Dict[str, typing.Any],
144
):
145
    def __new__(*args, **kwargs):
146
        cls, *args = args
147
        return super(_cls, cls).__new__(cls, *args, **kwargs)
148
149
    __new__.__signature__ = inspect.signature(__new__).replace(
150
        parameters=[inspect.Parameter("cls", inspect.Parameter.POSITIONAL_ONLY)]
151
        + [
152
            inspect.Parameter(
153
                field,
154
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
155
                annotation=annotation,
156
                default=defaults.get(field, inspect.Parameter.empty),
157
            )
158
            for (field, annotation) in annotations.items()
159
        ]
160
    )
161
    _cls.__new__ = __new__
162
163
164
def _all_annotations(
165
    cls: typing.Type[_T]
166
) -> typing.Iterator[typing.Tuple[typing.Type[_T], str, typing.Any]]:
167
    for superclass in reversed(cls.__mro__):
168
        for key, value in vars(superclass).get("__annotations__", {}).items():
169
            yield (superclass, key, value)
170
171
172
def _sum_args_from_annotations(cls: typing.Type[_T]) -> typing.Dict[str, typing.Tuple]:
173
    args: typing.Dict[str, typing.Tuple] = {}
174
    for superclass, key, value in _all_annotations(cls):
175
        _nillable_write(
176
            args, key, get_args(value, vars(sys.modules[superclass.__module__]))
177
        )
178
    return args
179
180
181
def _product_args_from_annotations(
182
    cls: typing.Type[_T]
183
) -> typing.Dict[str, typing.Any]:
184
    args: typing.Dict[str, typing.Any] = {}
185
    for _, key, value in _all_annotations(cls):
186
        if value == "None":
187
            value = None
188
        _nillable_write(args, key, value)
189
    return args
190
191
192
class Sum:
193
    """Base class of classes with disjoint constructors.
194
195
    Examines PEP 526 __annotations__ to determine subclasses.
196
197
    If repr is true, a __repr__() method is added to the class.
198
    If order is true, rich comparison dunder methods are added.
199
200
    The Sum class examines the class to find Ctor annotations.
201
    A Ctor annotation is the adt.Ctor class itself, or the result of indexing
202
    the class, either with a single type hint, or a tuple of type hints.
203
    All other annotations are ignored.
204
205
    The subclass is not subclassable, but has subclasses at each of the
206
    names that had Ctor annotations. Each subclass takes a fixed number of
207
    arguments, corresponding to the type hints given to its annotation, if any.
208
    """
209
210
    __slots__ = ()
211
212
    def __init_subclass__(cls, *, repr=True, eq=True, order=False, **kwargs):
213
        super().__init_subclass__(**kwargs)
214
        if issubclass(cls, ADTConstructor):
215
            return
216
        if order and not eq:
217
            raise ValueError("eq must be true if order is true")
218
219
        subclass_order: typing.List[typing.Type[_T]] = []
220
221
        for name, args in _sum_args_from_annotations(cls).items():
222
            make_constructor(cls, name, args, subclass_order)
223
224
        SUBCLASS_ORDER[cls] = tuple(subclass_order)
225
226
        cls.__init_subclass__ = PrewrittenSumMethods.__init_subclass__  # type: ignore
227
228
        _sum_new(cls, frozenset(subclass_order))
229
230
        _set_new_functions(
231
            cls, PrewrittenSumMethods.__setattr__, PrewrittenSumMethods.__delattr__
232
        )
233
        _set_new_functions(cls, PrewrittenSumMethods.__bool__)
234
235
        _add_methods(cls, repr, PrewrittenSumMethods.__repr__)
236
237
        equality_methods_were_set = _add_methods(
238
            cls, eq, PrewrittenSumMethods.__eq__, PrewrittenSumMethods.__ne__
239
        )
240
241
        if equality_methods_were_set:
242
            cls.__hash__ = PrewrittenSumMethods.__hash__
243
244
        if order:
245
246
            if not equality_methods_were_set:
247
                raise ValueError(
248
                    "Can't add ordering methods if equality methods are provided."
249
                )
250
            collision = _set_new_functions(
251
                cls,
252
                PrewrittenSumMethods.__lt__,
253
                PrewrittenSumMethods.__le__,
254
                PrewrittenSumMethods.__gt__,
255
                PrewrittenSumMethods.__ge__,
256
            )
257
            if collision:
258
                raise TypeError(
259
                    "Cannot overwrite attribute {collision} in class "
260
                    "{name}. Consider using functools.total_ordering".format(
261
                        collision=collision, name=cls.__name__
262
                    )
263
                )
264
265
266
class Product(ADTConstructor, tuple):
267
    """Base class of classes with typed fields.
268
269
    Examines PEP 526 __annotations__ to determine fields.
270
271
    If repr is true, a __repr__() method is added to the class.
272
    If order is true, rich comparison dunder methods are added.
273
274
    The Product class examines the class to find annotations.
275
    Annotations with a value of "None" are discarded.
276
    Fields may have default values, and can be set to inspect.empty to
277
    indicate "no default".
278
279
    The subclass is subclassable. The implementation was designed with a focus
280
    on flexibility over ideals of purity, and therefore provides various
281
    optional facilities that conflict with, for example, Liskov
282
    substitutability. For the purposes of matching, each class is considered
283
    distinct.
284
    """
285
286
    __slots__ = ()
287
288
    def __new__(*args, **kwargs):
289
        cls, *args = args
290
        values = cls.__defaults.copy()
291
        fields_iter = iter(cls.__annotations)
292
        for arg, field in zip(args, fields_iter):
293
            values[field] = arg
294
        for field in fields_iter:
295
            if field in values and field not in kwargs:
296
                continue
297
            values[field] = kwargs.pop(field)
298
        if kwargs:
299
            raise TypeError(kwargs)
300
        return super(Product, cls).__new__(
301
            cls, [values[field] for field in cls.__annotations]
302
        )
303
304
    __repr = True
305
    __eq = True
306
    __order = False
307
    __eq_succeeded = None
308
309
    def __init_subclass__(cls, *, repr=None, eq=None, order=None, **kwargs):
310
        super().__init_subclass__(**kwargs)
311
312
        if repr is not None:
313
            cls.__repr = repr
314
        if eq is not None:
315
            cls.__eq = eq
316
        if order is not None:
317
            cls.__order = order
318
319
        if cls.__order and not cls.__eq:
320
            raise ValueError("eq must be true if order is true")
321
322
        cls.__annotations = _product_args_from_annotations(cls)
323
        cls.__fields = {field: index for (index, field) in enumerate(cls.__annotations)}
324
325
        cls.__defaults = {}
326
        field_names = iter(reversed(tuple(cls.__annotations)))
327
        for field in field_names:
328
            default = getattr(cls, field, inspect.Parameter.empty)
329
            if default is inspect.Parameter.empty:
330
                break
331
            cls.__defaults[field] = default
332
        if any(
333
            getattr(cls, field, inspect.Parameter.empty) is not inspect.Parameter.empty
334
            for field in field_names
335
        ):
336
            raise TypeError
337
338
        _product_new(cls, cls.__annotations, cls.__defaults)
339
340
        cls.__eq_succeeded = False
341
        if cls.__eq:
342
            cls.__eq_succeeded = not _cant_set_new_functions(
343
                cls, PrewrittenProductMethods.__eq__, PrewrittenProductMethods.__ne__
344
            )
345
346
        if order:
347
348
            if not cls.__eq_succeeded:
349
                raise ValueError(
350
                    "Can't add ordering methods if equality methods are provided."
351
                )
352
            collision = _cant_set_new_functions(
353
                cls,
354
                PrewrittenProductMethods.__lt__,
355
                PrewrittenProductMethods.__le__,
356
                PrewrittenProductMethods.__gt__,
357
                PrewrittenProductMethods.__ge__,
358
            )
359
            if collision:
360
                raise TypeError(
361
                    "Cannot overwrite attribute {collision} in class "
362
                    "{name}. Consider using functools.total_ordering".format(
363
                        collision=collision, name=cls.__name__
364
                    )
365
                )
366
367
    def __dir__(self):
368
        return super().__dir__() + list(self.__fields)
369
370
    def __getattribute__(self, name):
371
        try:
372
            return super().__getattribute__(name)
373
        except AttributeError:
374
            index = self.__fields.get(name)
375
            if index is None:
376
                raise
377
            return tuple.__getitem__(self, index)
378
379
    __setattr__ = PrewrittenProductMethods.__setattr__
380
    __delattr__ = PrewrittenProductMethods.__delattr__
381
    __bool__ = PrewrittenProductMethods.__bool__
382
383
    # TODO: replace sets in __init_subclass__ with checks
384
385
    @property
386
    def __repr__(self):
387
        if self.__repr:
388
            return PrewrittenProductMethods.__repr__.__get__(self, type(self))
389
        return super().__repr__
390
391
    @property
392
    def __hash__(self):
393
        if self.__eq_succeeded:
394
            return PrewrittenProductMethods.__hash__.__get__(self, type(self))
395
        return super().__hash__
396
397
    @property
398
    def __eq__(self):
399
        if self.__eq_succeeded:
400
            return PrewrittenProductMethods.__eq__.__get__(self, type(self))
401
        return super().__eq__
402
403
    @property
404
    def __ne__(self):
405
        if self.__eq_succeeded:
406
            return PrewrittenProductMethods.__ne__.__get__(self, type(self))
407
        return super().__ne__
408
409
    @property
410
    def __lt__(self):
411
        if self.__order:
412
            return PrewrittenProductMethods.__lt__.__get__(self, type(self))
413
        return super().__lt__
414
415
    @property
416
    def __le__(self):
417
        if self.__order:
418
            return PrewrittenProductMethods.__le__.__get__(self, type(self))
419
        return super().__le__
420
421
    @property
422
    def __gt__(self):
423
        if self.__order:
424
            return PrewrittenProductMethods.__gt__.__get__(self, type(self))
425
        return super().__gt__
426
427
    @property
428
    def __ge__(self):
429
        if self.__order:
430
            return PrewrittenProductMethods.__ge__.__get__(self, type(self))
431
        return super().__ge__
432
433
434
__all__ = ["Ctor", "Product", "Sum"]
435