Passed
Push — master ( 1d9f68...a2ade1 )
by Michael
05:08
created

JsonRpcMethod._unwrap_func()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
import abc
2
import asyncio
3
import inspect
4
import typing
5
6
from .. import errors, utils
7
8
9
__all__ = (
10
    'BaseJsonRpcMethod',
11
    'JsonRpcMethod',
12
)
13
14
15
class BaseJsonRpcMethod(abc.ABC):
    """Abstract base that pairs a callable with its JSON-RPC method name.

    The method name is ``custom_name`` when one is given, otherwise the
    callable's ``__name__``.  A non-empty ``prefix`` is prepended to it,
    joined with ``separator``.
    """

    name: str
    func: typing.Union[typing.Callable, typing.Type]
    separator: str = '__'

    def __init__(self,
                 prefix: str,
                 func: typing.Union[typing.Callable, typing.Type], *,
                 custom_name: typing.Optional[str] = None) -> None:
        """Store ``func`` and derive the method name.

        Args:
            prefix: Namespace put in front of the name; skipped when empty.
            func: The callable (function, method or class) to expose.
            custom_name: Name to use instead of ``func.__name__``.

        Raises:
            TypeError: If ``func`` is not callable.
        """
        # Explicit check rather than ``assert`` so that the validation is
        # still performed when Python runs with ``-O``.
        if not callable(func):
            raise TypeError(f'{func!r} is not callable')

        self.func = func
        self.name = custom_name if custom_name else func.__name__

        if prefix:
            self.name = f'{prefix}{self.separator}{self.name}'

    @abc.abstractmethod
    async def __call__(self, args: list, kwargs: dict, extra_args: typing.Optional[dict] = None) -> typing.Any:
        """Invoke the wrapped callable; concrete subclasses must implement this."""
        pass
