Passed
Push — master ( 93e9b6...c3e849 )
by Max
54s
created

structured_data.adt   B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 396
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 210
dl 0
loc 396
rs 8.48
c 0
b 0
f 0
wmc 49

11 Functions

Rating   Name   Duplication   Size   Complexity  
A _name() 0 3 1
A _cant_set_new_functions() 0 12 3
A _set_new_functions() 0 12 3
A _extract_defaults() 0 6 2
A _sum_new() 0 12 2
A _values_until_non_empty() 0 5 3
A _set_ordering() 0 9 3
A _values_non_empty() 0 6 3
A _unpack_args() 0 9 5
A _product_new() 0 22 1
A _ordering_options_are_valid() 0 3 3

6 Methods

Rating   Name   Duplication   Size   Complexity  
A Sum.__new__() 0 5 2
A Product.__getattribute__() 0 8 3
A Product.__dir__() 0 2 1
A Product.__new__() 0 17 2
B Sum.__init_subclass__() 0 44 6
B Product.__init_subclass__() 0 40 6

How to fix   Complexity   

Complexity

Complex classes like structured_data.adt often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Base classes for defining abstract data types.
2
3
This module provides three public members, which are used together.
4
5
Given a structure, possibly a choice of different structures, that you'd like
6
to associate with a type:
7
8
- First, create a class, that subclasses the Sum class.
9
- Then, for each possible structure, add an attribute annotation to the class
10
  with the desired name of the constructor, and a type of ``Ctor``, with the
11
  types within the constructor as arguments.
