Test Failed
Push — master ( 3314b0...3c70d5 )
by Michael
04:13 queued 01:02
created

aiohttp_rpc.client.JsonRpcClient._parse_batch_method()   A

Complexity

Conditions 5

Size

Total Lines 17
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 12
nop 1
dl 0
loc 17
rs 9.3333
c 0
b 0
f 0
1
import types
2
import typing
3
import uuid
4
from functools import partial
5
6
import aiohttp
7
8
from . import errors, utils
9
from .protocol import JsonRpcRequest, JsonRpcResponse
10
11
12
# Public API of this module: only the client class is exported.
__all__ = ('JsonRpcClient',)
15
16
17
class JsonRpcClient:
    """Asynchronous JSON-RPC client that POSTs requests through aiohttp.

    The client is meant to be used as an async context manager. When no
    session is passed to the constructor, entering the context creates an
    ``aiohttp.ClientSession`` and leaving it closes that session again. A
    session supplied by the caller is never closed by the client.

    Any non-dunder attribute that is not found on the instance is treated
    as a remote method name, so ``await client.ping(1)`` is equivalent to
    ``await client.call('ping', 1)``.
    """

    url: str
    session: typing.Optional[aiohttp.ClientSession]
    _json_serialize: typing.Callable
    _is_outer_session: bool
    # Maps an error code to the matching entry of the known errors; the
    # class-level default is built from ``errors.DEFAULT_KNOWN_ERRORS``.
    _error_map: typing.Dict[int, errors.JsonRpcError] = {
        error.code: error
        for error in errors.DEFAULT_KNOWN_ERRORS
    }

    def __init__(self,
                 url: str, *,
                 session: typing.Optional[aiohttp.ClientSession] = None,
                 known_errors: typing.Optional[typing.Iterable] = None,
                 json_serialize: typing.Callable = utils.json_serialize) -> None:
        """Store the connection settings; no network activity happens here.

        Args:
            url: Endpoint every request is POSTed to.
            session: Optional externally managed session. When given, the
                client uses it and never closes it.
            known_errors: Optional iterable of objects exposing ``code``;
                when given it replaces the default error map for this
                instance.
            json_serialize: Serializer handed to the session that the
                client creates itself in ``__aenter__``.
        """
        self.url = url
        self.session = session
        self._is_outer_session = session is not None
        self._json_serialize = json_serialize

        if known_errors is not None:
            self._error_map = {error.code: error for error in known_errors}

    def __getattr__(self, method) -> typing.Callable:
        """Return a callable that invokes the remote method named *method*.

        Raises:
            AttributeError: For dunder names. Protocol probes such as
                ``copy.deepcopy`` looking up ``__deepcopy__`` must see a
                missing attribute instead of an RPC proxy. Remote methods
                with such names can still be invoked through ``call()``.
        """
        if method.startswith('__') and method.endswith('__'):
            raise AttributeError(
                f'{type(self).__name__!r} object has no attribute {method!r}'
            )

        return partial(self.call, method)

    async def call(self, method: str, *args, **kwargs) -> typing.Any:
        """Invoke one remote method and return its result.

        Raises:
            The error object carried by the response, when it has one.
        """
        rpc_request = JsonRpcRequest(msg_id=str(uuid.uuid4()), method=method, args=args, kwargs=kwargs)
        rpc_response = await self.direct_call(rpc_request)

        if rpc_response.error:
            raise rpc_response.error

        return rpc_response.result

    async def batch(self, methods: typing.Iterable[typing.Union[str, list, tuple]]) -> typing.Any:
        """Send several calls in one request.

        Each item of *methods* uses one of the forms accepted by
        ``_parse_batch_method``.

        Returns:
            A list with one entry per received response: the error object
            if the response carries one (it is returned, not raised),
            otherwise the result.
        """
        rpc_requests = [self._parse_batch_method(method) for method in methods]
        rpc_responses = await self.direct_batch(rpc_requests)

        return [
            rpc_response.error if rpc_response.error else rpc_response.result
            for rpc_response in rpc_responses
        ]

    async def direct_call(self, rpc_request: JsonRpcRequest) -> JsonRpcResponse:
        """Send a prepared request and return the parsed response object.

        Unlike ``call()``, an error in the response is not raised here.
        """
        http_response, json_response = await self.send_json(rpc_request.to_dict())

        return JsonRpcResponse.from_dict(
            json_response,
            error_map=self._error_map,
            context={'http_response': http_response},
        )

    async def direct_batch(self, rpc_requests: typing.List[JsonRpcRequest]) -> typing.List[JsonRpcResponse]:
        """Send prepared requests as one batch and parse every response."""
        data = [rpc_request.to_dict() for rpc_request in rpc_requests]
        http_response, json_response = await self.send_json(data)

        # JSON-RPC 2.0 allows a server to answer a batch it rejects as a
        # whole (e.g. an empty array) with a single response object instead
        # of an array. Iterating a dict would yield its keys, so wrap it.
        if isinstance(json_response, dict):
            json_response = [json_response]

        return [
            JsonRpcResponse.from_dict(item, error_map=self._error_map, context={'http_response': http_response})
            for item in json_response
        ]

    async def send_json(self, data: typing.Any) -> typing.Tuple[aiohttp.ClientResponse, typing.Any]:
        """POST *data* as JSON to ``self.url``.

        Returns:
            A ``(http_response, decoded_json_body)`` tuple.

        Raises:
            RuntimeError: If the client has no session, i.e. it was created
                without one and is used outside of ``async with``.
        """
        if self.session is None:
            raise RuntimeError(
                'JsonRpcClient has no session; use it inside "async with" '
                'or pass a session to the constructor.'
            )

        http_response = await self.session.post(self.url, json=data)
        json_response = await http_response.json()
        return http_response, json_response

    async def __aenter__(self) -> 'JsonRpcClient':
        """Create a session if the client does not have one yet."""
        if not self.session:
            self.session = aiohttp.ClientSession(json_serialize=self._json_serialize)

        return self

    async def __aexit__(self,
                        exc_type: typing.Optional[typing.Type[BaseException]],
                        exc_value: typing.Optional[BaseException],
                        traceback: typing.Optional[types.TracebackType]) -> None:
        """Close the session if, and only if, the client created it."""
        if self._is_outer_session or self.session is None:
            return

        # Drop the reference before closing so that a later ``__aenter__``
        # opens a fresh session instead of reusing the closed one.
        session, self.session = self.session, None
        await session.close()

    @staticmethod
    def _parse_batch_method(batch_method: typing.Union[str, list, tuple]) -> JsonRpcRequest:
        """Build a request with a fresh UUID id from one batch item.

        Accepted forms:
            * ``'method'`` or ``('method',)`` - call without parameters;
            * ``('method', params)`` - *params* is passed as ``params``;
            * ``('method', args, kwargs)`` - passed as ``args``/``kwargs``.

        Raises:
            errors.InvalidParams: If a non-string item has a length other
                than 1, 2 or 3.
        """
        msg_id = str(uuid.uuid4())

        if isinstance(batch_method, str):
            return JsonRpcRequest(msg_id=msg_id, method=batch_method)

        if len(batch_method) == 1:
            return JsonRpcRequest(msg_id=msg_id, method=batch_method[0])

        if len(batch_method) == 2:
            return JsonRpcRequest(msg_id=msg_id, method=batch_method[0], params=batch_method[1])

        if len(batch_method) == 3:
            return JsonRpcRequest(msg_id=msg_id, method=batch_method[0], args=batch_method[1], kwargs=batch_method[2])

        raise errors.InvalidParams('Use string or list (length less than or equal to 3).')
119