35
36
37
class JsonRpcMethod(BaseJsonRpcMethod):
    """JSON-RPC method backed by a function, coroutine function, method or class.

    On construction the callable is inspected once to record its positional
    and keyword-only parameter names and whether it is a coroutine function.
    On each call, optional "extra" arguments are injected where the callable
    declares parameters with matching names, the final arguments are checked
    against the callable's signature, and the callable is invoked.
    """

    add_extra_args: bool
    is_coroutine: bool
    is_class: bool
    supported_args: list
    supported_kwargs: list
    prepare_result: typing.Optional[typing.Callable]
    # Lazily computed signature used by ``_check_func_signature``.
    _signature: typing.Optional[inspect.Signature] = None

    def __init__(self,
                 prefix: str,
                 func: typing.Union[typing.Callable, typing.Type], *,
                 custom_name: typing.Optional[str] = None,
                 add_extra_args: bool = True,
                 prepare_result: typing.Optional[typing.Callable] = None) -> None:
        """Wrap ``func`` and inspect its parameters.

        Args:
            prefix: Namespace put in front of the method name.
            func: The callable or class to expose.
            custom_name: Name to use instead of ``func.__name__``.
            add_extra_args: Whether ``extra_args`` passed to ``__call__``
                may be injected into the call.
            prepare_result: Optional callable applied to the result before
                it is returned from ``__call__``.
        """
        super().__init__(prefix, func, custom_name=custom_name)

        self.add_extra_args = add_extra_args
        self.prepare_result = prepare_result

        self._inspect_func()

    async def __call__(self, args: list, kwargs: dict, extra_args: typing.Optional[dict] = None) -> typing.Any:
        """Call the wrapped callable with ``args``/``kwargs`` and return its result.

        Args:
            args: Positional arguments for the callable.
            kwargs: Keyword arguments for the callable.
            extra_args: Name-to-value mapping of additional arguments that
                are injected when ``add_extra_args`` is true and the callable
                declares matching parameters.

        Raises:
            errors.InvalidParams: If the final arguments do not fit the
                callable's signature.
        """
        if self.add_extra_args and extra_args:
            args, kwargs = self._add_extra_args_in_args_and_kwargs(args, kwargs, extra_args)

        self._check_func_signature(args, kwargs)

        result = self.func(*args, **kwargs)

        # ``is_coroutine`` is derived from the *unwrapped* function, so a
        # decorator may hand back an awaitable even when it is False (for
        # example an ``async`` wrapper around a sync function).  Await any
        # awaitable result; instances of a wrapped class are returned as is.
        if self.is_coroutine or (not self.is_class and inspect.isawaitable(result)):
            result = await result

        if self.prepare_result:
            result = self.prepare_result(result)

        return result

    def _inspect_func(self) -> None:
        """Record parameter names and coroutine-ness of the wrapped callable."""
        self.is_class = inspect.isclass(self.func)
        func = self.func.__init__ if self.is_class else self._unwrap_func(self.func)

        argspec = inspect.getfullargspec(func)

        if self.is_class or inspect.ismethod(func):
            # The first entry is the bound ``self``/``cls`` parameter, which
            # callers never supply.
            self.supported_args = argspec.args[1:]
        else:
            self.supported_args = argspec.args

        self.supported_kwargs = argspec.kwonlyargs
        self.is_coroutine = asyncio.iscoroutinefunction(func)

    @staticmethod
    def _unwrap_func(func: typing.Callable) -> typing.Callable:
        """Return the innermost callable of a ``__wrapped__`` chain.

        Uses ``inspect.unwrap``, which follows ``__wrapped__`` attributes and
        raises ``ValueError`` on a cyclic chain instead of looping forever.
        """
        return inspect.unwrap(func)

    def _add_extra_args_in_args_and_kwargs(self,
                                           args: list,
                                           kwargs: dict,
                                           extra_args: dict) -> typing.Tuple[list, dict]:
        """Inject ``extra_args`` into ``args`` and, if needed, ``kwargs``.

        Extra arguments are first placed positionally.  Only when not all of
        them could be placed that way are keyword-only parameters considered.
        The inputs are not mutated; new containers are returned when anything
        is added.
        """
        if not extra_args:
            return args, kwargs

        new_args = self._add_extra_args_in_args(args, extra_args)

        # Every extra argument was consumed positionally: nothing is left
        # that could go into kwargs.
        if (len(new_args) - len(args)) == len(extra_args):
            return new_args, kwargs

        new_kwargs = self._add_extra_kwargs_in_args(kwargs, extra_args)
        return new_args, new_kwargs

    def _add_extra_args_in_args(self, args: list, extra_args: dict) -> list:
        """Prepend extra args matching the callable's leading positional parameters.

        Walks ``supported_args`` in order and stops at the first parameter
        that is not present in ``extra_args``.  Returns ``args`` itself when
        nothing was added.
        """
        leading = []

        for supported_arg in self.supported_args:
            if supported_arg not in extra_args:
                break

            leading.append(extra_args[supported_arg])

        if leading:
            args = [*leading, *args]

        return args

    def _add_extra_kwargs_in_args(self, kwargs: dict, extra_args: dict) -> dict:
        """Merge extra args that match keyword-only parameters into ``kwargs``.

        Injected values take precedence over same-named entries already in
        ``kwargs``.  Returns ``kwargs`` itself when nothing was added.
        """
        new_kwargs = {
            extra_arg: value
            for extra_arg, value in extra_args.items()
            if extra_arg in self.supported_kwargs
        }

        if new_kwargs:
            kwargs = {**kwargs, **new_kwargs}

        return kwargs

    def _check_func_signature(self, args: list, kwargs: dict) -> None:
        """Verify that ``args``/``kwargs`` can be bound to the wrapped callable.

        Raises:
            errors.InvalidParams: If binding the arguments raises ``TypeError``.
        """
        try:
            signature = self._signature

            if signature is None:
                # Building a Signature is comparatively costly and depends
                # only on ``self.func``, so compute it once and reuse it.
                target = self.func.__init__ if self.is_class else self.func
                signature = self._signature = inspect.signature(target)

            if self.is_class:
                # ``None`` stands in for the ``self`` parameter of ``__init__``.
                signature.bind(None, *args, **kwargs)
            else:
                signature.bind(*args, **kwargs)
        except TypeError as e:
            raise errors.InvalidParams(utils.get_exc_message(e)) from e
144