12
13
To look inside an ADT instance, use the functions from the
14
:mod:`structured_data.match` module.
15
16
Putting it together:
17
18
>>> from structured_data import match
19
>>> class Example(Sum):
20
...     FirstConstructor: Ctor[int, str]
21
...     SecondConstructor: Ctor[bytes]
22
...     ThirdConstructor: Ctor
23
...     def __iter__(self):
24
...         matchable = match.Matchable(self)
25
...         if matchable(Example.FirstConstructor(match.pat.count, match.pat.string)):
26
...             count, string = matchable[match.pat.count, match.pat.string]
27
...             for _ in range(count):
28
...                 yield string
29
...         elif matchable(Example.SecondConstructor(match.pat.bytes)):
30
...             bytes_ = matchable[match.pat.bytes]
31
...             for byte in bytes_:
32
...                 yield chr(byte)
33
...         elif matchable(Example.ThirdConstructor()):
34
...             yield "Third"
35
...             yield "Constructor"
36
>>> list(Example.FirstConstructor(5, "abc"))
37
['abc', 'abc', 'abc', 'abc', 'abc']
38
>>> list(Example.SecondConstructor(b"abc"))
39
['a', 'b', 'c']
40
>>> list(Example.ThirdConstructor())
41
['Third', 'Constructor']
42
"""
43
44
import inspect
45
import typing
46
47
from . import _adt_constructor
48
from . import _annotations
49
from . import _conditional_method
50
from . import _prewritten_methods
51
52
# Type variable used in the ``typing.Type[_T]`` annotations of the helpers
# below and as the parameter of the type-checking-only ``ConcreteCtor``.
_T = typing.TypeVar("_T")
53
54
55
# Static type checkers see the stand-in classes defined in the first branch;
# at runtime the real ``Ctor`` is imported from the private ``_ctor`` module.
if typing.TYPE_CHECKING:  # pragma: nocover

    class Ctor:
        """Dummy class for type-checking purposes."""

    class ConcreteCtor(typing.Generic[_T]):
        """Wrapper class for type-checking purposes.

        The type parameter should be a Tuple type of fixed size.
        Classes containing this annotation (meaning they haven't been
        processed by the ``adt`` decorator) should not be instantiated.
        """

        # NOTE(review): no ``adt`` decorator is visible in this module; the
        # wording above may predate the ``Sum`` base class -- confirm.


else:
    from ._ctor import Ctor
71
72
73
# This is fine.
74
def _name(cls: typing.Type[_T], function) -> str:
    """Return the ``__name__`` of ``function`` as seen through ``cls``.

    ``function`` may be a descriptor (plain function, staticmethod, ...), so
    it is first bound against ``cls`` with no instance and the name is read
    from whatever the descriptor protocol hands back.
    """
    resolved = function.__get__(None, cls)
    return resolved.__name__
77
78
79
# This is mostly fine, though the list of classes is somewhat ad-hoc, to say
80
# the least.
81
def _cant_set_new_functions(cls: typing.Type[_T], *functions) -> typing.Optional[str]:
    """Return the name of the first function that would clobber an override.

    For each function, the attribute of the same name is looked up on ``cls``.
    The slot counts as free when that attribute is missing/``None``, is what
    ``object`` or ``Product`` provide under that name, or already is the
    function itself. The first name that is not free is returned; ``None``
    means every function could be installed.
    """
    for candidate in functions:
        attribute = _name(cls, candidate)
        current = getattr(cls, attribute, None)
        replaceable = (
            getattr(object, attribute, None),
            getattr(Product, attribute, None),
            None,
            candidate,
        )
        if current in replaceable:
            continue
        return attribute
    return None
93
94
95
def _set_new_functions(cls: typing.Type[_T], *functions) -> typing.Optional[str]:
    """Install ``functions`` on ``cls`` under their own names, all or nothing.

    The collision check runs over every function *before* anything is
    assigned. If it reports a name that is already defined, that name is
    returned and ``cls`` is left untouched; otherwise each function is set
    as an attribute and ``None`` is returned.
    """
    collision = _cant_set_new_functions(cls, *functions)
    if not collision:
        for candidate in functions:
            setattr(cls, _name(cls, candidate), candidate)
        return None
    return collision
107
108
109
def _sum_new(_cls: typing.Type[_T], subclasses):
    """Replace ``_cls.__new__`` with a guard that only admits ``subclasses``.

    The ``__new__`` found in ``_cls``'s own ``__dict__`` is wrapped if there
    is one; otherwise a fallback that defers to the next ``__new__`` after
    ``_cls`` in the MRO is used. The installed guard raises ``TypeError`` for
    any class that is not a member of ``subclasses``.
    """

    def fallback(cls, args):
        return super(_cls, cls).__new__(cls, args)

    namespace = _cls.__dict__
    if "__new__" in namespace:
        wrapped = namespace["__new__"]
    else:
        wrapped = staticmethod(fallback)

    def __new__(cls, args):
        if cls in subclasses:
            # ``wrapped`` is a descriptor (e.g. a staticmethod); unwrap it
            # against ``cls`` to obtain the callable.
            constructor = wrapped.__get__(None, cls)
            return constructor(cls, args)
        raise TypeError

    _cls.__new__ = staticmethod(__new__)  # type: ignore
121
122
123
def _product_new(
    _cls: typing.Type[_T],
    annotations: typing.Dict[str, typing.Any],
    defaults: typing.Dict[str, typing.Any],
):
    """Give ``_cls`` a ``__new__`` whose signature lists its fields.

    The installed function simply forwards to the next ``__new__`` after
    ``_cls`` in the MRO. Its ``__signature__`` is replaced so introspection
    shows a positional-only ``cls`` followed by one positional-or-keyword
    parameter per entry of ``annotations``, with the default taken from
    ``defaults`` where present.
    """

    def __new__(*args, **kwargs):
        cls, *remaining = args
        return super(_cls, cls).__new__(cls, *remaining, **kwargs)

    parameters = [inspect.Parameter("cls", inspect.Parameter.POSITIONAL_ONLY)]
    for field, annotation in annotations.items():
        parameters.append(
            inspect.Parameter(
                field,
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                annotation=annotation,
                default=defaults.get(field, inspect.Parameter.empty),
            )
        )

    advertised = inspect.signature(__new__).replace(parameters=parameters)
    __new__.__signature__ = advertised
    _cls.__new__ = __new__
145
146
147
def _ordering_options_are_valid(*, eq, order):
148
    if order and not eq:
149
        raise ValueError("eq must be true if order is true")
150
151
152
def _set_ordering(*, can_set, setter, cls, source):
153
    if not can_set:
154
        raise ValueError("Can't add ordering methods if equality methods are provided.")
155
    collision = setter(cls, source.__lt__, source.__le__, source.__gt__, source.__ge__)
156
    if collision:
157
        raise TypeError(
158
            "Cannot overwrite attribute {collision} in class "
159
            "{name}. Consider using functools.total_ordering".format(
160
                collision=collision, name=cls.__name__
161
            )
162
        )
163
164
165
def _values_non_empty(cls, field_names):
166
    for field in field_names:
167
        default = getattr(cls, field, inspect.Parameter.empty)
168
        if default is inspect.Parameter.empty:
169
            return
170
        yield (field, default)
171
172
173
def _values_until_non_empty(cls, field_names):
174
    for field in field_names:
175
        default = getattr(cls, field, inspect.Parameter.empty)
176
        if default is not inspect.Parameter.empty:
177
            yield
178
179
180
def _extract_defaults(*, cls, annotations):
    """Collect the trailing run of defaulted fields of ``cls`` as a dict.

    Fields are walked from last to first. Defaults are gathered until the
    first field without one; if any earlier field then turns out to have a
    default, ``TypeError`` is raised.
    """
    remaining = iter(reversed(tuple(annotations)))
    collected = dict(_values_non_empty(cls, remaining))
    # Both helpers draw from the same iterator, so this only inspects the
    # fields that come before the trailing run of defaults.
    nothing_found = object()
    if next(_values_until_non_empty(cls, remaining), nothing_found) is not nothing_found:
        raise TypeError
    return collected
186
187
188
def _unpack_args(*, args, kwargs, fields, values):
189
    fields_iter = iter(fields)
190
    values.update({field: arg for (arg, field) in zip(args, fields_iter)})
191
    for field in fields_iter:
192
        if field in values and field not in kwargs:
193
            continue
194
        values[field] = kwargs.pop(field)
195
    if kwargs:
196
        raise TypeError(kwargs)
197
198
199
class Sum:
    """Base class of classes with disjoint constructors.

    Examines PEP 526 __annotations__ to determine subclasses.

    If repr is true, a __repr__() method is added to the class.
    If eq is true, __eq__() and __ne__() methods are added to the class,
    unless it already defines them; __hash__() is added only when they were.
    If order is true, rich comparison dunder methods are added. This requires
    eq to be true and the equality methods not to be user-provided.

    The Sum class examines the class to find Ctor annotations.
    A Ctor annotation is the adt.Ctor class itself, or the result of indexing
    the class, either with a single type hint, or a tuple of type hints.
    All other annotations are ignored.

    The subclass is not subclassable, but has subclasses at each of the
    names that had Ctor annotations. Each subclass takes a fixed number of
    arguments, corresponding to the type hints given to its annotation, if any.
    """

    __slots__ = ()

    # ``cls`` is unpacked from ``*args`` rather than named in the signature.
    # NOTE(review): presumably so that no keyword argument can collide with
    # a parameter name -- confirm.
    def __new__(*args, **kwargs):  # pylint: disable=no-method-argument
        cls, *args = args
        # Only the generated constructor classes may be instantiated.
        if not issubclass(cls, _adt_constructor.ADTConstructor):
            raise TypeError
        return super(Sum, cls).__new__(cls, *args, **kwargs)

    # The ``repr`` and ``eq`` parameter names trip pylint (hence the inline
    # disables); both are kept for consistency with modules defined in the
    # stdlib.
    def __init_subclass__(
        cls,
        *,
        repr=True,  # pylint: disable=redefined-builtin
        eq=True,  # pylint: disable=invalid-name
        order=False,
        **kwargs
    ):
        super().__init_subclass__(**kwargs)
        # The generated constructor classes pass through here too; they need
        # none of the setup below.
        if issubclass(cls, _adt_constructor.ADTConstructor):
            return
        _ordering_options_are_valid(eq=eq, order=order)

        # Build the constructors and record them for this class.
        _prewritten_methods.SUBCLASS_ORDER[cls] = _adt_constructor.make_constructors(
            cls
        )

        source = _prewritten_methods.PrewrittenSumMethods

        # From here on, subclassing ``cls`` is handled by the prewritten hook
        # instead of this method.
        cls.__init_subclass__ = source.__init_subclass__  # type: ignore

        # Restrict instantiation to the constructors recorded above.
        _sum_new(cls, frozenset(_prewritten_methods.SUBCLASS_ORDER[cls]))

        # Return values are ignored here: on a collision the class keeps its
        # own definitions and nothing is installed for that call.
        _set_new_functions(cls, source.__setattr__, source.__delattr__)
        _set_new_functions(cls, source.__bool__)

        if repr:
            _set_new_functions(cls, source.__repr__)

        equality_methods_were_set = False
        if eq:
            # ``_set_new_functions`` returns a colliding name on failure, so
            # a falsy result means both methods were installed.
            equality_methods_were_set = not _set_new_functions(
                cls, source.__eq__, source.__ne__
            )

        if equality_methods_were_set:
            cls.__hash__ = source.__hash__

        if order:
            _set_ordering(
                can_set=equality_methods_were_set,
                setter=_set_new_functions,
                cls=cls,
                source=source,
            )
272
273
274
class Product(_adt_constructor.ADTConstructor, tuple):
    """Base class of classes with typed fields.

    Examines PEP 526 __annotations__ to determine fields.

    If repr is true, a __repr__() method is added to the class.
    If order is true, rich comparison dunder methods are added.

    The Product class examines the class to find annotations.
    Annotations with a value of "None" are discarded.
    Fields may have default values, and can be set to
    ``inspect.Parameter.empty`` to indicate "no default".

    The subclass is subclassable. The implementation was designed with a focus
    on flexibility over ideals of purity, and therefore provides various
    optional facilities that conflict with, for example, Liskov
    substitutability. For the purposes of matching, each class is considered
    distinct.
    """

    __slots__ = ()

    # ``cls`` is unpacked from ``*args`` rather than named in the signature.
    def __new__(*args, **kwargs):  # pylint: disable=no-method-argument
        cls, *args = args
        # ``Product`` itself has no fields; only subclasses are instantiable.
        if cls is Product:
            raise TypeError
        # Probably a result of not having positional-only args.
        # Start from a copy of the class defaults so that filling in this
        # call's arguments never mutates the shared dict.
        values = cls.__defaults.copy()  # pylint: disable=protected-access
        _unpack_args(
            args=args,
            kwargs=kwargs,
            fields=cls.__fields,  # pylint: disable=protected-access
            values=values,
        )
        # The underlying tuple stores the field values in field order.
        return super(Product, cls).__new__(
            cls,
            [
                values[field]
                for field in cls.__fields  # pylint: disable=protected-access
            ],
        )

    # Class-level option flags. The double-underscore names are mangled to
    # ``_Product__*``; ``__init_subclass__`` assigns them on a subclass only
    # when the corresponding keyword is given, otherwise the value is found
    # through normal attribute inheritance.
    __repr = True
    __eq = True
    __order = False
    __eq_succeeded = None

    # The ``repr`` and ``eq`` parameter names trip pylint (hence the inline
    # disables); both are kept for consistency with modules defined in the
    # stdlib.
    def __init_subclass__(
        cls,
        *,
        repr=None,  # pylint: disable=redefined-builtin
        eq=None,  # pylint: disable=invalid-name
        order=None,
        **kwargs
    ):
        super().__init_subclass__(**kwargs)

        # ``None`` means "not specified": keep whatever the class inherits.
        if repr is not None:
            cls.__repr = repr
        if eq is not None:
            cls.__eq = eq
        if order is not None:
            cls.__order = order

        _ordering_options_are_valid(eq=cls.__eq, order=cls.__order)

        cls.__annotations = _annotations._product_args_from_annotations(cls)
        # Map each field name to its position in the underlying tuple.
        cls.__fields = {field: index for (index, field) in enumerate(cls.__annotations)}

        cls.__defaults = _extract_defaults(cls=cls, annotations=cls.__annotations)

        _product_new(cls, cls.__annotations, cls.__defaults)

        source = _prewritten_methods.PrewrittenProductMethods

        # Nothing is assigned here: ``_cant_set_new_functions`` only reports
        # whether the class already overrides the equality methods.
        cls.__eq_succeeded = False
        if cls.__eq:
            cls.__eq_succeeded = not _cant_set_new_functions(
                cls, source.__eq__, source.__ne__
            )

        if cls.__order:
            # The checker is passed as the "setter", so this validates the
            # ordering methods without installing anything on ``cls``.
            _set_ordering(
                can_set=cls.__eq_succeeded,
                setter=_cant_set_new_functions,
                cls=cls,
                source=source,
            )

    def __dir__(self):
        # Advertise the field names alongside the regular attributes.
        return super().__dir__() + list(self.__fields)

    def __getattribute__(self, name):
        try:
            return super().__getattribute__(name)
        except AttributeError:
            # Fall back to field lookup: a known field name is served from
            # its tuple slot, anything else re-raises the original error.
            index = self.__fields.get(name)
            if index is None:
                raise
            return tuple.__getitem__(self, index)

    # Temporary class-body alias; removed again by ``del source`` below.
    source = _prewritten_methods.PrewrittenProductMethods

    __setattr__ = source.__setattr__
    __delattr__ = source.__delattr__
    __bool__ = source.__bool__

    # The attribute names on the right are mangled (``__repr`` becomes
    # ``_Product__repr`` and so on), matching the option flags above.
    # NOTE(review): presumably ``conditional_method`` exposes the matching
    # method from ``source`` only while that flag is truthy on the class --
    # confirm against ``_conditional_method``.
    __repr__ = _conditional_method.conditional_method(source).__repr
    __hash__ = _conditional_method.conditional_method(source).__eq_succeeded
    __eq__ = _conditional_method.conditional_method(source).__eq_succeeded
    __ne__ = _conditional_method.conditional_method(source).__eq_succeeded
    __lt__ = _conditional_method.conditional_method(source).__order
    __le__ = _conditional_method.conditional_method(source).__order
    __gt__ = _conditional_method.conditional_method(source).__order
    __ge__ = _conditional_method.conditional_method(source).__order

    del source
393
394
395
# Names exported by ``from structured_data.adt import *``.
__all__ = ["Ctor", "Product", "Sum"]
396