Passed
Push — master ( 60734f...5235f1 )
by Max
01:10
created

structured_data.adt   F

Complexity

Total Complexity 73

Size/Duplication

Total Lines 454
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 253
dl 0
loc 454
rs 2.56
c 0
b 0
f 0
wmc 73

10 Functions

Rating   Name   Duplication   Size   Complexity  
A _name() 0 3 1
A _all_annotations() 0 6 3
A _sum_new() 0 12 2
A _add_methods() 0 5 2
A _cant_set_new_functions() 0 12 3
A _sum_args_from_annotations() 0 7 2
A _nillable_write() 0 5 2
A _product_new() 0 22 1
A _product_args_from_annotations() 0 9 3
A _set_new_functions() 0 12 3

14 Methods

Rating   Name   Duplication   Size   Complexity  
A Sum.__new__() 0 5 2
A Product.__getattribute__() 0 8 3
A Product.__gt__() 0 7 2
A Product.__lt__() 0 7 2
A Product.__repr__() 0 5 2
A Product.__hash__() 0 5 2
A Product.__le__() 0 7 2
A Product.__dir__() 0 2 1
B Product.__new__() 0 16 7
D Product.__init_subclass__() 0 55 13
C Sum.__init_subclass__() 0 50 9
A Product.__eq__() 0 7 2
A Product.__ne__() 0 7 2
A Product.__ge__() 0 7 2

How to fix   Complexity   

Complexity

Complex classes like structured_data.adt often do a lot of different things. To break such a class down, identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share a prefix or suffix.

Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
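As a rough illustration of the Extract Class advice above, the sketch below moves members that share an "export_" prefix into their own class. Every name here (ReportBefore, Report, Exporter) is hypothetical and unrelated to structured_data.adt; it only shows the general shape of the refactoring.

```python
from dataclasses import dataclass
from typing import List, Optional


# Before: one class mixes report data with members that share an
# "export_" prefix (hypothetical example, not taken from the module).
class ReportBefore:
    def __init__(self, title: str, rows: List[list]) -> None:
        self.title = title
        self.rows = rows
        self.export_delimiter = ","
        self.export_newline = "\n"

    def export_header(self) -> str:
        return self.title

    def export_body(self) -> str:
        return self.export_newline.join(
            self.export_delimiter.join(str(cell) for cell in row)
            for row in self.rows
        )


# After: the "export_" members form a cohesive component of their own.
@dataclass
class Exporter:
    delimiter: str = ","
    newline: str = "\n"

    def header(self, title: str) -> str:
        return title

    def body(self, rows: List[list]) -> str:
        return self.newline.join(
            self.delimiter.join(str(cell) for cell in row) for row in rows
        )


class Report:
    def __init__(
        self, title: str, rows: List[list], exporter: Optional[Exporter] = None
    ) -> None:
        self.title = title
        self.rows = rows
        # The extracted component is held by composition.
        self.exporter = exporter if exporter is not None else Exporter()

    def export(self) -> str:
        exporter = self.exporter
        return exporter.header(self.title) + exporter.newline + exporter.body(self.rows)
```

The original class keeps only the data it owns, and the export behaviour can be tested or swapped on its own.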

"""Base classes for defining abstract data types.

This module provides three public members, which are used together.

Given a structure, possibly a choice of different structures, that you'd like
to associate with a type:

- First, create a class that subclasses the Sum class.
- Then, for each possible structure, add an attribute annotation to the class
  with the desired name of the constructor, and a type of ``Ctor``, with the
  types within the constructor as arguments.

To look inside an ADT instance, use the functions from the
:mod:`structured_data.match` module.

Putting it together:

>>> from structured_data import match
>>> class Example(Sum):
...     FirstConstructor: Ctor[int, str]
...     SecondConstructor: Ctor[bytes]
...     ThirdConstructor: Ctor
...     def __iter__(self):
...         matchable = match.Matchable(self)
...         if matchable(Example.FirstConstructor(match.pat.count, match.pat.string)):
...             count, string = matchable[match.pat.count, match.pat.string]
...             for _ in range(count):
...                 yield string
...         elif matchable(Example.SecondConstructor(match.pat.bytes)):
...             bytes_ = matchable[match.pat.bytes]
...             for byte in bytes_:
...                 yield chr(byte)
...         elif matchable(Example.ThirdConstructor()):
...             yield "Third"
...             yield "Constructor"
>>> list(Example.FirstConstructor(5, "abc"))
['abc', 'abc', 'abc', 'abc', 'abc']
>>> list(Example.SecondConstructor(b"abc"))
['a', 'b', 'c']
>>> list(Example.ThirdConstructor())
['Third', 'Constructor']
"""

import inspect
import sys
import typing

from ._adt_constructor import ADTConstructor
from ._adt_constructor import make_constructor
from ._ctor import get_args
from ._prewritten_methods import SUBCLASS_ORDER
from ._prewritten_methods import PrewrittenProductMethods
from ._prewritten_methods import PrewrittenSumMethods

_T = typing.TypeVar("_T")


if typing.TYPE_CHECKING:  # pragma: nocover

    class Ctor:
        """Dummy class for type-checking purposes."""

    class ConcreteCtor(typing.Generic[_T]):
        """Wrapper class for type-checking purposes.

        The type parameter should be a Tuple type of fixed size.
        Classes containing this annotation (meaning they haven't been
        processed by the ``adt`` decorator) should not be instantiated.
        """


else:
    from ._ctor import Ctor


def _name(cls: typing.Type[_T], function) -> str:
    """Return the name of a function accessed through a descriptor."""
    return function.__get__(None, cls).__name__


def _cant_set_new_functions(cls: typing.Type[_T], *functions) -> typing.Optional[str]:
    """Return the name of the first function that clashes with cls, if any.

    An attribute clashes if cls already has a value for that name other than
    the one from object, the one from Product, None, or the function itself.
    """
    for function in functions:
        name = _name(cls, function)
        existing = getattr(cls, name, None)
        if existing not in (
            getattr(object, name, None),
            getattr(Product, name, None),
            None,
            function,
        ):
            return name
    return None


def _set_new_functions(cls: typing.Type[_T], *functions) -> typing.Optional[str]:
    """Attempt to set the attributes corresponding to the functions on cls.

    If any attributes are already defined, fail *before* setting any, and
    return the already-defined name.
    """
    cant_set = _cant_set_new_functions(cls, *functions)
    if cant_set:
        return cant_set
    for function in functions:
        setattr(cls, _name(cls, function), function)
    return None


_K = typing.TypeVar("_K")
_V = typing.TypeVar("_V")


def _nillable_write(dct: typing.Dict[_K, _V], key: _K, value: typing.Optional[_V]):
    """Store value under key, or remove key from dct if value is None."""
    if value is None:
        dct.pop(key, typing.cast(_V, None))
    else:
        dct[key] = value


def _add_methods(cls: typing.Type[_T], do_set, *methods):
    """Set methods on cls if do_set is true; return whether they were set."""
    methods_were_set = False
    if do_set:
        methods_were_set = not _set_new_functions(cls, *methods)
    return methods_were_set


def _sum_new(_cls: typing.Type[_T], subclasses):
    def base(cls, args):
        return super(_cls, cls).__new__(cls, args)

    new = _cls.__dict__.get("__new__", staticmethod(base))

    def __new__(cls, args):
        if cls not in subclasses:
            raise TypeError
        return new.__get__(None, cls)(cls, args)

    _cls.__new__ = staticmethod(__new__)  # type: ignore


def _product_new(
    _cls: typing.Type[_T],
    annotations: typing.Dict[str, typing.Any],
    defaults: typing.Dict[str, typing.Any],
):
    def __new__(*args, **kwargs):
        cls, *args = args
        return super(_cls, cls).__new__(cls, *args, **kwargs)

    __new__.__signature__ = inspect.signature(__new__).replace(
        parameters=[inspect.Parameter("cls", inspect.Parameter.POSITIONAL_ONLY)]
        + [
            inspect.Parameter(
                field,
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                annotation=annotation,
                default=defaults.get(field, inspect.Parameter.empty),
            )
            for (field, annotation) in annotations.items()
        ]
    )
    _cls.__new__ = __new__


def _all_annotations(
    cls: typing.Type[_T]
) -> typing.Iterator[typing.Tuple[typing.Type[_T], str, typing.Any]]:
    """Yield (class, name, annotation) over the MRO of cls, base classes first."""
    for superclass in reversed(cls.__mro__):
        for key, value in vars(superclass).get("__annotations__", {}).items():
            yield (superclass, key, value)


def _sum_args_from_annotations(cls: typing.Type[_T]) -> typing.Dict[str, typing.Tuple]:
    args: typing.Dict[str, typing.Tuple] = {}
    for superclass, key, value in _all_annotations(cls):
        _nillable_write(
            args, key, get_args(value, vars(sys.modules[superclass.__module__]))
        )
    return args


def _product_args_from_annotations(
    cls: typing.Type[_T]
) -> typing.Dict[str, typing.Any]:
    args: typing.Dict[str, typing.Any] = {}
    for _, key, value in _all_annotations(cls):
        # The string annotation "None" removes an inherited field.
        if value == "None":
            value = None
        _nillable_write(args, key, value)
    return args


class Sum:
    """Base class of classes with disjoint constructors.

    Examines PEP 526 __annotations__ to determine subclasses.

    If repr is true, a __repr__() method is added to the class.
    If order is true, rich comparison dunder methods are added.

    The Sum class examines the class to find Ctor annotations.
    A Ctor annotation is the adt.Ctor class itself, or the result of indexing
    the class, either with a single type hint, or a tuple of type hints.
    All other annotations are ignored.

    The subclass is not subclassable, but has subclasses at each of the
    names that had Ctor annotations. Each subclass takes a fixed number of
    arguments, corresponding to the type hints given to its annotation, if any.
    """

    __slots__ = ()

    def __new__(*args, **kwargs):
        cls, *args = args
        if not issubclass(cls, ADTConstructor):
            raise TypeError
        return super(Sum, cls).__new__(cls, *args, **kwargs)

    def __init_subclass__(cls, *, repr=True, eq=True, order=False, **kwargs):
        super().__init_subclass__(**kwargs)
        if issubclass(cls, ADTConstructor):
            return
        if order and not eq:
            raise ValueError("eq must be true if order is true")

        subclass_order: typing.List[typing.Type[_T]] = []

        for name, args in _sum_args_from_annotations(cls).items():
            make_constructor(cls, name, args, subclass_order)

        SUBCLASS_ORDER[cls] = tuple(subclass_order)

        cls.__init_subclass__ = PrewrittenSumMethods.__init_subclass__  # type: ignore

        _sum_new(cls, frozenset(subclass_order))

        _set_new_functions(
            cls, PrewrittenSumMethods.__setattr__, PrewrittenSumMethods.__delattr__
        )
        _set_new_functions(cls, PrewrittenSumMethods.__bool__)

        _add_methods(cls, repr, PrewrittenSumMethods.__repr__)

        equality_methods_were_set = _add_methods(
            cls, eq, PrewrittenSumMethods.__eq__, PrewrittenSumMethods.__ne__
        )

        if equality_methods_were_set:
            cls.__hash__ = PrewrittenSumMethods.__hash__

        if order:

            if not equality_methods_were_set:
                raise ValueError(
                    "Can't add ordering methods if equality methods are provided."
                )
            collision = _set_new_functions(
                cls,
                PrewrittenSumMethods.__lt__,
                PrewrittenSumMethods.__le__,
                PrewrittenSumMethods.__gt__,
                PrewrittenSumMethods.__ge__,
            )
            if collision:
                raise TypeError(
                    "Cannot overwrite attribute {collision} in class "
                    "{name}. Consider using functools.total_ordering".format(
                        collision=collision, name=cls.__name__
                    )
                )


class Product(ADTConstructor, tuple):
    """Base class of classes with typed fields.

    Examines PEP 526 __annotations__ to determine fields.

    If repr is true, a __repr__() method is added to the class.
    If order is true, rich comparison dunder methods are added.

    The Product class examines the class to find annotations.
    Annotations with a value of "None" are discarded.
    Fields may have default values, and can be set to
    inspect.Parameter.empty to indicate "no default".

    The subclass is subclassable. The implementation was designed with a focus
    on flexibility over ideals of purity, and therefore provides various
    optional facilities that conflict with, for example, Liskov
    substitutability. For the purposes of matching, each class is considered
    distinct.
    """

    __slots__ = ()

    def __new__(*args, **kwargs):  # pylint: disable=no-method-argument
        cls, *args = args
        if cls is Product:
            raise TypeError
        values = cls.__defaults.copy()
        fields_iter = iter(cls.__annotations)
        for arg, field in zip(args, fields_iter):
            values[field] = arg
        for field in fields_iter:
            if field in values and field not in kwargs:
                continue
            values[field] = kwargs.pop(field)
        if kwargs:
            raise TypeError(kwargs)
        return super(Product, cls).__new__(
            cls, [values[field] for field in cls.__annotations]
        )

    __repr = True
    __eq = True
    __order = False
    __eq_succeeded = None

    def __init_subclass__(cls, *, repr=None, eq=None, order=None, **kwargs):
        super().__init_subclass__(**kwargs)

        if repr is not None:
            cls.__repr = repr
        if eq is not None:
            cls.__eq = eq
        if order is not None:
            cls.__order = order

        if cls.__order and not cls.__eq:
            raise ValueError("eq must be true if order is true")

        cls.__annotations = _product_args_from_annotations(cls)
        cls.__fields = {field: index for (index, field) in enumerate(cls.__annotations)}

        cls.__defaults = {}
        field_names = iter(reversed(tuple(cls.__annotations)))
        for field in field_names:
            default = getattr(cls, field, inspect.Parameter.empty)
            if default is inspect.Parameter.empty:
                break
            cls.__defaults[field] = default
        # Any default found before the trailing run of defaults is an error.
        if any(
            getattr(cls, field, inspect.Parameter.empty) is not inspect.Parameter.empty
            for field in field_names
        ):
            raise TypeError

        _product_new(cls, cls.__annotations, cls.__defaults)

        cls.__eq_succeeded = False
        if cls.__eq:
            cls.__eq_succeeded = not _cant_set_new_functions(
                cls, PrewrittenProductMethods.__eq__, PrewrittenProductMethods.__ne__
            )

        if order:

            if not cls.__eq_succeeded:
                raise ValueError(
                    "Can't add ordering methods if equality methods are provided."
                )
            collision = _cant_set_new_functions(
                cls,
                PrewrittenProductMethods.__lt__,
                PrewrittenProductMethods.__le__,
                PrewrittenProductMethods.__gt__,
                PrewrittenProductMethods.__ge__,
            )
            if collision:
                raise TypeError(
                    "Cannot overwrite attribute {collision} in class "
                    "{name}. Consider using functools.total_ordering".format(
                        collision=collision, name=cls.__name__
                    )
                )

    def __dir__(self):
        return super().__dir__() + list(self.__fields)

    def __getattribute__(self, name):
        try:
            return super().__getattribute__(name)
        except AttributeError:
            index = self.__fields.get(name)
            if index is None:
                raise
            return tuple.__getitem__(self, index)

    __setattr__ = PrewrittenProductMethods.__setattr__
    __delattr__ = PrewrittenProductMethods.__delattr__
    __bool__ = PrewrittenProductMethods.__bool__

    @property
    def __repr__(self):
        if self.__repr:
            return PrewrittenProductMethods.__repr__.__get__(self, type(self))
        return super().__repr__

    @property
    def __hash__(self):
        if self.__eq_succeeded:
            return PrewrittenProductMethods.__hash__.__get__(self, type(self))
        return super().__hash__

    @property
    def __eq__(self):  # pylint: disable=unexpected-special-method-signature
        if self.__eq_succeeded:
            # I think this is a Pylint bug, but I'm not sure how to reduce it.
            # pylint: disable=no-value-for-parameter
            return PrewrittenProductMethods.__eq__.__get__(self, type(self))
        return super().__eq__

    @property
    def __ne__(self):  # pylint: disable=unexpected-special-method-signature
        if self.__eq_succeeded:
            # I think this is a Pylint bug, but I'm not sure how to reduce it.
            # pylint: disable=no-value-for-parameter
            return PrewrittenProductMethods.__ne__.__get__(self, type(self))
        return super().__ne__

    @property
    def __lt__(self):  # pylint: disable=unexpected-special-method-signature
        if self.__order:
            # I think this is a Pylint bug, but I'm not sure how to reduce it.
            # pylint: disable=no-value-for-parameter
            return PrewrittenProductMethods.__lt__.__get__(self, type(self))
        return super().__lt__

    @property
    def __le__(self):  # pylint: disable=unexpected-special-method-signature
        if self.__order:
            # I think this is a Pylint bug, but I'm not sure how to reduce it.
            # pylint: disable=no-value-for-parameter
            return PrewrittenProductMethods.__le__.__get__(self, type(self))
        return super().__le__

    @property
    def __gt__(self):  # pylint: disable=unexpected-special-method-signature
        if self.__order:
            # I think this is a Pylint bug, but I'm not sure how to reduce it.
            # pylint: disable=no-value-for-parameter
            return PrewrittenProductMethods.__gt__.__get__(self, type(self))
        return super().__gt__

    @property
    def __ge__(self):  # pylint: disable=unexpected-special-method-signature
        if self.__order:
            # I think this is a Pylint bug, but I'm not sure how to reduce it.
            # pylint: disable=no-value-for-parameter
            return PrewrittenProductMethods.__ge__.__get__(self, type(self))
        return super().__ge__


__all__ = ["Ctor", "Product", "Sum"]
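
The docstring of _set_new_functions describes failing before setting any attribute. A simplified, self-contained sketch of that check-then-set idea follows. The helper name set_all_or_nothing and the classes Plain and Custom are hypothetical, and the clash test here only looks at the class's own namespace, which is an assumption made for brevity and not what the module does.

```python
def set_all_or_nothing(cls, **functions):
    """Set every function on cls, or none of them.

    Returns the first name that cls already defines in its own namespace,
    or None if all functions were set. (Simplified clash rule: the listing
    above compares against attributes of object and Product instead.)
    """
    # First pass: only check, so a clash leaves cls untouched.
    for name in functions:
        if name in vars(cls):
            return name
    # Second pass: every name is free, so set them all.
    for name, function in functions.items():
        setattr(cls, name, function)
    return None


class Plain:
    pass


class Custom:
    def __lt__(self, other):
        return False


def always_true(self, other):
    return True


# Plain defines neither method, so both are installed.
assert set_all_or_nothing(Plain, __le__=always_true, __lt__=always_true) is None

# Custom already defines __lt__, so nothing is installed, not even __le__.
clash = set_all_or_nothing(Custom, __le__=always_true, __lt__=always_true)
```

Checking every name before the first setattr call means a caller that receives a clash can raise an error without leaving the class half-modified.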