Test Failed
Pull Request — master (#5)
by Michael
13:51
created

BaseJsonRpcClient._parse_method_description()   B

Complexity

Conditions 7

Size

Total Lines 36
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 26
dl 0
loc 36
rs 7.856
c 0
b 0
f 0
cc 7
nop 3
1
import abc
2
import types
3
import typing
4
from functools import partial
5
6
from .. import constants, errors, protocol, utils
7
8
9
# Names exported by a star-import of this module.
__all__ = ('BaseJsonRpcClient',)

# One call description: a bare method name, a sequence of one to three items
# (name; name and params; name, args and kwargs), or an already built request.
MethodDescription = typing.Union[str, list, tuple, protocol.JsonRpcRequest]

# NOTE(review): the batch methods test whether the *whole* argument is a
# JsonRpcBatchRequest, while this alias lists that type as an element of the
# iterable -- confirm which of the two is intended.
MethodDescriptionList = typing.Iterable[typing.Union[MethodDescription, protocol.JsonRpcBatchRequest]]
15
16
17
class BaseJsonRpcClient(abc.ABC):
    """Abstract base for asynchronous JSON-RPC clients.

    The class builds requests, parses responses and maps batch responses back
    onto the requests that produced them. Concrete subclasses supply the
    transport by implementing ``connect``, ``disconnect`` and ``send_json``.

    Instances are async context managers: entering awaits ``connect`` and
    leaving awaits ``disconnect``.
    """

    # Lookup table keyed by error code; handed to the response parser in
    # ``direct_call``.
    error_map: typing.Dict[int, errors.JsonRpcError] = {
        error.code: error
        for error in errors.DEFAULT_KNOWN_ERRORS
    }
    # Serializer hook. This base class never calls it itself
    # (presumably used by concrete transports -- confirm).
    json_serialize: typing.Callable = utils.json_serialize

    async def __aenter__(self) -> 'BaseJsonRpcClient':
        """Connect and return the client itself."""
        await self.connect()
        return self

    async def __aexit__(self,
                        exc_type: typing.Optional[typing.Type[BaseException]],
                        exc_value: typing.Optional[BaseException],
                        traceback: typing.Optional[types.TracebackType]) -> None:
        """Disconnect; returns ``None``, so a pending exception propagates."""
        await self.disconnect()

    def __getattr__(self, method_name: str) -> typing.Callable:
        """Turn an unknown attribute into a remote call.

        Python only invokes ``__getattr__`` when normal lookup fails, so
        ``client.foo(1, bar=2)`` is the same as ``client.call('foo', 1, bar=2)``.
        Note that a misspelled attribute name is therefore not an
        ``AttributeError`` but a callable for a remote method of that name.
        """
        return partial(self.call, method_name)

    @abc.abstractmethod
    async def connect(self) -> None:
        """Open the underlying transport."""
        pass

    @abc.abstractmethod
    async def disconnect(self) -> None:
        """Close the underlying transport."""
        pass

    async def call(self, method_name: str, *args, **kwargs) -> typing.Any:
        """Call a remote method and return its result.

        A request with a random id is sent through ``direct_call``.

        Raises:
            The ``error`` carried by the response, when it is not one of
            ``constants.EMPTY_VALUES``.
        """
        request = protocol.JsonRpcRequest(id=utils.get_random_id(), method_name=method_name, args=args, kwargs=kwargs)
        response = await self.direct_call(request)

        if response.error not in constants.EMPTY_VALUES:
            raise response.error

        return response.result

    async def notify(self, method: str, *args, **kwargs) -> None:
        """Send a request built without an id and discard whatever comes back."""
        request = protocol.JsonRpcRequest(method_name=method, args=args, kwargs=kwargs)
        await self.direct_call(request)

    async def batch(self,
                    method_descriptions: MethodDescriptionList, *,
                    save_order: bool = True) -> typing.Any:
        """Send several calls as one batch and return their outcomes.

        Args:
            method_descriptions: Either a ready ``JsonRpcBatchRequest`` (used
                as is) or an iterable of method descriptions, each converted
                with ``_parse_method_description``.
            save_order: When true, return one entry per request, in request
                order (see ``_collect_batch_result``). When false, return one
                entry per response, in the order the responses are listed.

        Returns:
            A list whose entries are a result, or the error object of a
            failed call (errors are returned, not raised).
        """
        batch_request = self._make_batch_request(method_descriptions)
        # NOTE(review): ``direct_batch`` returns None for a batch that reports
        # ``is_notification``; the code below would then fail on
        # ``batch_response.responses``. Confirm whether such batches can
        # reach this method.
        batch_response = await self.direct_batch(batch_request)

        if save_order:
            return self._collect_batch_result(batch_request, batch_response)

        return [self._response_value(response) for response in batch_response.responses]

    async def batch_notify(self, method_descriptions: MethodDescriptionList) -> None:
        """Send several calls as one batch, built with ``is_notification=True``.

        A ready ``JsonRpcBatchRequest`` is sent unchanged. Nothing is returned.
        """
        batch_request = self._make_batch_request(method_descriptions, is_notification=True)
        await self.direct_batch(batch_request)

    async def direct_call(self, request: protocol.JsonRpcRequest) -> typing.Optional[protocol.JsonRpcResponse]:
        """Send one prepared request and parse the reply.

        Returns:
            ``None`` when ``request.is_notification`` is true, otherwise the
            ``JsonRpcResponse`` parsed from the reply using ``error_map`` and
            the context returned by ``send_json``. Errors in the response are
            not raised here.
        """
        json_response, context = await self.send_json(request.to_dict(), without_response=request.is_notification)

        if request.is_notification:
            return None

        return protocol.JsonRpcResponse.from_dict(
            json_response,
            error_map=self.error_map,
            context=context,
        )

    async def direct_batch(self,
                           batch_request: protocol.JsonRpcBatchRequest,
                           ) -> typing.Optional[protocol.JsonRpcBatchResponse]:
        """Send one prepared batch request and parse the reply.

        Returns:
            ``None`` when ``batch_request.is_notification`` is true, otherwise
            the parsed ``JsonRpcBatchResponse``.

        Raises:
            errors.InvalidRequest: The batch contains no requests.
            errors.ParseError: A reply was expected but it is empty.
        """
        if not batch_request.requests:
            raise errors.InvalidRequest('You can not send an empty batch request.')

        is_notification = batch_request.is_notification
        json_response, context = await self.send_json(batch_request.to_list(), without_response=is_notification)

        if is_notification:
            return None

        if not json_response:
            raise errors.ParseError('Server returned an empty batch response.')

        # NOTE(review): unlike ``direct_call``, neither ``error_map`` nor
        # ``context`` is forwarded to the parser here -- confirm that this is
        # intended.
        return protocol.JsonRpcBatchResponse.from_list(json_response)

    @abc.abstractmethod
    async def send_json(self,
                        data: typing.Any, *,
                        without_response: bool = False) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
        """Deliver ``data`` over the transport.

        Callers in this class unpack the return value as
        ``(json_response, context)`` and pass ``without_response=True`` for
        notifications, in which case the response part is ignored.
        """
        pass

    def _make_batch_request(self,
                            method_descriptions: MethodDescriptionList, *,
                            is_notification: bool = False) -> protocol.JsonRpcBatchRequest:
        """Return ``method_descriptions`` as a ``JsonRpcBatchRequest``.

        A ready batch request is returned unchanged; anything else is iterated
        and every item converted with ``_parse_method_description``.
        """
        if isinstance(method_descriptions, protocol.JsonRpcBatchRequest):
            return method_descriptions

        requests = [
            self._parse_method_description(method_description, is_notification=is_notification)
            for method_description in method_descriptions
        ]
        return protocol.JsonRpcBatchRequest(requests=requests)

    @staticmethod
    def _response_value(response: protocol.JsonRpcResponse) -> typing.Any:
        """Return the response's error when it has one, otherwise its result."""
        if response.error in constants.EMPTY_VALUES:
            return response.result
        return response.error

    @staticmethod
    def _collect_batch_result(batch_request: protocol.JsonRpcBatchRequest,
                              batch_response: protocol.JsonRpcBatchResponse) -> list:
        """Line the responses of a batch up with its requests.

        Responses are matched to requests by id. Responses whose id is an
        empty value are gathered in one ``UnlinkedResults`` object; several
        responses sharing one id are gathered in a ``DuplicatedResults``
        object stored under that id.

        Returns:
            One entry per request, in request order: the value (result or
            error) stored for the request's id. Requests flagged
            ``is_notification`` and requests without a matching response get
            the ``UnlinkedResults`` object, or ``None`` when that object is
            falsy.
        """
        unlinked_results = protocol.UnlinkedResults()
        responses_map = {}

        for response in batch_response.responses:
            if response.error in constants.EMPTY_VALUES:
                value = response.result
            else:
                value = response.error

            if response.id in constants.EMPTY_VALUES:
                unlinked_results.add(value)
                continue

            if response.id not in responses_map:
                responses_map[response.id] = value
                continue

            # The id has been seen before: accumulate instead of overwriting,
            # otherwise the duplicates would be lost and the last one would win.
            previous = responses_map[response.id]
            if isinstance(previous, protocol.DuplicatedResults):
                previous.add(value)
            else:
                responses_map[response.id] = protocol.DuplicatedResults(data=[
                    previous,
                    value,
                ])

        if not unlinked_results:
            unlinked_results = None

        return [
            unlinked_results if request.is_notification
            else responses_map.get(request.id, unlinked_results)
            for request in batch_request.requests
        ]

    @staticmethod
    def _parse_method_description(method_description: MethodDescription, *,
                                  is_notification: bool = False) -> protocol.JsonRpcRequest:
        """Convert one method description into a ``JsonRpcRequest``.

        Accepted forms:
            * a ``JsonRpcRequest`` -- returned unchanged, whatever
              ``is_notification`` says;
            * a string -- the method name;
            * a sequence of length 1 -- ``(method_name,)``;
            * a sequence of length 2 -- ``(method_name, params)``;
            * a sequence of length 3 -- ``(method_name, args, kwargs)``.

        New requests get ``constants.NOTHING`` as id when ``is_notification``
        is true and a random id otherwise.

        Raises:
            errors.InvalidParams: The sequence has any other length.
        """
        if isinstance(method_description, protocol.JsonRpcRequest):
            return method_description

        request_id = constants.NOTHING if is_notification else utils.get_random_id()

        if isinstance(method_description, str):
            return protocol.JsonRpcRequest(
                id=request_id,
                method_name=method_description,
            )

        length = len(method_description)

        if length == 1:
            return protocol.JsonRpcRequest(
                id=request_id,
                method_name=method_description[0],
            )

        if length == 2:
            return protocol.JsonRpcRequest(
                id=request_id,
                method_name=method_description[0],
                params=method_description[1],
            )

        if length == 3:
            return protocol.JsonRpcRequest(
                id=request_id,
                method_name=method_description[0],
                args=method_description[1],
                kwargs=method_description[2],
            )

        raise errors.InvalidParams('Use string or list (length less than or equal to 3).')
204