Passed
Push — master ( 1de29a...581576 )
by Michael
03:27
created

BaseJsonRpcServer.add_method()   A

Complexity

Conditions 5

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 15
rs 9.3333
c 0
b 0
f 0
cc 5
nop 4
1
import abc
2
import asyncio
3
import typing
4
5
from aiohttp import web
6
7
from .. import constants, errors, middlewares as rpc_middleware, protocol, utils
8
9
10
# Names exported by ``from <module> import *``.
__all__ = ('BaseJsonRpcServer',)
13
14
15
class BaseJsonRpcServer(abc.ABC):
    """Transport-agnostic base for a JSON-RPC server.

    Holds the registry of callable methods, builds the middleware chain and
    turns raw decoded JSON (a single request dict or a batch list) into
    response dicts.
    """

    # Registered methods, keyed by their public name.
    methods: typing.Dict[str, protocol.JsonRpcMethod]
    # Middleware classes (or ``(class, kwargs)`` pairs, see ``load_middlewares``).
    middlewares: typing.Tuple[typing.Type[rpc_middleware.BaseJsonRpcMiddleware], ...]
    # Callable used to serialize responses; not invoked inside this class.
    json_serialize: typing.Callable
    # Outermost callable of the middleware chain; takes a request, returns a response.
    _middleware_chain: typing.Callable

    def __init__(self, *,
                 json_serialize: typing.Callable = utils.json_serialize,
                 middlewares: typing.Iterable = (),
                 methods: typing.Optional[typing.Dict[str, protocol.JsonRpcMethod]] = None) -> None:
        """Initialise the server.

        :param json_serialize: serializer stored on the instance.
        :param middlewares: iterable of middleware classes or
            ``(middleware_class, kwargs)`` pairs.
        :param methods: initial method registry; when ``None`` a registry
            containing only ``get_methods`` is created. A supplied dict is
            used as-is (not copied).
        """
        if methods is None:
            methods = {'get_methods': protocol.JsonRpcMethod('', self.get_methods)}

        self.methods = methods

        # Set before building the middleware chain: every middleware is
        # constructed with ``server=self`` and should see a complete server.
        self.json_serialize = json_serialize

        self.middlewares = tuple(middlewares)
        self.load_middlewares()

    def load_middlewares(self):
        """(Re)build ``_middleware_chain`` from ``self.middlewares``.

        The chain is built inside-out: the innermost callable is
        ``_process_single_request`` and the first configured middleware ends
        up outermost. An entry may be a middleware class or a two-item
        list/tuple ``(middleware_class, kwargs)`` whose kwargs are forwarded
        to the middleware constructor.
        """
        self._middleware_chain = self._process_single_request

        for entry in reversed(self.middlewares):
            if isinstance(entry, (list, tuple)):
                middleware_class, kwargs = entry
            else:
                middleware_class, kwargs = entry, {}

            self._middleware_chain = middleware_class(
                server=self,
                get_response=self._middleware_chain,
                **kwargs,
            )

    def add_method(self,
                   method: typing.Union[protocol.JsonRpcMethod, tuple, list, typing.Callable], *,
                   replace: bool = False) -> protocol.JsonRpcMethod:
        """Register a method and return the resulting ``JsonRpcMethod``.

        :param method: a ``JsonRpcMethod``, a plain callable (wrapped as
            ``JsonRpcMethod('', callable)``), or a tuple/list unpacked into
            the ``JsonRpcMethod`` constructor.
        :param replace: allow overwriting an already registered name.
        :raises errors.InvalidParams: if the name is already registered and
            ``replace`` is false.
        """
        if not isinstance(method, protocol.JsonRpcMethod):
            if callable(method):
                method = protocol.JsonRpcMethod('', method)
            else:
                method = protocol.JsonRpcMethod(*method)

        if not replace and method.name in self.methods:
            raise errors.InvalidParams(f'Method {method.name} has already been added.')

        self.methods[method.name] = method

        return method

    def add_methods(self,
                    methods: typing.Iterable[typing.Union[protocol.JsonRpcMethod, tuple, list, typing.Callable]], *,
                    replace: bool = False) -> typing.List[protocol.JsonRpcMethod]:
        """Register several methods via ``add_method``; return them in order."""
        return [
            self.add_method(method, replace=replace)
            for method in methods
        ]

    async def call(self,
                   method: str, *,
                   args: typing.Optional[list] = None,
                   kwargs: typing.Optional[dict] = None,
                   extra_args: typing.Optional[dict] = None) -> typing.Any:
        """Invoke a registered method by name and return its result.

        :param method: registered method name.
        :param args: positional arguments (defaults to an empty list).
        :param kwargs: keyword arguments (defaults to an empty dict).
        :param extra_args: forwarded unchanged to the method wrapper.
        :raises errors.MethodNotFound: if ``method`` is not registered.
        """
        if args is None:
            args = []

        if kwargs is None:
            kwargs = {}

        if method not in self.methods:
            raise errors.MethodNotFound

        return await self.methods[method](args=args, kwargs=kwargs, extra_args=extra_args)

    def get_methods(self) -> dict:
        """Describe every registered method.

        :return: mapping of method name to a dict with the wrapped function's
            docstring (``doc``) and the wrapper's ``supported_args`` /
            ``supported_kwargs`` (``args`` / ``kwargs``).
        """
        return {
            name: {
                'doc': method.func.__doc__,
                'args': method.supported_args,
                'kwargs': method.supported_kwargs,
            }
            for name, method in self.methods.items()
        }

    async def _process_input_data(self,
                                  data: typing.Union[dict, list], *,
                                  http_request: typing.Optional[web.Request] = None) -> typing.Any:
        """Process decoded JSON input and return the response payload.

        A list is treated as a batch: every item is processed concurrently
        and a list of response dicts is returned in the same order. A dict is
        processed as a single request. Anything else yields a parse-error
        response dict.
        """
        if isinstance(data, list):
            batch_results = await asyncio.gather(
                *(
                    self._process_single_json_request(raw_rpc_request, http_request=http_request)
                    for raw_rpc_request in data
                ),
                return_exceptions=True,
            )

            json_responses = []

            for item in batch_results:
                # Failed items come back as exception objects; convert them to
                # response dicts so every element of the batch has the same
                # shape as a successful item.
                if isinstance(item, errors.JsonRpcError):
                    item = protocol.JsonRpcResponse(error=item).to_dict()
                elif isinstance(item, Exception):
                    item = protocol.JsonRpcResponse(
                        error=errors.JsonRpcError(utils.get_exc_message(item)),
                    ).to_dict()

                json_responses.append(item)

            return json_responses

        if isinstance(data, dict):
            return await self._process_single_json_request(data, http_request=http_request)

        response = protocol.JsonRpcResponse(error=errors.ParseError('Data must be a dict or an list.'))
        return response.to_dict()

    async def _process_single_json_request(self,
                                           json_request: dict, *,
                                           http_request: typing.Optional[web.Request] = None) -> dict:
        """Parse one raw request dict, run it through the chain, return a dict.

        :raises errors.ParseError: if ``json_request`` is not a dict (this can
            happen for items of a batch).
        """
        if not isinstance(json_request, dict):
            raise errors.ParseError('Data must be a dict or an list.')

        # Read the id up front so it can be echoed even if parsing fails.
        msg_id = json_request.get('id')

        try:
            request = protocol.JsonRpcRequest.from_dict(json_request, context={'http_request': http_request})
        except errors.JsonRpcError as e:
            response = protocol.JsonRpcResponse(msg_id=msg_id, error=e)
            return response.to_dict()

        response = await self._middleware_chain(request)
        return response.to_dict()

    async def _process_single_request(self, request: protocol.JsonRpcRequest) -> protocol.JsonRpcResponse:
        """Innermost step of the middleware chain: call the requested method.

        A ``JsonRpcError`` raised by the call is captured in the response;
        any other exception propagates to the caller.
        """
        # ``constants.NOTHING`` marks "not set" for both result and error.
        result, error = constants.NOTHING, constants.NOTHING

        try:
            result = await self.call(
                request.method,
                args=request.args,
                kwargs=request.kwargs,
                extra_args=request.extra_args,
            )
        except errors.JsonRpcError as e:
            error = e

        return protocol.JsonRpcResponse(
            msg_id=request.msg_id,
            jsonrpc=request.jsonrpc,
            result=result,
            error=error,
        )
159