Passed
Push — master ( ce73ec...38d9d4 )
by Max
01:00
created

structured_data.adt   C

Complexity

Total Complexity 57

Size/Duplication

Total Lines 425
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 229
dl 0
loc 425
rs 5.04
c 0
b 0
f 0
wmc 57

11 Functions

Rating   Name   Duplication   Size   Complexity  
A _set_new_functions() 0 12 3
A _extract_defaults() 0 6 2
A _sum_new() 0 12 2
A _name() 0 3 1
A _cant_set_new_functions() 0 12 3
A _values_until_non_empty() 0 5 3
A _values_non_empty() 0 6 3
A _set_ordering() 0 9 3
A _unpack_args() 0 9 5
A _product_new() 0 22 1
A _ordering_options_are_valid() 0 3 3

12 Methods

Rating   Name   Duplication   Size   Complexity  
A Sum.__new__() 0 5 2
A _ConditionalMethod.__get__() 0 5 3
A Product.__getattribute__() 0 8 3
A _ConditionalMethod.__set__() 0 3 1
A _ConditionalMethod.__init__() 0 2 1
A _ConditionalMethod.__set_name__() 0 3 1
A Product.__dir__() 0 2 1
A _ConditionalMethod.__getattr__() 0 3 1
A Product.__new__() 0 17 2
B Sum.__init_subclass__() 0 44 6
B Product.__init_subclass__() 0 40 6
A _ConditionalMethod.__delete__() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like structured_data.adt often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Base classes for defining abstract data types.
2
3
This module provides three public members, which are used together.
4
5
Given a structure, possibly a choice of different structures, that you'd like
6
to associate with a type:
7
8
- First, create a class, that subclasses the Sum class.
9
- Then, for each possible structure, add an attribute annotation to the class
10
  with the desired name of the constructor, and a type of ``Ctor``, with the
11
  types within the constructor as arguments.
