aiohttp_rpc.client.base.BaseJsonRpcClient.call()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 2
nop 4
1
import abc
2
import types
3
import typing
4
from functools import partial
5
6
from .. import errors, protocol, typedefs, utils
7
8
9
__all__ = (
10
    'BaseJsonRpcClient',
11
)
12
13
14
class BaseJsonRpcClient(abc.ABC):
15
    error_map: typing.Mapping[int, typing.Type[errors.JsonRpcError]] = {
16
        error.code: error
17
        for error in errors.DEFAULT_KNOWN_ERRORS
18
    }
19
20
    async def __aenter__(self) -> 'BaseJsonRpcClient':
21
        await self.connect()
22
        return self
23
24
    async def __aexit__(self,
25
                        exc_type: typing.Optional[typing.Type[BaseException]],
26
                        exc_value: typing.Optional[BaseException],
27
                        traceback: typing.Optional[types.TracebackType]) -> None:
28
        await self.disconnect()
29
30
    def __getattr__(self, method_name: str) -> typing.Callable:
31
        return partial(self.call, method_name)
32
33
    @abc.abstractmethod
34
    async def connect(self) -> None:
35
        pass
36
37
    @abc.abstractmethod
38
    async def disconnect(self) -> None:
39
        pass
40
41
    async def call(self, method_name: str, *args, **kwargs) -> typing.Any:
42
        request = protocol.JsonRpcRequest(id=utils.get_random_id(), method_name=method_name, args=args, kwargs=kwargs)
43
        response = await self.direct_call(request)
44
45
        assert response is not None  # Because it isn't a notification
46
47
        if response.error is not None:
48
            raise response.error
49
50
        return response.result
51
52
    async def notify(self, method_name: str, *args, **kwargs) -> None:
53
        request = protocol.JsonRpcRequest(method_name=method_name, args=args, kwargs=kwargs)
54
        await self.direct_call(request)
55
56
    async def batch(self,
57
                    method_descriptions: typedefs.ClientMethodDescriptionsType, *,
58
                    save_order: bool = True) -> typing.Sequence:
59
        if isinstance(method_descriptions, protocol.JsonRpcBatchRequest):
60
            batch_request = method_descriptions
61
        else:
62
            batch_request = protocol.JsonRpcBatchRequest(requests=tuple(
63
                self._parse_method_description(method_description)
64
                for method_description in method_descriptions
65
            ))
66
67
        batch_response = await self.direct_batch(batch_request)
68
69
        assert batch_response is not None  # Because it isn't a notification
70
71
        if save_order:
72
            return utils.collect_batch_result(batch_request, batch_response)
73
        else:
74
            return tuple(
75
                response.result if response.error is None else response.error
76
                for response in batch_response.responses
77
            )
78
79
    async def batch_notify(self, method_descriptions: typedefs.ClientMethodDescriptionsType) -> None:
80
        if isinstance(method_descriptions, protocol.JsonRpcBatchRequest):
81
            batch_request = method_descriptions
82
        else:
83
            batch_request = protocol.JsonRpcBatchRequest(requests=tuple(
84
                self._parse_method_description(method_description, is_notification=True)
85
                for method_description in method_descriptions
86
            ))
87
88
        await self.direct_batch(batch_request)
89
90
    async def direct_call(self,
91
                          request: protocol.JsonRpcRequest,
92
                          **kwargs) -> typing.Optional[protocol.JsonRpcResponse]:
93
        json_response, context = await self.send_json(
94
            request.dump(),
95
            without_response=request.is_notification,
96
            **kwargs,
97
        )
98
99
        if request.is_notification:
100
            return None
101
102
        response = protocol.JsonRpcResponse.load(
103
            json_response,
104
            error_map=self.error_map,
105
            context=context,
106
        )
107
108
        return response
109
110
    async def direct_batch(self,
111
                           batch_request: protocol.JsonRpcBatchRequest,
112
                           **kwargs) -> typing.Optional[protocol.JsonRpcBatchResponse]:
113
        if not batch_request.requests:
114
            raise errors.InvalidRequest('You can\'t send an empty batch request.')
115
116
        is_notification = batch_request.is_notification
117
118
        json_response, context = await self.send_json(
119
            batch_request.dump(),
120
            without_response=is_notification,
121
            **kwargs,
122
        )
123
124
        if is_notification:
125
            return None
126
127
        if not json_response:
128
            raise errors.ParseError('Server returned an empty batch response.')
129
130
        return protocol.JsonRpcBatchResponse.load(json_response)
131
132
    @abc.abstractmethod
133
    async def send_json(self,
134
                        data: typing.Any, *,
135
                        without_response: bool = False,
136
                        **kwargs) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
137
        pass
138
139
    @staticmethod
140
    def json_serialize(data: typing.Any) -> str:
141
        return utils.json_serialize(data)
142
143
    @staticmethod
144
    def json_deserialize(data: str) -> typing.Any:
145
        return utils.json_deserialize(data)
146
147
    @staticmethod
148
    def _parse_method_description(method_description: typedefs.ClientMethodDescriptionType, *,
149
                                  is_notification: bool = False) -> protocol.JsonRpcRequest:
150
        if isinstance(method_description, protocol.JsonRpcRequest):
151
            return method_description
152
153
        request_id = None if is_notification else utils.get_random_id()
154
155
        if isinstance(method_description, str):
156
            return protocol.JsonRpcRequest(
157
                id=request_id,
158
                method_name=method_description,
159
            )
160
161
        if len(method_description) == 1:
162
            return protocol.JsonRpcRequest(
163
                id=request_id,
164
                method_name=method_description[0],
165
            )
166
167
        if len(method_description) == 2:
168
            return protocol.JsonRpcRequest(
169
                id=request_id,
170
                method_name=method_description[0],
171
                params=method_description[1],
172
            )
173
174
        if len(method_description) == 3:
175
            return protocol.JsonRpcRequest(
176
                id=request_id,
177
                method_name=method_description[0],
178
                args=method_description[1],
179
                kwargs=method_description[2],  # type: ignore
180
            )
181
182
        raise errors.InvalidParams('Use string or list (length less than or equal to 3).')
183