Passed
Push — master ( f1dd32...841d03 )
by Michael
04:20 queued 01:19
created

aiohttp_rpc.protocol.method.BaseJsonRpcMethod.__init__()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 11
rs 9.95
c 0
b 0
f 0
cc 3
nop 5
1
import abc
2
import asyncio
3
import inspect
4
import typing
5
6
from .. import errors, utils
7
8
9
__all__ = (
10
    'BaseJsonRpcMethod',
11
    'JsonRpcMethod',
12
)
13
14
15
class BaseJsonRpcMethod(abc.ABC):
16
    name: str
17
    func: typing.Union[typing.Callable, typing.Type]
18
    separator: str = '__'
19
20
    def __init__(self,
21
                 prefix: str,
22
                 func: typing.Union[typing.Callable, typing.Type], *,
23
                 custom_name: typing.Optional[str] = None) -> None:
24
        assert callable(func)
25
26
        self.func = func
27
        self.name = custom_name if custom_name else func.__name__
28
29
        if prefix:
30
            self.name = f'{prefix}{self.separator}{self.name}'
31
32
    @abc.abstractmethod
33
    async def __call__(self, args: list, kwargs: dict, extra_args: typing.Optional[dict] = None) -> typing.Any:
34
        pass
35
36
37
class JsonRpcMethod(BaseJsonRpcMethod):
38
    add_extra_args: bool
39
    is_coroutine: bool
40
    is_class: bool
41
    supported_args: list
42
    supported_kwargs: list
43
    prepare_result: typing.Optional[typing.Callable]
44
45
    def __init__(self,
46
                 prefix: str,
47
                 func: typing.Union[typing.Callable, typing.Type], *,
48
                 custom_name: typing.Optional[str] = None,
49
                 add_extra_args: bool = True,
50
                 prepare_result: typing.Optional[typing.Callable] = None) -> None:
51
        super().__init__(prefix, func, custom_name=custom_name)
52
53
        self.add_extra_args = add_extra_args
54
        self.prepare_result = prepare_result
55
56
        self._inspect_func()
57
58
    async def __call__(self, args: list, kwargs: dict, extra_args: typing.Optional[dict] = None) -> typing.Any:
59
        if self.add_extra_args and extra_args:
60
            args, kwargs = self._add_extra_args_in_args_and_kwargs(args, kwargs, extra_args)
61
62
        self._check_func_signature(args, kwargs)
63
64
        if self.is_coroutine:
65
            result = await self.func(*args, **kwargs)
66
        else:
67
            result = self.func(*args, **kwargs)
68
69
        if self.prepare_result:
70
            result = self.prepare_result(result)
71
72
        return result
73
74
    def _inspect_func(self) -> None:
75
        self.is_class = inspect.isclass(self.func)
76
        func = self.func.__init__ if self.is_class else self._unwrap_func(self.func)
77
78
        argspec = inspect.getfullargspec(func)
79
80
        if self.is_class or inspect.ismethod(func):
81
            self.supported_args = argspec.args[1:]
82
        else:
83
            self.supported_args = argspec.args
84
85
        self.supported_kwargs = argspec.kwonlyargs
86
        self.is_coroutine = asyncio.iscoroutinefunction(func)
87
88
    @staticmethod
89
    def _unwrap_func(func: typing.Callable) -> typing.Callable:
90
        i = 0
91
92
        while hasattr(func, '__wrapped__'):
93
            func = func.__wrapped__
94
            i += 1
95
96
            if i > 1_000:
97
                raise errors.InternalError('The method has too many wrappers.')
98
99
        return func
100
101
    def _add_extra_args_in_args_and_kwargs(self,
102
                                           args: list,
103
                                           kwargs: dict,
104
                                           extra_args: dict) -> typing.Tuple[list, dict]:
105
        if not extra_args:
106
            return args, kwargs
107
108
        new_args = self._add_extra_args_in_args(args, extra_args)
109
110
        if (len(new_args) - len(args)) == len(extra_args):
111
            return new_args, kwargs
112
113
        new_kwargs = self._add_extra_kwargs_in_args(kwargs, extra_args)
114
        return new_args, new_kwargs
115
116
    def _add_extra_args_in_args(self, args: list, extra_args: dict) -> list:
117
        new_args = []
118
119
        for supported_arg in self.supported_args:
120
            if supported_arg not in extra_args:
121
                break
122
123
            new_args.append(extra_args[supported_arg])
124
125
        if new_args:
126
            args = [*new_args, *args]
127
128
        return args
129
130
    def _add_extra_kwargs_in_args(self, kwargs: dict, extra_args: dict) -> dict:
131
        new_kwargs = {}
132
133
        for extra_arg, value in extra_args.items():
134
            if extra_arg in self.supported_kwargs:
135
                new_kwargs[extra_arg] = value
136
137
        if new_kwargs:
138
            kwargs = {**kwargs, **new_kwargs}
139
140
        return kwargs
141
142
    def _check_func_signature(self, args: list, kwargs: dict) -> None:
143
        try:
144
            if self.is_class:
145
                inspect.signature(self.func.__init__).bind(None, *args, **kwargs)
146
            else:
147
                inspect.signature(self.func).bind(*args, **kwargs)
148
        except TypeError as e:
149
            raise errors.InvalidParams(utils.get_exc_message(e)) from e
150