12
13
To look inside an ADT instance, use the functions from the
14
:mod:`structured_data.match` module.
15
16
Putting it together:
17
18
>>> from structured_data import match
19
>>> class Example(Sum):
20
...     FirstConstructor: Ctor[int, str]
21
...     SecondConstructor: Ctor[bytes]
22
...     ThirdConstructor: Ctor
23
...     def __iter__(self):
24
...         matchable = match.Matchable(self)
25
...         if matchable(Example.FirstConstructor(match.pat.count, match.pat.string)):
26
...             count, string = matchable[match.pat.count, match.pat.string]
27
...             for _ in range(count):
28
...                 yield string
29
...         elif matchable(Example.SecondConstructor(match.pat.bytes)):
30
...             bytes_ = matchable[match.pat.bytes]
31
...             for byte in bytes_:
32
...                 yield chr(byte)
33
...         elif matchable(Example.ThirdConstructor()):
34
...             yield "Third"
35
...             yield "Constructor"
36
>>> list(Example.FirstConstructor(5, "abc"))
37
['abc', 'abc', 'abc', 'abc', 'abc']
38
>>> list(Example.SecondConstructor(b"abc"))
39
['a', 'b', 'c']
40
>>> list(Example.ThirdConstructor())
41
['Third', 'Constructor']
42
"""
43
44
import inspect
45
import typing
46
47
from . import _adt_constructor
48
from . import _annotations
49
from . import _prewritten_methods
50
51
_T = typing.TypeVar("_T")
52
53
54
# Static type checkers only ever see the stub classes in the first branch;
# at runtime the real ``Ctor`` is imported from the private ``_ctor`` module.
if typing.TYPE_CHECKING:  # pragma: nocover

    class Ctor:
        """Dummy class for type-checking purposes."""

    class ConcreteCtor(typing.Generic[_T]):
        """Wrapper class for type-checking purposes.

        The type parameter should be a Tuple type of fixed size.
        Classes containing this annotation (meaning they haven't been
        processed by the ``adt`` decorator) should not be instantiated.

        NOTE(review): no ``adt`` decorator is visible in this module, which
        builds its classes through the ``Sum`` base class instead; this
        wording may be out of date -- confirm.
        """


else:
    from ._ctor import Ctor
70
71
72
# This is fine.
73
def _name(cls: typing.Type[_T], function) -> str:
    """Return ``__name__`` of ``function`` after binding it to ``cls``.

    ``function`` is resolved through its ``__get__`` with no instance, which
    is the same call class-level attribute access makes on a descriptor.
    """
    resolved = function.__get__(None, cls)
    return resolved.__name__
76
77
78
# This is mostly fine, though the list of classes is somewhat ad-hoc, to say
79
# the least.
80
def _cant_set_new_functions(cls: typing.Type[_T], *functions) -> typing.Optional[str]:
    """Return the first name among ``functions`` that ``cls`` already defines.

    An existing attribute does not count as a collision when it is missing,
    is the attribute ``object`` or ``Product`` has under the same name, or is
    the candidate function itself.  Returns ``None`` when nothing collides.
    """
    for candidate in functions:
        attr_name = _name(cls, candidate)
        current = getattr(cls, attr_name, None)
        harmless = (
            getattr(object, attr_name, None),
            getattr(Product, attr_name, None),
            None,
            candidate,
        )
        if current in harmless:
            continue
        return attr_name
    return None
92
93
94
def _set_new_functions(cls: typing.Type[_T], *functions) -> typing.Optional[str]:
    """Install every function on ``cls`` under its own name, or none of them.

    The collision check runs over all the functions first.  When it reports
    a name, that name is returned and ``cls`` is left untouched; otherwise
    each function is assigned and ``None`` is returned.
    """
    collision = _cant_set_new_functions(cls, *functions)
    if not collision:
        for candidate in functions:
            setattr(cls, _name(cls, candidate), candidate)
        return None
    return collision
106
107
108
def _sum_new(_cls: typing.Type[_T], subclasses):
    """Replace ``_cls.__new__`` with a guard that only admits ``subclasses``.

    The guard raises ``TypeError`` for any class outside ``subclasses``.
    For an allowed class it delegates to the ``__new__`` found in
    ``_cls.__dict__`` or, when there is none, to the next ``__new__`` after
    ``_cls`` in the MRO.
    """

    def base(cls, args):
        return super(_cls, cls).__new__(cls, args)

    try:
        wrapped = _cls.__dict__["__new__"]
    except KeyError:
        wrapped = staticmethod(base)

    def __new__(cls, args):
        if cls in subclasses:
            # ``wrapped`` is a raw class-dict entry, so bind it explicitly.
            return wrapped.__get__(None, cls)(cls, args)
        raise TypeError

    _cls.__new__ = staticmethod(__new__)  # type: ignore
120
121
122
def _product_new(
    _cls: typing.Type[_T],
    annotations: typing.Dict[str, typing.Any],
    defaults: typing.Dict[str, typing.Any],
):
    """Give ``_cls`` a pass-through ``__new__`` with a descriptive signature.

    The installed function forwards all of its arguments to the next
    ``__new__`` after ``_cls`` in the MRO.  Its ``__signature__`` is replaced
    with a positional-only ``cls`` followed by one positional-or-keyword
    parameter per entry of ``annotations``, using the matching entry of
    ``defaults`` when there is one.
    """

    def __new__(*args, **kwargs):
        cls, *args = args
        return super(_cls, cls).__new__(cls, *args, **kwargs)

    signature = inspect.signature(__new__)
    cls_parameter = inspect.Parameter("cls", inspect.Parameter.POSITIONAL_ONLY)
    field_parameters = [
        inspect.Parameter(
            name,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            annotation=hint,
            default=defaults.get(name, inspect.Parameter.empty),
        )
        for name, hint in annotations.items()
    ]
    __new__.__signature__ = signature.replace(
        parameters=[cls_parameter, *field_parameters]
    )
    _cls.__new__ = __new__
144
145
146
def _ordering_options_are_valid(*, eq, order):
147
    if order and not eq:
148
        raise ValueError("eq must be true if order is true")
149
150
151
def _set_ordering(*, can_set, setter, cls, source):
152
    if not can_set:
153
        raise ValueError("Can't add ordering methods if equality methods are provided.")
154
    collision = setter(cls, source.__lt__, source.__le__, source.__gt__, source.__ge__)
155
    if collision:
156
        raise TypeError(
157
            "Cannot overwrite attribute {collision} in class "
158
            "{name}. Consider using functools.total_ordering".format(
159
                collision=collision, name=cls.__name__
160
            )
161
        )
162
163
164
def _values_non_empty(cls, field_names):
165
    for field in field_names:
166
        default = getattr(cls, field, inspect.Parameter.empty)
167
        if default is inspect.Parameter.empty:
168
            return
169
        yield (field, default)
170
171
172
def _values_until_non_empty(cls, field_names):
173
    for field in field_names:
174
        default = getattr(cls, field, inspect.Parameter.empty)
175
        if default is not inspect.Parameter.empty:
176
            yield
177
178
179
def _extract_defaults(*, cls, annotations):
    """Collect the default values of the trailing fields of ``cls``.

    Fields are walked from last to first.  Defaults are gathered until the
    first field without one; if any earlier field still has a default, a
    ``TypeError`` is raised, because a field with a default would then
    precede one without.
    """
    remaining = iter(reversed(tuple(annotations)))
    defaults = dict(_values_non_empty(cls, remaining))
    # The second generator yields ``None``, so a private sentinel is needed
    # to tell "yielded something" apart from "exhausted".
    exhausted = object()
    if next(_values_until_non_empty(cls, remaining), exhausted) is not exhausted:
        raise TypeError
    return defaults
185
186
187
def _unpack_args(*, args, kwargs, fields, values):
188
    fields_iter = iter(fields)
189
    values.update({field: arg for (arg, field) in zip(args, fields_iter)})
190
    for field in fields_iter:
191
        if field in values and field not in kwargs:
192
            continue
193
        values[field] = kwargs.pop(field)
194
    if kwargs:
195
        raise TypeError(kwargs)
196
197
198
class Sum:
    """Base class for types that are a choice between several constructors.

    Constructors are declared with PEP 526 annotations on the subclass.
    A Ctor annotation is the adt.Ctor class itself, or the result of
    indexing it with a single type hint or a tuple of type hints.  All other
    annotations are ignored.

    Keyword arguments accepted in the class statement:

    - ``repr``: if true, a ``__repr__()`` method is added to the class.
    - ``eq``: if true, ``__eq__()`` and ``__ne__()`` are added, plus
      ``__hash__()`` when adding them succeeded.
    - ``order``: if true, rich comparison methods are added; needs ``eq``.

    The subclass is not subclassable, but has subclasses at each of the
    names that had Ctor annotations.  Each of those takes a fixed number of
    arguments, corresponding to the type hints given to its annotation, if
    any.
    """

    __slots__ = ()

    def __new__(*args, **kwargs):  # pylint: disable=no-method-argument
        # Only classes that are also ADT constructors may be instantiated.
        cls, *positional = args
        if issubclass(cls, _adt_constructor.ADTConstructor):
            return super(Sum, cls).__new__(cls, *positional, **kwargs)
        raise TypeError

    # ``repr`` and ``eq`` keep these names for consistency with modules
    # defined in the stdlib, hence the pylint suppressions.
    def __init_subclass__(
        cls,
        *,
        repr=True,  # pylint: disable=redefined-builtin
        eq=True,  # pylint: disable=invalid-name
        order=False,
        **kwargs
    ):
        super().__init_subclass__(**kwargs)
        # Constructor classes are subclasses too; they need no setup here.
        if issubclass(cls, _adt_constructor.ADTConstructor):
            return
        _ordering_options_are_valid(eq=eq, order=order)

        constructors = _adt_constructor.make_constructors(cls)
        _prewritten_methods.SUBCLASS_ORDER[cls] = constructors

        prewritten = _prewritten_methods.PrewrittenSumMethods

        # Later subclasses of ``cls`` go through the prewritten hook instead.
        cls.__init_subclass__ = prewritten.__init_subclass__  # type: ignore

        _sum_new(cls, frozenset(_prewritten_methods.SUBCLASS_ORDER[cls]))

        # Collisions are deliberately ignored for these: an existing
        # definition on the class is simply left in place.
        _set_new_functions(cls, prewritten.__setattr__, prewritten.__delattr__)
        _set_new_functions(cls, prewritten.__bool__)

        if repr:
            _set_new_functions(cls, prewritten.__repr__)

        # ``_set_new_functions`` returns a name on collision, None on success.
        equality_methods_were_set = bool(eq) and not _set_new_functions(
            cls, prewritten.__eq__, prewritten.__ne__
        )

        if equality_methods_were_set:
            cls.__hash__ = prewritten.__hash__

        if order:
            _set_ordering(
                can_set=equality_methods_were_set,
                setter=_set_new_functions,
                cls=cls,
                source=prewritten,
            )
271
272
273
class _ConditionalMethod:
274
    name = None
275
    field_check = None
276
277
    def __init__(self, source):
278
        self.source = source
279
280
    def __getattr__(self, name):
281
        self.field_check = name
282
        return self
283
284
    def __set_name__(self, owner, name):
285
        self.__objclass__ = owner
286
        self.name = name
287
288
    def __get__(self, instance, owner):
289
        if getattr(owner, self.field_check):
290
            return getattr(self.source, self.name).__get__(instance, owner)
291
        target = owner if instance is None else instance
292
        return getattr(super(self.__objclass__, target), self.name)
293
294
    def __set__(self, instance, value):
295
        # Don't care about this coverage
296
        raise AttributeError  # pragma: nocover
297
298
    def __delete__(self, instance):
299
        # Don't care about this coverage
300
        raise AttributeError  # pragma: nocover
301
302
303
class Product(_adt_constructor.ADTConstructor, tuple):
    """Base class of classes with typed fields.

    Examines PEP 526 __annotations__ to determine fields.

    If repr is true, a __repr__() method is added to the class.
    If order is true, rich comparison dunder methods are added.
    The repr, eq and order options are passed as keyword arguments in the
    class statement; an option left as None keeps the value inherited from
    the parent class.

    The Product class examines the class to find annotations.
    Annotations with a value of "None" are discarded.
    Fields may have default values, and can be set to
    ``inspect.Parameter.empty`` to indicate "no default".

    The subclass is subclassable. The implementation was designed with a focus
    on flexibility over ideals of purity, and therefore provides various
    optional facilities that conflict with, for example, Liskov
    substitutability. For the purposes of matching, each class is considered
    distinct.
    """

    __slots__ = ()

    def __new__(*args, **kwargs):  # pylint: disable=no-method-argument
        # ``cls`` is unpacked from ``*args`` rather than named in the
        # signature; Product itself cannot be instantiated, only subclasses.
        cls, *args = args
        if cls is Product:
            raise TypeError
        # Probably a result of not having positional-only args.
        # Start from a copy of the class defaults, then overlay the call's
        # positional and keyword arguments.
        values = cls.__defaults.copy()  # pylint: disable=protected-access
        _unpack_args(
            args=args,
            kwargs=kwargs,
            fields=cls.__fields,  # pylint: disable=protected-access
            values=values,
        )
        # Build the underlying tuple with one item per field, in field order.
        return super(Product, cls).__new__(
            cls,
            [
                values[field]
                for field in cls.__fields  # pylint: disable=protected-access
            ],
        )

    # Class-level option flags. Name mangling stores them as
    # ``_Product__repr`` etc.; ``__init_subclass__`` overrides them per
    # subclass and the _ConditionalMethod descriptors below read them.
    __repr = True
    __eq = True
    __order = False
    # Set to True/False per subclass once the equality check has run.
    __eq_succeeded = None

    # Both of these are for consistency with modules defined in the stdlib.
    # BOOM!
    def __init_subclass__(
        cls,
        *,
        repr=None,  # pylint: disable=redefined-builtin
        eq=None,  # pylint: disable=invalid-name
        order=None,
        **kwargs
    ):
        super().__init_subclass__(**kwargs)

        # None means "not specified": keep whatever the parent class had.
        if repr is not None:
            cls.__repr = repr
        if eq is not None:
            cls.__eq = eq
        if order is not None:
            cls.__order = order

        _ordering_options_are_valid(eq=cls.__eq, order=cls.__order)

        cls.__annotations = _annotations._product_args_from_annotations(cls)
        # Map each field name to its position in the underlying tuple.
        cls.__fields = {field: index for (index, field) in enumerate(cls.__annotations)}

        cls.__defaults = _extract_defaults(cls=cls, annotations=cls.__annotations)

        _product_new(cls, cls.__annotations, cls.__defaults)

        source = _prewritten_methods.PrewrittenProductMethods

        # Nothing is assigned here: the check only records whether the class
        # already defines its own equality methods. The descriptors at the
        # bottom of the class consult the recorded flag on every lookup.
        cls.__eq_succeeded = False
        if cls.__eq:
            cls.__eq_succeeded = not _cant_set_new_functions(
                cls, source.__eq__, source.__ne__
            )

        if cls.__order:
            # The collision checker is passed as the "setter", so this call
            # validates the ordering methods without assigning them.
            _set_ordering(
                can_set=cls.__eq_succeeded,
                setter=_cant_set_new_functions,
                cls=cls,
                source=source,
            )

    def __dir__(self):
        # Advertise the field names alongside the regular attributes.
        return super().__dir__() + list(self.__fields)

    def __getattribute__(self, name):
        # Normal lookup first; on failure, treat ``name`` as a field name
        # and read the matching tuple item.
        try:
            return super().__getattribute__(name)
        except AttributeError:
            index = self.__fields.get(name)
            if index is None:
                raise
            return tuple.__getitem__(self, index)

    # Temporary class-body alias; removed again by the ``del`` below.
    source = _prewritten_methods.PrewrittenProductMethods

    __setattr__ = source.__setattr__
    __delattr__ = source.__delattr__
    __bool__ = source.__bool__

    # Each attribute access below is name-mangled (``.__repr`` becomes
    # ``._Product__repr``) and handled by ``_ConditionalMethod.__getattr__``,
    # which records that mangled name as the flag to test on lookup.
    __repr__ = _ConditionalMethod(source).__repr
    __hash__ = _ConditionalMethod(source).__eq_succeeded
    __eq__ = _ConditionalMethod(source).__eq_succeeded
    __ne__ = _ConditionalMethod(source).__eq_succeeded
    __lt__ = _ConditionalMethod(source).__order
    __le__ = _ConditionalMethod(source).__order
    __gt__ = _ConditionalMethod(source).__order
    __ge__ = _ConditionalMethod(source).__order

    del source
422
423
424
__all__ = ["Ctor", "Product", "Sum"]
425