Passed
Push — master ( 296307...a95b45 )
by Michael
05:07
created

aiohttp_rpc.client.base   A

Complexity

Total Complexity 28

Size/Duplication

Total Lines 176
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 28
eloc 122
dl 0
loc 176
rs 10
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A BaseJsonRpcClient.disconnect() 0 3 1
A BaseJsonRpcClient.connect() 0 3 1
A BaseJsonRpcClient.notify() 0 3 1
A BaseJsonRpcClient.send_json() 0 6 1
A BaseJsonRpcClient.direct_call() 0 19 2
A BaseJsonRpcClient.batch_notify() 0 10 2
A BaseJsonRpcClient.__aenter__() 0 3 1
A BaseJsonRpcClient.call() 0 10 2
A BaseJsonRpcClient.__aexit__() 0 5 1
A BaseJsonRpcClient.direct_batch() 0 21 4
A BaseJsonRpcClient.__getattr__() 0 2 1
A BaseJsonRpcClient.batch() 0 21 4
B BaseJsonRpcClient._parse_method_description() 0 36 7
1
import abc
2
import types
3
import typing
4
from functools import partial
5
6
from .. import errors, protocol, typedefs, utils
7
8
9
__all__ = (
10
    'BaseJsonRpcClient',
11
)
12
13
14
class BaseJsonRpcClient(abc.ABC):
15
    error_map: typing.Mapping[int, typing.Type[errors.JsonRpcError]] = {
16
        error.code: error
17
        for error in errors.DEFAULT_KNOWN_ERRORS
18
    }
19
    json_serialize: typing.Callable = utils.json_serialize
20
21
    async def __aenter__(self) -> 'BaseJsonRpcClient':
22
        await self.connect()
23
        return self
24
25
    async def __aexit__(self,
26
                        exc_type: typing.Optional[typing.Type[BaseException]],
27
                        exc_value: typing.Optional[BaseException],
28
                        traceback: typing.Optional[types.TracebackType]) -> None:
29
        await self.disconnect()
30
31
    def __getattr__(self, method_name: str) -> typing.Callable:
32
        return partial(self.call, method_name)
33
34
    @abc.abstractmethod
35
    async def connect(self) -> None:
36
        pass
37
38
    @abc.abstractmethod
39
    async def disconnect(self) -> None:
40
        pass
41
42
    async def call(self, method_name: str, *args, **kwargs) -> typing.Any:
43
        request = protocol.JsonRpcRequest(id=utils.get_random_id(), method_name=method_name, args=args, kwargs=kwargs)
44
        response = await self.direct_call(request)
45
46
        assert response is not None  # Because it isn't a notification
47
48
        if response.error is not None:
49
            raise response.error
50
51
        return response.result
52
53
    async def notify(self, method_name: str, *args, **kwargs) -> None:
54
        request = protocol.JsonRpcRequest(method_name=method_name, args=args, kwargs=kwargs)
55
        await self.direct_call(request)
56
57
    async def batch(self,
58
                    method_descriptions: typedefs.ClientMethodDescriptionsType, *,
59
                    save_order: bool = True) -> typing.Sequence:
60
        if isinstance(method_descriptions, protocol.JsonRpcBatchRequest):
61
            batch_request = method_descriptions
62
        else:
63
            batch_request = protocol.JsonRpcBatchRequest(requests=tuple(
64
                self._parse_method_description(method_description)
65
                for method_description in method_descriptions
66
            ))
67
68
        batch_response = await self.direct_batch(batch_request)
69
70
        assert batch_response is not None  # Because it isn't a notification
71
72
        if save_order:
73
            return utils.collect_batch_result(batch_request, batch_response)
74
        else:
75
            return tuple(
76
                response.result if response.error is None else response.error
77
                for response in batch_response.responses
78
            )
79
80
    async def batch_notify(self, method_descriptions: typedefs.ClientMethodDescriptionsType) -> None:
81
        if isinstance(method_descriptions, protocol.JsonRpcBatchRequest):
82
            batch_request = method_descriptions
83
        else:
84
            batch_request = protocol.JsonRpcBatchRequest(requests=tuple(
85
                self._parse_method_description(method_description, is_notification=True)
86
                for method_description in method_descriptions
87
            ))
88
89
        await self.direct_batch(batch_request)
90
91
    async def direct_call(self,
92
                          request: protocol.JsonRpcRequest,
93
                          **kwargs) -> typing.Optional[protocol.JsonRpcResponse]:
94
        json_response, context = await self.send_json(
95
            request.dump(),
96
            without_response=request.is_notification,
97
            **kwargs,
98
        )
99
100
        if request.is_notification:
101
            return None
102
103
        response = protocol.JsonRpcResponse.load(
104
            json_response,
105
            error_map=self.error_map,
106
            context=context,
107
        )
108
109
        return response
110
111
    async def direct_batch(self,
112
                           batch_request: protocol.JsonRpcBatchRequest,
113
                           **kwargs) -> typing.Optional[protocol.JsonRpcBatchResponse]:
114
        if not batch_request.requests:
115
            raise errors.InvalidRequest('You can not send an empty batch request.')
116
117
        is_notification = batch_request.is_notification
118
119
        json_response, context = await self.send_json(
120
            batch_request.dump(),
121
            without_response=is_notification,
122
            **kwargs,
123
        )
124
125
        if is_notification:
126
            return None
127
128
        if not json_response:
129
            raise errors.ParseError('Server returned an empty batch response.')
130
131
        return protocol.JsonRpcBatchResponse.load(json_response)
132
133
    @abc.abstractmethod
134
    async def send_json(self,
135
                        data: typing.Any, *,
136
                        without_response: bool = False,
137
                        **kwargs) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
138
        pass
139
140
    @staticmethod
141
    def _parse_method_description(method_description: typedefs.ClientMethodDescriptionType, *,
142
                                  is_notification: bool = False) -> protocol.JsonRpcRequest:
143
        if isinstance(method_description, protocol.JsonRpcRequest):
144
            return method_description
145
146
        request_id = None if is_notification else utils.get_random_id()
147
148
        if isinstance(method_description, str):
149
            return protocol.JsonRpcRequest(
150
                id=request_id,
151
                method_name=method_description,
152
            )
153
154
        if len(method_description) == 1:
155
            return protocol.JsonRpcRequest(
156
                id=request_id,
157
                method_name=method_description[0],
158
            )
159
160
        if len(method_description) == 2:
161
            return protocol.JsonRpcRequest(
162
                id=request_id,
163
                method_name=method_description[0],
164
                params=method_description[1],
165
            )
166
167
        if len(method_description) == 3:
168
            return protocol.JsonRpcRequest(
169
                id=request_id,
170
                method_name=method_description[0],
171
                args=method_description[1],
172
                kwargs=method_description[2],  # type: ignore
173
            )
174
175
        raise errors.InvalidParams('Use string or list (length less than or equal to 3).')
176