aiohttp_rpc.client.http   A
last analyzed

Complexity

Total Complexity 10

Size/Duplication

Total Lines 59
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 10
eloc 44
dl 0
loc 59
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A JsonRpcClient.__init__() 0 8 1
A JsonRpcClient.connect() 0 3 2
A JsonRpcClient.disconnect() 0 3 3
A JsonRpcClient.send_json() 0 22 4
1
import typing
2
3
import aiohttp
4
5
from .base import BaseJsonRpcClient
6
from .. import errors, utils
7
8
9
# Public API of this module: only the HTTP client class is exported.
__all__ = ('JsonRpcClient',)
12
13
14
class JsonRpcClient(BaseJsonRpcClient):
    """JSON-RPC client that sends every request as an HTTP POST through aiohttp.

    The client works with one of two kinds of session:

    * an *outer* session passed to the constructor, which the client uses but
      never closes;
    * an *owned* session, created in ``connect()`` and closed in
      ``disconnect()``.
    """

    # Endpoint that every request is POSTed to.
    url: str
    # Session used for requests; ``None`` until ``connect()`` unless supplied by the caller.
    session: typing.Optional[aiohttp.ClientSession]
    # Extra keyword arguments forwarded to ``aiohttp.ClientSession`` in ``connect()``.
    request_kwargs: dict
    # ``True`` when the session was supplied by the caller.
    _session_is_outer: bool

    def __init__(self,
                 url: str, *,
                 session: typing.Optional[aiohttp.ClientSession] = None,
                 **request_kwargs) -> None:
        """Store the endpoint and the session configuration.

        Args:
            url: Address the JSON-RPC requests are POSTed to.
            session: Optional ready-made session. When given, it is used as is
                and is not closed by ``disconnect()``.
            **request_kwargs: Keyword arguments passed to
                ``aiohttp.ClientSession`` when ``connect()`` has to create one.
        """
        self.url = url
        self.session = session
        self.request_kwargs = request_kwargs
        self._session_is_outer = session is not None  # We don't close an outer session.

    async def connect(self) -> None:
        """Create an owned session when the client does not have one yet."""
        if self.session is None:
            # NOTE(review): ``json_serialize`` is not defined in this class;
            # presumably it is provided by ``BaseJsonRpcClient`` - confirm.
            self.session = aiohttp.ClientSession(json_serialize=self.json_serialize, **self.request_kwargs)

    async def disconnect(self) -> None:
        """Close the owned session; an outer session is left untouched."""
        if self.session is None or self._session_is_outer:
            return

        await self.session.close()
        # Forget the closed session so that a later ``connect()`` creates a
        # fresh one instead of silently keeping an unusable, closed session.
        self.session = None

    async def send_json(self,
                        data: typing.Any, *,
                        without_response: bool = False,
                        **kwargs) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
        """POST ``data`` as JSON to ``self.url`` and return the decoded reply.

        Args:
            data: Payload passed to the session's ``post()`` as ``json=``.
            without_response: When true, the reply is still validated and
                decoded, but ``(None, None)`` is returned.
            **kwargs: Extra keyword arguments passed to the session's ``post()``.

        Returns:
            ``(json_response, {'http_response': http_response})``, or
            ``(None, None)`` when ``without_response`` is true.

        Raises:
            AssertionError: The client has no session (``connect()`` was not called).
            errors.ServerError: The server answered with an error HTTP status.
            errors.ParseError: The reply's content type is not acceptable as JSON.
        """
        assert self.session is not None, 'The client has no session: call connect() first.'

        http_response = await self.session.post(self.url, json=data, **kwargs)

        try:
            http_response.raise_for_status()
        except aiohttp.ClientResponseError as e:
            raise errors.ServerError(f'Server responded with code {http_response.status}.') from e

        try:
            # NOTE(review): ``json_deserialize`` is presumably provided by the base class - confirm.
            json_response = await http_response.json(loads=self.json_deserialize)
        except aiohttp.ContentTypeError as e:
            # The response is being discarded unread, so hand its connection
            # back explicitly instead of waiting for garbage collection.
            http_response.release()
            raise errors.ParseError(utils.get_exc_message(e)) from e

        if without_response:
            return None, None

        return json_response, {'http_response': http_response}
59