Passed
Push — master ( 738c54...1e73b7 )
by Michael
04:59
created

BaseJsonRpcServer.get_methods()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 8
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
import abc
2
import asyncio
3
import typing
4
from functools import partial
5
6
from .. import constants, errors, protocol, utils
7
from .. import typedefs
8
9
10
__all__ = (
11
    'BaseJsonRpcServer',
12
)
13
14
15
class BaseJsonRpcServer(abc.ABC):
16
    methods: typing.Dict[str, protocol.BaseJsonRpcMethod]
17
    middlewares: typing.Tuple[typing.Callable, ...]
18
    json_serialize: typedefs.JSONEncoderType
19
    _middleware_chain: typedefs.SingleRequestProcessorType
20
21
    def __init__(self, *,
22
                 json_serialize: typedefs.JSONEncoderType = utils.json_serialize,
23
                 middlewares: typing.Iterable = (),
24
                 methods: typing.Optional[typing.Dict[str, protocol.BaseJsonRpcMethod]] = None) -> None:
25
        if methods is None:
26
            methods = {
27
                'get_method': protocol.JsonRpcMethod('', self.get_method),
28
                'get_methods': protocol.JsonRpcMethod('', self.get_methods),
29
            }
30
31
        self.methods = methods
32
33
        self.middlewares = tuple(middlewares)
34
        self.load_middlewares()
35
36
        self.json_serialize = json_serialize
37
38
    def load_middlewares(self) -> None:
39
        self._middleware_chain = self._process_single_request
40
41
        for middleware in reversed(self.middlewares):
42
            self._middleware_chain = partial(middleware, handler=self._middleware_chain)
43
44
    def add_method(self,
45
                   method: typing.Union[protocol.BaseJsonRpcMethod, tuple, list, typing.Callable], *,
46
                   replace: bool = False) -> protocol.BaseJsonRpcMethod:
47
        if not isinstance(method, protocol.BaseJsonRpcMethod):
48
            if callable(method):
49
                method = protocol.JsonRpcMethod('', method)
50
            else:
51
                method = protocol.JsonRpcMethod(*method)
52
53
        if not replace and method.name in self.methods:
54
            raise errors.InvalidParams(f'Method {method.name} has already been added.')
55
56
        self.methods[method.name] = method
57
58
        return method
59
60
    def add_methods(self,
61
                    methods: typing.Iterable[typing.Union[protocol.BaseJsonRpcMethod, tuple, list, typing.Callable]], *,
62
                    replace: bool = False) -> typing.List[protocol.BaseJsonRpcMethod]:
63
        return [
64
            self.add_method(method, replace=replace)
65
            for method in methods
66
        ]
67
68
    async def call(self,
69
                   method_name: str, *,
70
                   args: typing.Optional[typing.Sequence] = None,
71
                   kwargs: typing.Optional[dict] = None,
72
                   extra_args: typing.Optional[dict] = None) -> typing.Any:
73
        if args is None:
74
            args = ()
75
76
        if kwargs is None:
77
            kwargs = {}
78
79
        if method_name not in self.methods:
80
            raise errors.MethodNotFound
81
82
        return await self.methods[method_name](args=args, kwargs=kwargs, extra_args=extra_args)
83
84
    def get_methods(self) -> dict:
85
        return {
86
            name: {
87
                'doc': method.func.__doc__,
88
                'args': method.supported_args,
89
                'kwargs': method.supported_kwargs,
90
            }
91
            for name, method in self.methods.items()
92
        }
93
94
    def get_method(self, name: str) -> typing.Optional[dict]:
95
        method = self.methods.get(name)
96
97
        if not method:
98
            return None
99
100
        return {
101
            'doc': method.func.__doc__,
102
            'args': method.supported_args,
103
            'kwargs': method.supported_kwargs,
104
        }
105
106
    async def _process_input_data(self,
107
                                  data: typing.Union[dict, list], *,
108
                                  context: typing.Dict[str, typing.Any],
109
                                  ) -> typing.Optional[typing.Union[dict, typing.List[dict]]]:
110
        if isinstance(data, list):
111
            if not data:
112
                return protocol.JsonRpcResponse(error=errors.InvalidRequest()).to_dict()
113
114
            json_responses = await asyncio.gather(
115
                *(
116
                    self._process_single_json_request(raw_rcp_request, context=context)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable raw_rcp_request does not seem to be defined.
Loading history...
117
                    for raw_rcp_request in data
118
                ),
119
                return_exceptions=True,
120
            )
121
122
            result = [
123
                json_response
124
                for json_response in self._prepare_exceptions(json_responses)
125
                if json_response is not None
126
            ]
127
128
            return result if result else None
129
130
        if isinstance(data, dict):
131
            return await self._process_single_json_request(data, context=context)
132
133
        response = protocol.JsonRpcResponse(error=errors.InvalidRequest('Data must be a dict or an list.'))
134
        return response.to_dict()
135
136
    @staticmethod
137
    def _prepare_exceptions(values: typing.Iterable) -> typing.Iterable:
138
        for i, value in enumerate(values):
139
            if isinstance(value, errors.JsonRpcError):
140
                yield protocol.JsonRpcResponse(error=value)
141
            elif isinstance(value, Exception):
142
                raise value
143
            else:
144
                yield value
145
146
    async def _process_single_json_request(self,
147
                                           json_request: dict, *,
148
                                           context: typing.Dict[str, typing.Any]) -> typing.Optional[dict]:
149
        try:
150
            if not isinstance(json_request, dict):
151
                raise errors.InvalidRequest('Data must be a dict.')
152
153
            request = protocol.JsonRpcRequest.from_dict(json_request, context=context)
154
        except errors.JsonRpcError as e:
155
            if isinstance(json_request, dict):
156
                request_id = json_request.get('id', constants.NOTHING)
157
            else:
158
                request_id = constants.NOTHING
159
160
            response = protocol.JsonRpcResponse(id=request_id, error=e)
161
            return response.to_dict()
162
163
        response = await self._middleware_chain(request)
164
165
        if response.is_notification:
166
            return None
167
168
        return response.to_dict()
169
170
    async def _process_single_request(self, request: protocol.JsonRpcRequest) -> protocol.JsonRpcResponse:
171
        result, error = constants.NOTHING, constants.NOTHING
172
173
        try:
174
            result = await self.call(
175
                request.method_name,
176
                args=request.args,
177
                kwargs=request.kwargs,
178
                extra_args=request.extra_args,
179
            )
180
        except errors.JsonRpcError as e:
181
            error = e
182
183
        response = protocol.JsonRpcResponse(
184
            id=request.id,
185
            jsonrpc=request.jsonrpc,
186
            result=result,
187
            error=error,
188
        )
189
190
        return response
191