Passed
Push — master ( 55adfd...84b7e9 )
by Michael
03:30
created

JsonRpcMethod.__call__()   B

Complexity

Conditions 7

Size

Total Lines 19
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 19
rs 8
c 0
b 0
f 0
cc 7
nop 4
1
import abc
2
import asyncio
3
import inspect
4
import typing
5
6
from .. import errors, utils
7
8
9
__all__ = (
10
    'BaseJsonRpcMethod',
11
    'JsonRpcMethod',
12
)
13
14
15
class BaseJsonRpcMethod(abc.ABC):
16
    name: str
17
    func: typing.Union[typing.Callable, typing.Type]
18
    separator: str = '__'
19
20
    def __init__(self,
21
                 prefix: str,
22
                 func: typing.Union[typing.Callable, typing.Type], *,
23
                 custom_name: typing.Optional[str] = None) -> None:
24
        assert callable(func)
25
26
        self.func = func
27
        self.name = custom_name if custom_name else func.__name__
28
29
        if prefix:
30
            self.name = f'{prefix}{self.separator}{self.name}'
31
32
    @abc.abstractmethod
33
    async def __call__(self, args: list, kwargs: dict, extra_args: typing.Optional[dict] = None) -> typing.Any:
34
        pass
35
36
37
class JsonRpcMethod(BaseJsonRpcMethod):
38
    add_extra_args: bool
39
    is_coroutine: bool
40
    is_class: bool
41
    supported_args: list
42
    supported_kwargs: list
43
    prepare_result: typing.Optional[typing.Callable]
44
45
    def __init__(self,
46
                 prefix: str,
47
                 func: typing.Union[typing.Callable, typing.Type], *,
48
                 custom_name: typing.Optional[str] = None,
49
                 add_extra_args: bool = True,
50
                 prepare_result: typing.Optional[typing.Callable] = None) -> None:
51
        super().__init__(prefix, func, custom_name=custom_name)
52
53
        self.add_extra_args = add_extra_args
54
        self.prepare_result = prepare_result
55
56
        self._inspect_func()
57
58
    async def __call__(self, args: list, kwargs: dict, extra_args: typing.Optional[dict] = None) -> typing.Any:
59
        if self.add_extra_args and extra_args:
60
            args, kwargs = self._add_extra_args_in_args_and_kwargs(args, kwargs, extra_args)
61
62
        try:
63
            func = self.func.__init__ if self.is_class else self.func
64
            inspect.signature(func).bind(*args, **kwargs)
65
        except TypeError as e:
66
            raise errors.InvalidParams(utils.get_exc_message(e)) from e
67
68
        if self.is_coroutine:
69
            result = await self.func(*args, **kwargs)
70
        else:
71
            result = self.func(*args, **kwargs)
72
73
        if self.prepare_result:
74
            result = self.prepare_result(result)
75
76
        return result
77
78
    def _inspect_func(self) -> None:
79
        self.is_class = inspect.isclass(self.func)
80
        func = self.func.__init__ if self.is_class else self.func
81
82
        argspec = inspect.getfullargspec(func)
83
84
        if inspect.ismethod(func):
85
            self.supported_args = argspec.args[1:]
86
        else:
87
            self.supported_args = argspec.args
88
89
        self.supported_kwargs = argspec.kwonlyargs
90
        self.is_coroutine = asyncio.iscoroutinefunction(func)
91
92
    def _add_extra_args_in_args_and_kwargs(self,
93
                                           args: list,
94
                                           kwargs: dict,
95
                                           extra_args: dict) -> typing.Tuple[list, dict]:
96
        if not extra_args:
97
            return args, kwargs
98
99
        new_args = self._add_extra_args_in_args(args, extra_args)
100
101
        if (len(new_args) - len(args)) == len(extra_args):
102
            return new_args, kwargs
103
104
        new_kwargs = self._add_extra_kwargs_in_args(kwargs, extra_args)
105
        return new_args, new_kwargs
106
107
    def _add_extra_args_in_args(self, args: list, extra_args: dict) -> list:
108
        new_args = []
109
110
        for supported_arg in self.supported_args:
111
            if supported_arg not in extra_args:
112
                break
113
114
            new_args.append(extra_args[supported_arg])
115
116
        if new_args:
117
            args = [*new_args, *args]
118
119
        return args
120
121
    def _add_extra_kwargs_in_args(self, kwargs: dict, extra_args: dict) -> dict:
122
        new_kwargs = {}
123
124
        for extra_arg, value in extra_args.items():
125
            if extra_arg in self.supported_kwargs:
126
                new_kwargs[extra_arg] = value
127
128
        if new_kwargs:
129
            kwargs = {**kwargs, **new_kwargs}
130
131
        return kwargs
132