Passed
Push — master ( 55adfd...84b7e9 )
by Michael
03:30
created

BaseJsonRpcServer.get_method()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
import abc
2
import asyncio
3
import typing
4
from functools import partial
5
6
from aiohttp import web
7
8
from .. import constants, errors, middlewares as rpc_middleware, protocol, utils
9
10
11
# Public API of this module: only the abstract server base class is exported.
__all__ = ('BaseJsonRpcServer',)
14
15
16
class BaseJsonRpcServer(abc.ABC):
    """Transport-independent core of a JSON-RPC server.

    Keeps the registry of RPC methods, composes the middleware chain and
    turns already-decoded JSON payloads (a single request dict or a batch
    list) into response dictionaries.
    """

    # Registered RPC methods, keyed by the name clients call them with.
    methods: typing.Dict[str, protocol.BaseJsonRpcMethod]
    # Middlewares in wrapping order: the first one ends up outermost.
    middlewares: typing.Tuple[typing.Callable, ...]
    # Serializer stored on the instance; this class never calls it itself.
    json_serialize: typing.Callable
    # Entry point of the composed middleware stack (see load_middlewares).
    _middleware_chain: typing.Callable

    def __init__(self, *,
                 json_serialize: typing.Callable = utils.json_serialize,
                 middlewares: typing.Iterable = (),
                 methods: typing.Optional[typing.Dict[str, protocol.BaseJsonRpcMethod]] = None) -> None:
        """Set up the method registry, the middleware chain and the serializer.

        Args:
            json_serialize: Callable stored as ``self.json_serialize``.
            middlewares: Iterable of middleware callables; each is later
                invoked with the request and a ``handler`` keyword argument.
            methods: Initial registry. When ``None``, the registry is seeded
                with the introspection methods ``get_method`` and
                ``get_methods``. A supplied dict is stored as-is (not
                copied), so ``add_method`` mutates the caller's dict.
        """
        if methods is None:
            methods = {
                'get_method': protocol.JsonRpcMethod('', self.get_method),
                'get_methods': protocol.JsonRpcMethod('', self.get_methods),
            }

        self.methods = methods

        self.middlewares = tuple(middlewares)
        self.load_middlewares()

        self.json_serialize = json_serialize

    def load_middlewares(self):
        """(Re)build ``self._middleware_chain`` from ``self.middlewares``.

        The innermost handler is ``_process_single_request``. Middlewares are
        wrapped from last to first, so the first middleware in the tuple is
        the first to see an incoming request.
        """
        chain = self._process_single_request

        for middleware in reversed(self.middlewares):
            chain = partial(middleware, handler=chain)

        self._middleware_chain = chain

    def add_method(self,
                   method: typing.Union[protocol.BaseJsonRpcMethod, tuple, list, typing.Callable], *,
                   replace: bool = False) -> protocol.BaseJsonRpcMethod:
        """Register one RPC method and return the registered method object.

        Args:
            method: A ready ``BaseJsonRpcMethod``; a bare callable (wrapped
                as ``JsonRpcMethod('', callable)``); or a tuple/list that is
                unpacked into ``JsonRpcMethod(*method)``.
            replace: Allow overwriting an entry with the same name.

        Raises:
            errors.InvalidParams: The name is already registered and
                ``replace`` is false.
        """
        if not isinstance(method, protocol.BaseJsonRpcMethod):
            if callable(method):
                method = protocol.JsonRpcMethod('', method)
            else:
                method = protocol.JsonRpcMethod(*method)

        if method.name in self.methods and not replace:
            raise errors.InvalidParams(f'Method {method.name} has already been added.')

        self.methods[method.name] = method
        return method

    def add_methods(self,
                    methods: typing.Iterable[typing.Union[protocol.BaseJsonRpcMethod, tuple, list, typing.Callable]], *,
                    replace: bool = False) -> typing.List[protocol.BaseJsonRpcMethod]:
        """Register several methods via ``add_method``; return them in input order."""
        return [self.add_method(item, replace=replace) for item in methods]

    async def call(self,
                   method: str, *,
                   args: typing.Optional[list] = None,
                   kwargs: typing.Optional[dict] = None,
                   extra_args: typing.Optional[dict] = None) -> typing.Any:
        """Invoke a registered method by name and return its awaited result.

        ``args`` and ``kwargs`` default to an empty list / dict; ``extra_args``
        is forwarded unchanged (including ``None``).

        Raises:
            errors.MethodNotFound: ``method`` is not in the registry.
        """
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs

        if method not in self.methods:
            raise errors.MethodNotFound

        rpc_method = self.methods[method]
        return await rpc_method(args=args, kwargs=kwargs, extra_args=extra_args)

    @staticmethod
    def _describe_method(method: protocol.BaseJsonRpcMethod) -> dict:
        """Build the introspection descriptor returned for one registered method."""
        return {
            'doc': method.func.__doc__,
            'args': method.supported_args,
            'kwargs': method.supported_kwargs,
        }

    # Introspection: maps every registered name to its descriptor
    # ('doc', 'args', 'kwargs').
    # NOTE: documented with comments on purpose. This function is itself
    # registered as an RPC method and its ``__doc__`` is what the 'doc' field
    # reports, so adding a docstring would change the server's output.
    def get_methods(self) -> dict:
        return {
            name: self._describe_method(method)
            for name, method in self.methods.items()
        }

    # Introspection: descriptor for a single registered method, or None when
    # no (truthy) method is registered under ``name``.
    # NOTE: comment instead of docstring for the same reason as get_methods.
    def get_method(self, name: str) -> typing.Optional[dict]:
        method = self.methods.get(name)

        if not method:
            return None

        return self._describe_method(method)

    async def _process_input_data(self,
                                  data: typing.Union[dict, list], *,
                                  http_request: typing.Optional[web.Request] = None) -> typing.Any:
        """Dispatch a decoded JSON payload and return the response payload.

        * ``list`` (batch): every item is processed concurrently and a list
          of response dicts is returned in the same order.
        * ``dict``: processed as a single request; one response dict is
          returned.
        * anything else: a ``ParseError`` response dict is returned.
        """
        if isinstance(data, list):
            coros = (
                self._process_single_json_request(raw_rpc_request, http_request=http_request)
                for raw_rpc_request in data
            )
            # return_exceptions=True keeps one failing item from aborting the
            # whole batch; failures come back as exception objects.
            json_responses = await asyncio.gather(*coros, return_exceptions=True)
            self._prepare_exceptions(json_responses)

            # _prepare_exceptions replaces failures with JsonRpcResponse
            # objects, while successful items are already dicts. Normalize so
            # the batch result is uniformly made of dicts, like every other
            # return path of this method.
            return [
                item.to_dict() if isinstance(item, protocol.JsonRpcResponse) else item
                for item in json_responses
            ]

        if isinstance(data, dict):
            return await self._process_single_json_request(data, http_request=http_request)

        response = protocol.JsonRpcResponse(error=errors.ParseError('Data must be a dict or an list.'))
        return response.to_dict()

    @staticmethod
    def _prepare_exceptions(values: list):
        """Replace exception entries of ``values`` in place with error responses.

        ``JsonRpcError`` instances are wrapped directly; any other
        ``Exception`` is first converted to a generic ``JsonRpcError`` whose
        message comes from ``utils.get_exc_message``. Non-exception entries
        are left untouched.
        """
        for index, value in enumerate(values):
            if isinstance(value, errors.JsonRpcError):
                values[index] = protocol.JsonRpcResponse(error=value)
            elif isinstance(value, Exception):
                wrapped = errors.JsonRpcError(utils.get_exc_message(value))
                values[index] = protocol.JsonRpcResponse(error=wrapped)

    async def _process_single_json_request(self,
                                           json_request: dict, *,
                                           http_request: typing.Optional[web.Request] = None) -> dict:
        """Parse one raw request dict, run it through the middleware chain
        and return the response as a dict.

        A request that fails to parse yields an error response carrying the
        raw ``id`` (if any) instead of raising.

        Raises:
            errors.ParseError: ``json_request`` is not a dict (possible for
                items of a batch).
        """
        if not isinstance(json_request, dict):
            raise errors.ParseError('Data must be a dict or an list.')

        # Read the id before parsing so an error response can still echo it.
        msg_id = json_request.get('id')

        try:
            request = protocol.JsonRpcRequest.from_dict(json_request, context={'http_request': http_request})
        except errors.JsonRpcError as e:
            return protocol.JsonRpcResponse(msg_id=msg_id, error=e).to_dict()

        response = await self._middleware_chain(request)
        return response.to_dict()

    async def _process_single_request(self, request: protocol.JsonRpcRequest) -> protocol.JsonRpcResponse:
        """Innermost handler: call the requested method and build the response.

        A ``JsonRpcError`` raised by the call becomes the response's error;
        any other exception propagates to the caller (e.g. a middleware).
        """
        # NOTHING is the project's sentinel for "field not set".
        result, error = constants.NOTHING, constants.NOTHING

        try:
            result = await self.call(
                request.method,
                args=request.args,
                kwargs=request.kwargs,
                extra_args=request.extra_args,
            )
        except errors.JsonRpcError as e:
            error = e

        return protocol.JsonRpcResponse(
            msg_id=request.msg_id,
            jsonrpc=request.jsonrpc,
            result=result,
            error=error,
        )
171