BaseJsonRpcServer._process_input_data()   B
last analyzed

Complexity

Conditions 5

Size

Total Lines 30
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 21
dl 0
loc 30
rs 8.9093
c 0
b 0
f 0
cc 5
nop 4
1
import abc
2
import asyncio
3
import typing
4
from functools import partial
5
6
from .. import errors, protocol, typedefs, utils
7
8
9
__all__ = (
10
    'BaseJsonRpcServer',
11
)
12
13
14
class BaseJsonRpcServer(abc.ABC):
15
    methods: typing.MutableMapping[str, protocol.BaseJsonRpcMethod]
16
    middlewares: typing.Sequence[typing.Callable]
17
    json_serialize: typedefs.UnboundJSONEncoderType
18
    _middleware_chain: typing.ClassVar[typedefs.UnboundSingleRequestProcessorType]
19
20
    def __init__(self, *,
21
                 json_serialize: typedefs.JSONEncoderType = utils.json_serialize,
22
                 middlewares: typing.Sequence = (),
23
                 methods: typing.Optional[typing.MutableMapping[str, protocol.BaseJsonRpcMethod]] = None) -> None:
24
        if methods is None:
25
            methods = {
26
                'get_method': protocol.JsonRpcMethod(self.get_method),
27
                'get_methods': protocol.JsonRpcMethod(self.get_methods),
28
            }
29
30
        self.methods = methods
31
32
        self.middlewares = middlewares
33
        self._load_middlewares()
34
35
        self.json_serialize = json_serialize  # type: ignore
36
37
    def add_method(self,
38
                   method: typing.Union[typedefs.ServerMethodDescriptionType], *,
39
                   replace: bool = False) -> protocol.BaseJsonRpcMethod:
40
        if not isinstance(method, protocol.BaseJsonRpcMethod):
41
            method = protocol.JsonRpcMethod(method)
42
43
        if not replace and method.name in self.methods:
44
            raise errors.InvalidParams(f'Method {method.name} has already been added.')
45
46
        self.methods[method.name] = method
47
48
        return method
49
50
    def add_methods(self,
51
                    methods: typing.Sequence[typedefs.ServerMethodDescriptionType], *,
52
                    replace: bool = False) -> typing.Tuple[protocol.BaseJsonRpcMethod, ...]:
53
        return tuple(
54
            self.add_method(method, replace=replace)
55
            for method in methods
56
        )
57
58
    async def call(self,
59
                   method_name: str, *,
60
                   args: typing.Optional[typing.Sequence] = None,
61
                   kwargs: typing.Optional[typing.Mapping] = None,
62
                   extra_args: typing.Optional[typing.Mapping] = None) -> typing.Any:
63
        if args is None:
64
            args = ()
65
66
        if kwargs is None:
67
            kwargs = {}
68
69
        if method_name not in self.methods:
70
            raise errors.MethodNotFound
71
72
        return await self.methods[method_name](args=args, kwargs=kwargs, extra_args=extra_args)
73
74
    def get_methods(self) -> typing.Mapping[str, typing.Mapping[str, typing.Any]]:
75
        return {
76
            name: {
77
                'doc': method.doc,
78
                'args': method.supported_args,
79
                'kwargs': method.supported_kwargs,
80
            }
81
            for name, method in self.methods.items()
82
        }
83
84
    def get_method(self, name: str) -> typing.Optional[typing.Mapping[str, typing.Any]]:
85
        method = self.methods.get(name)
86
87
        if not method:
88
            return None
89
90
        return {
91
            'doc': method.doc,
92
            'args': method.supported_args,
93
            'kwargs': method.supported_kwargs,
94
        }
95
96
    def _load_middlewares(self) -> None:
97
        self._middleware_chain = self._process_single_request  # type: ignore
98
99
        for middleware in reversed(self.middlewares):
100
            self._middleware_chain: typedefs.SingleRequestProcessorType = partial(  # type: ignore
101
                middleware,
102
                handler=self._middleware_chain,
103
            )
104
105
    async def _process_input_data(
106
            self,
107
            data: typing.Any, *,
108
            context: typing.MutableMapping[str, typing.Any],
109
    ) -> typing.Optional[typing.Union[typing.Mapping, typing.Tuple[typing.Mapping, ...]]]:
110
        if isinstance(data, typing.Sequence):
111
            if not data:
112
                return protocol.JsonRpcResponse(error=errors.InvalidRequest()).dump()
113
114
            json_responses = await asyncio.gather(
115
                *(
116
                    self._process_single_json_request(raw_rcp_request, context=context)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable raw_rcp_request does not seem to be defined.
Loading history...
117
                    for raw_rcp_request in data
118
                ),
119
                return_exceptions=True,
120
            )
121
122
            result = tuple(
123
                json_response
124
                for json_response in self._raise_exception_if_have(json_responses)
125
                if json_response is not None  # Skip notifications.
126
            )
127
128
            return result if result else None
129
130
        if isinstance(data, typing.Mapping):
131
            return await self._process_single_json_request(data, context=context)
132
133
        response = protocol.JsonRpcResponse(error=errors.InvalidRequest('Data must be a dict or an list.'))
134
        return response.dump()
135
136
    @staticmethod
137
    def _raise_exception_if_have(values: typing.Iterable) -> typing.Iterable:
138
        for i, value in enumerate(values):
139
            if isinstance(value, Exception):
140
                # Use middlewares (`exception_middleware`) to process exceptions.
141
                raise value
142
            else:
143
                yield value
144
145
    async def _process_single_json_request(self,
146
                                           json_request: typing.Any, *,
147
                                           context: typing.MutableMapping[str, typing.Any],
148
                                           ) -> typing.Optional[typing.Mapping]:
149
        if not isinstance(json_request, typing.Mapping):
150
            return protocol.JsonRpcResponse(error=errors.InvalidRequest('Data must be a dict.')).dump()
151
152
        try:
153
            request = protocol.JsonRpcRequest.load(json_request, context=context)
154
        except errors.JsonRpcError as e:
155
            return protocol.JsonRpcResponse(id=json_request.get('id'), error=e).dump()
156
157
        response = await self._middleware_chain(request)
158
159
        if response.is_notification:
160
            return None
161
162
        return response.dump()
163
164
    async def _process_single_request(self, request: protocol.JsonRpcRequest) -> protocol.JsonRpcResponse:
165
        result, error = None, None
166
167
        try:
168
            result = await self.call(
169
                request.method_name,
170
                args=request.args,
171
                kwargs=request.kwargs,
172
                extra_args=request.extra_args,
173
            )
174
        except errors.JsonRpcError as e:
175
            error = e
176
177
        response = protocol.JsonRpcResponse(
178
            id=request.id,
179
            jsonrpc=request.jsonrpc,
180
            result=result,
181
            error=error,
182
        )
183
184
        return response
185