Passed
Push — master ( 239fba...767d9a )
by Michael
03:05
created

BaseJsonRpcServer._process_input_data()   A

Complexity

Conditions 3

Size

Total Lines 17
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 17
rs 9.7
c 0
b 0
f 0
cc 3
nop 4
1
import abc
2
import asyncio
3
import typing
4
5
from aiohttp import web
6
7
from .. import constants, errors, middlewares as rpc_middleware, protocol, utils
8
9
10
__all__ = (
11
    'BaseJsonRpcServer',
12
)
13
14
15
class BaseJsonRpcServer(abc.ABC):
16
    methods: typing.Dict[str, protocol.JsonRpcMethod]
17
    middlewares: typing.Tuple[typing.Type[rpc_middleware.BaseJsonRpcMiddleware], ...]
18
    json_serialize: typing.Callable
19
    _middleware_chain: typing.Callable
20
21
    def __init__(self, *,
22
                 json_serialize: typing.Callable = utils.json_serialize,
23
                 middlewares: typing.Iterable = (),
24
                 methods: typing.Optional[typing.Dict[str, protocol.JsonRpcMethod]] = None) -> None:
25
        if methods is None:
26
            methods = {'get_methods': protocol.JsonRpcMethod('', self.get_methods)}
27
28
        self.methods = methods
29
30
        self.middlewares = tuple(middlewares)
31
        self.load_middlewares()
32
33
        self.json_serialize = json_serialize
34
35
    def load_middlewares(self):
36
        self._middleware_chain = self._process_single_request
37
38
        for middleware_class in reversed(self.middlewares):
39
            if isinstance(middleware_class, (list, tuple,)):
40
                middleware_class, kwargs = middleware_class
41
                self._middleware_chain = middleware_class(server=self, get_response=self._middleware_chain, **kwargs)
42
                continue
43
44
            self._middleware_chain = middleware_class(server=self, get_response=self._middleware_chain)
45
46
    def add_method(self,
47
                   method: typing.Union[protocol.JsonRpcMethod, tuple, list, typing.Callable], *,
48
                   replace: bool = False) -> protocol.JsonRpcMethod:
49
        if not isinstance(method, protocol.JsonRpcMethod):
50
            if callable(method):
51
                method = protocol.JsonRpcMethod('', method)
52
            else:
53
                method = protocol.JsonRpcMethod(*method)
54
55
        if not replace and method.name in self.methods:
56
            raise errors.InvalidParams(f'Method {method.name} has already been added.')
57
58
        self.methods[method.name] = method
59
60
        return method
61
62
    def add_methods(self,
63
                    methods: typing.Iterable[typing.Union[protocol.JsonRpcMethod, tuple, list, typing.Callable]], *,
64
                    replace: bool = False) -> typing.List[protocol.JsonRpcMethod]:
65
        return [
66
            self.add_method(method, replace=replace)
67
            for method in methods
68
        ]
69
70
    async def call(self,
71
                   method: str, *,
72
                   args: typing.Optional[list] = None,
73
                   kwargs: typing.Optional[dict] = None,
74
                   extra_args: typing.Optional[dict] = None) -> typing.Any:
75
        if args is None:
76
            args = []
77
78
        if kwargs is None:
79
            kwargs = {}
80
81
        if method not in self.methods:
82
            raise errors.MethodNotFound
83
84
        return await self.methods[method](args=args, kwargs=kwargs, extra_args=extra_args)
85
86
    def get_methods(self) -> dict:
87
        return {
88
            name: {
89
                'doc': method.func.__doc__,
90
                'args': method.supported_args,
91
                'kwargs': method.supported_kwargs,
92
            }
93
            for name, method in self.methods.items()
94
        }
95
96
    async def _process_input_data(self,
97
                                  data: typing.Union[dict, list], *,
98
                                  http_request: typing.Optional[web.Request] = None) -> typing.Any:
99
        if isinstance(data, list):
100
            coros = (
101
                self._process_single_json_request(raw_rcp_request, http_request=http_request)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable raw_rcp_request does not seem to be defined.
Loading history...
102
                for raw_rcp_request in data
103
            )
104
            json_responses = await asyncio.gather(*coros, return_exceptions=True)
105
            self._prepare_exceptions(json_responses)
106
            return json_responses
107
108
        if isinstance(data, dict):
109
            return await self._process_single_json_request(data, http_request=http_request)
110
111
        response = protocol.JsonRpcResponse(error=errors.ParseError('Data must be a dict or an list.'))
112
        return response.to_dict()
113
114
    @staticmethod
115
    def _prepare_exceptions(values: list):
116
        for i, value in enumerate(values):
117
            if isinstance(value, errors.JsonRpcError):
118
                values[i] = protocol.JsonRpcResponse(error=value)
119
            elif isinstance(value, Exception):
120
                values[i] = protocol.JsonRpcResponse(error=errors.JsonRpcError(utils.get_exc_message(value)))
121
122
    async def _process_single_json_request(self,
123
                                           json_request: dict, *,
124
                                           http_request: typing.Optional[web.Request] = None) -> dict:
125
        if not isinstance(json_request, dict):
126
            raise errors.ParseError('Data must be a dict or an list.')
127
128
        msg_id = json_request.get('id')
129
130
        try:
131
            request = protocol.JsonRpcRequest.from_dict(json_request, context={'http_request': http_request})
132
        except errors.JsonRpcError as e:
133
            response = protocol.JsonRpcResponse(msg_id=msg_id, error=e)
134
            return response.to_dict()
135
136
        response = await self._middleware_chain(request)
137
        return response.to_dict()
138
139
    async def _process_single_request(self, request: protocol.JsonRpcRequest) -> protocol.JsonRpcResponse:
140
        result, error = constants.NOTHING, constants.NOTHING
141
142
        try:
143
            result = await self.call(
144
                request.method,
145
                args=request.args,
146
                kwargs=request.kwargs,
147
                extra_args=request.extra_args,
148
            )
149
        except errors.JsonRpcError as e:
150
            error = e
151
152
        response = protocol.JsonRpcResponse(
153
            msg_id=request.msg_id,
154
            jsonrpc=request.jsonrpc,
155
            result=result,
156
            error=error,
157
        )
158
159
        return response
160