Completed
Pull Request — master (#49)
by Max
03:39
created

structured_data._match.descriptor.function   A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 127
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 86
dl 0
loc 127
rs 10
c 0
b 0
f 0
wmc 27
1
"""Callable descriptors that expose decorators for value-based dispatch."""
2
3
from __future__ import annotations
4
5
import functools
6
import inspect
7
import typing
8
9
from ... import _class_placeholder
10
from ... import _doc_wrapper
11
from .. import matchable
12
from ..patterns import mapping_match
13
from . import common
14
15
T = typing.TypeVar("T")
16
17
Kwargs = typing.Dict[str, typing.Any]
18
19
20
def _varargs(signature: inspect.Signature) -> typing.Iterator[inspect.Parameter]:
    """Yield every VAR_POSITIONAL (``*args``-style) parameter of ``signature``."""
    var_positional = inspect.Parameter.VAR_POSITIONAL
    yield from (
        candidate
        for candidate in signature.parameters.values()
        if candidate.kind is var_positional
    )
24
25
26
def _dispatch(
    func: typing.Callable,
    matches: typing.Mapping,
    bound_args: typing.Tuple,
    bound_kwargs: typing.Mapping[str, typing.Any],
) -> typing.Any:
    """Call ``func`` with the captured matches plus the pass-through arguments.

    Args:
        func: The implementation to invoke.
        matches: Captured names mapped to their values; each entry is passed
            to ``func`` as a keyword argument.
        bound_args: Extra positional values. They are assigned to every
            VAR_POSITIONAL (``*args``) parameter that ``func`` declares.
        bound_kwargs: Extra keyword values. This mapping is copied, never
            modified in place.

    Returns:
        Whatever ``func`` returns.

    Raises:
        TypeError: If a captured name is also present in ``bound_kwargs``, or
            if the combined keywords cannot be bound to ``func``'s signature.
    """
    # Merge into a private copy so the caller's mapping is left untouched.
    call_kwargs = dict(bound_kwargs)
    for key, value in matches.items():
        if key in call_kwargs:
            raise TypeError(
                f"got multiple values for keyword argument {key!r}"
            )
        call_kwargs[key] = value

    function_sig = inspect.signature(func)
    function_args = function_sig.bind(**call_kwargs)
    # ``bind`` only received keywords, so the positional extras have to be
    # placed into the ``*args`` slot (if any) by hand.
    for parameter in function_sig.parameters.values():
        if parameter.kind is inspect.Parameter.VAR_POSITIONAL:
            function_args.arguments[parameter.name] = bound_args
    function_args.apply_defaults()
    return func(*function_args.args, **function_args.kwargs)
42
43
44
def _bound_and_values(
    signature: inspect.Signature, args: typing.Tuple, kwargs: Kwargs,
) -> typing.Tuple[typing.Tuple, Kwargs, Kwargs]:
    """Bind a call to ``signature`` and split off the variadic arguments.

    The call is bound with defaults applied, which normalizes how each
    argument was supplied. The result is a ``(star_args, star_kwargs, values)``
    triple: the contents of the ``*args`` parameter (``()`` when there is
    none), the contents of the ``**kwargs`` parameter (``{}`` when there is
    none), and the remaining arguments keyed by parameter name.

    Raises:
        TypeError: If ``args``/``kwargs`` do not fit ``signature``.
    """
    call = signature.bind(*args, **kwargs)
    call.apply_defaults()

    # Work on a copy so the variadic entries can be removed from the values.
    named_values = call.arguments.copy()
    star_args: typing.Tuple = ()
    star_kwargs: Kwargs = {}
    for name, parameter in signature.parameters.items():
        kind = parameter.kind
        if kind is inspect.Parameter.VAR_POSITIONAL:
            star_args = named_values.pop(name)
        elif kind is inspect.Parameter.VAR_KEYWORD:
            star_kwargs = named_values.pop(name)
    return star_args, star_kwargs, named_values
62
63
64
class ClassMethod(common.Descriptor):
    """Value-dispatching decorator whose target behaves like a classmethod."""

    # Only declared here; presumably populated outside this class
    # (TODO confirm where ``__wrapped__`` is assigned).
    __wrapped__: typing.Callable

    def __init__(self, func: typing.Callable) -> None:
        # The decorated callable is intentionally not stored by this method.
        del func
        super().__init__()
        # A more specific annotation would be good, but that's waiting on
        # further development.
        self.matchers: common.MatchTemplate[typing.Any] = common.MatchTemplate()

    def __get__(self, instance, owner):
        # ``when()`` is only exposed for class-level access on the owning
        # class; every other access gets the plain callable wrapper.
        exposes_when = instance is None and common.owns(self, owner)
        wrapper = ClassMethodWhen if exposes_when else ClassMethodCall
        return wrapper(self, owner)

    def when(
        self, /, **kwargs: typing.Any  # noqa: E225
    ) -> typing.Callable[[typing.Callable], typing.Callable]:
        """Return a decorator that registers an implementation for ``kwargs``."""
        matcher = _placeholder_kwargs(kwargs)
        return common.decorate(self.matchers, matcher)
86
87
88
@_doc_wrapper.ProxyWrapper.wrap_class("class_method")
class ClassMethodCall:
    """Callable view of a ``ClassMethod`` that offers no ``when()`` decorator."""

    def __init__(self, class_method: ClassMethod, owner: type) -> None:
        self.class_method = class_method
        self.owner = owner

    def __call__(
        self, /, *args: typing.Any, **kwargs: typing.Any  # noqa: E225
    ) -> typing.Any:
        descriptor = self.class_method
        cls = self.owner
        # The owner is bound as the first positional argument.
        star_args, star_kwargs, values = _bound_and_values(
            inspect.signature(descriptor.__wrapped__),
            (cls, *args),
            kwargs,
        )

        candidate = matchable.Matchable(values)
        for implementation in descriptor.matchers.match(candidate, cls):
            # Only the first implementation offered is ever invoked.
            captured = typing.cast(typing.Mapping, candidate.matches)
            return _dispatch(implementation, captured, star_args, star_kwargs)
        # Nothing was offered: fall back to the wrapped callable.
        return descriptor.__wrapped__(cls, *args, **kwargs)
114
115
116
class ClassMethodWhen(ClassMethodCall):
    """Wrapper class that exposes the ``when()`` decorators."""

    def when(
        self, /, **kwargs: typing.Any  # noqa: E225
    ) -> typing.Callable[[typing.Callable], typing.Callable]:
        """Add a binding for the wrapped method.

        The keyword arguments are forwarded unchanged to the ``when()`` of
        the underlying ``class_method`` descriptor, and its decorator is
        returned.
        """
        return self.class_method.when(**kwargs)
124
125
126
class StaticMethod(common.Descriptor):
    """Decorator with value-based dispatch. Acts as a staticmethod."""

    # Only declared here; presumably populated outside this class
    # (TODO confirm where ``__wrapped__`` is assigned).
    __wrapped__: typing.Callable

    def __init__(self, func: typing.Callable) -> None:
        # The decorated callable is intentionally not stored by this method.
        del func
        super().__init__()
        # A more specific annotation would be good, but that's waiting on
        # further development.
        self.matchers: common.MatchTemplate[typing.Any] = common.MatchTemplate()

    def __get__(self, instance, owner):
        # ``when()`` is only exposed for class-level access on the owning
        # class; neither wrapper receives the instance or the owner.
        if instance is None and common.owns(self, owner):
            return StaticMethodWhen(self)
        return StaticMethodCall(self)

    def when(
        self, /, **kwargs: typing.Any  # noqa: E225
    ) -> typing.Callable[[typing.Callable], typing.Callable]:
        """Add a binding for this function.

        Raises:
            ValueError: If any keyword value is a placeholder; these are
                rejected by ``_no_placeholder_kwargs``.
        """
        return common.decorate(self.matchers, _no_placeholder_kwargs(kwargs))
148
149
150
@_doc_wrapper.ProxyWrapper.wrap_class("static_method")
class StaticMethodCall:
    """Callable view of a ``StaticMethod`` that offers no ``when()`` decorator."""

    def __init__(self, static_method: StaticMethod) -> None:
        self.static_method = static_method

    def __call__(
        self, /, *args: typing.Any, **kwargs: typing.Any  # noqa: E225
    ) -> typing.Any:
        descriptor = self.static_method
        signature = inspect.signature(descriptor.__wrapped__)
        star_args, star_kwargs, values = _bound_and_values(signature, args, kwargs)

        candidate = matchable.Matchable(values)
        # There is no owner to match against, so ``None`` is passed instead.
        for implementation in descriptor.matchers.match(candidate, None):
            # Only the first implementation offered is ever invoked.
            captured = typing.cast(typing.Mapping, candidate.matches)
            return _dispatch(implementation, captured, star_args, star_kwargs)
        # Nothing was offered: fall back to the wrapped callable.
        return descriptor.__wrapped__(*args, **kwargs)
173
174
175
class StaticMethodWhen(StaticMethodCall):
    """Variant of ``StaticMethodCall`` that also offers ``when()``."""

    def when(
        self, /, **kwargs: typing.Any  # noqa: E225
    ) -> typing.Callable[[typing.Callable], typing.Callable]:
        """Register a binding by delegating to the wrapped descriptor."""
        descriptor = self.static_method
        return descriptor.when(**kwargs)
183
184
185
class Function(common.Descriptor):
    """Value-dispatching decorator whose target behaves like a plain function."""

    # Only declared here; presumably populated outside this class
    # (TODO confirm where ``__wrapped__`` is assigned).
    __wrapped__: typing.Callable

    def __init__(self, func: typing.Callable) -> None:
        # The decorated callable is intentionally not stored by this method.
        del func
        super().__init__()
        # A more specific annotation would be good, but that's waiting on
        # further development.
        self.matchers: common.MatchTemplate[typing.Any] = common.MatchTemplate()

    def __call__(
        self, /, *args: typing.Any, **kwargs: typing.Any  # noqa: E225
    ) -> typing.Any:
        star_args, star_kwargs, values = _bound_and_values(
            inspect.signature(self), args, kwargs
        )

        # The first positional argument (if any) is handed to the matcher
        # lookup as the instance.
        instance = next(iter(args), None)

        candidate = matchable.Matchable(values)
        for implementation in self.matchers.match_instance(candidate, instance):
            # Only the first implementation offered is ever invoked.
            captured = typing.cast(typing.Mapping, candidate.matches)
            return _dispatch(implementation, captured, star_args, star_kwargs)
        # Nothing was offered: fall back to the wrapped callable.
        return self.__wrapped__(*args, **kwargs)

    def __get__(self, instance: typing.Optional[T], owner: typing.Type[T]):
        if instance is not None:
            # Instance access pre-binds the instance as the first argument.
            return functools.partial(self, instance)
        if common.owns(self, owner):
            return self
        return MethodProxy(self)

    def when(
        self, /, **kwargs: typing.Any  # noqa: E225
    ) -> typing.Callable[[typing.Callable], typing.Callable]:
        """Return a decorator that registers an implementation for ``kwargs``."""
        matcher = _placeholder_kwargs(kwargs)
        return common.decorate(self.matchers, matcher)
231
232
233
@_doc_wrapper.ProxyWrapper.wrap_class("func")
class MethodProxy:
    """Callable view of a ``Function`` that offers no ``when()`` decorator."""

    def __init__(self, func: Function) -> None:
        self.func = func

    def __call__(
        self, /, *args: typing.Any, **kwargs: typing.Any  # noqa: E225
    ) -> typing.Any:
        # Calls are forwarded to the wrapped ``Function`` unchanged.
        target = self.func
        return target(*args, **kwargs)

    def __get__(self, instance, owner):
        # Descriptor access is delegated to the wrapped ``Function``.
        target = self.func
        return target.__get__(instance, owner)
247
248
249
def _kwarg_structure(kwargs: dict) -> mapping_match.DictPattern:
    """Build an exhaustive ``DictPattern`` from the given keyword mapping."""
    pattern = mapping_match.DictPattern(kwargs, exhaustive=True)
    return pattern
251
252
253
def _no_placeholder_kwargs(kwargs: Kwargs) -> common.Matcher:
    """Build the matcher for ``kwargs``, rejecting any placeholder values.

    Args:
        kwargs: Keyword names mapped to the structures they must match.

    Returns:
        The pattern produced by ``_kwarg_structure`` for ``kwargs``.

    Raises:
        ValueError: If any value is a ``_class_placeholder.Placeholder``.
    """
    offending = [
        name
        for name, kwarg in kwargs.items()
        if isinstance(kwarg, _class_placeholder.Placeholder)
    ]
    if offending:
        # Name the offending keywords so the failure can be diagnosed.
        raise ValueError(
            "placeholders are not allowed for: "
            + ", ".join(repr(name) for name in offending)
        )

    return _kwarg_structure(kwargs)
260
261
262
def _placeholder_kwargs(kwargs: Kwargs) -> common.Matcher:
    """Build the matcher for ``kwargs``, deferring any placeholder values.

    When no value is a ``_class_placeholder.Placeholder``, the pattern from
    ``_kwarg_structure`` is returned directly. Otherwise the result is itself
    a ``Placeholder``; when given a class, it calls each placeholder value's
    ``func`` with that class and builds the pattern from the results.
    """
    has_placeholder = any(
        isinstance(kwarg, _class_placeholder.Placeholder) for kwarg in kwargs.values()
    )
    if not has_placeholder:
        return _kwarg_structure(kwargs)

    @_class_placeholder.Placeholder
    def _placeholder(cls: type) -> mapping_match.DictPattern:
        resolved = {}
        for name, kwarg in kwargs.items():
            if isinstance(kwarg, _class_placeholder.Placeholder):
                resolved[name] = kwarg.func(cls)
            else:
                resolved[name] = kwarg
        return _kwarg_structure(resolved)

    return _placeholder
283