Test Failed
Pull Request — master (#5)
by Michael
15:16
created

aiohttp_rpc.client.websocket   A

Complexity

Total Complexity 42

Size/Duplication

Total Lines 188
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 42
eloc 137
dl 0
loc 188
rs 9.0399
c 0
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A WsJsonRpcClient.connect() 0 11 3
A WsJsonRpcClientForResponse.connect() 0 2 1
A WsJsonRpcClientForResponse.__init__() 0 2 1
A WsJsonRpcClient.clear_pending() 0 2 1
A WsJsonRpcClientForResponse.disconnect() 0 2 1
A WsJsonRpcClient.__init__() 0 11 1
A WsJsonRpcClientForResponse.send_json() 0 7 1
A WsJsonRpcClient.send_json() 0 24 5
A WsJsonRpcClient.disconnect() 0 9 4
A WsJsonRpcClient._notify_all_about_error() 0 5 2
A WsJsonRpcClientForResponse.direct_batch() 0 2 1
A WsJsonRpcClient._handle_ws_messages() 0 11 4
A WsJsonRpcClient._get_msg_ids_from_json() 0 16 5
A WsJsonRpcClient._notify_about_results() 0 9 4
A WsJsonRpcClient._handle_ws_message() 0 18 5
A WsJsonRpcClientForResponse.direct_call() 0 2 1
A WsJsonRpcClient._notify_about_result() 0 5 2

How to fix   Complexity   

Complexity

Complex modules and classes like aiohttp_rpc.client.websocket often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import asyncio
2
import json
3
import logging
4
import typing
5
6
import aiohttp
7
from aiohttp import web_ws
8
9
from .base import BaseJsonRpcClient
10
from .. import errors, utils
11
12
13
# Names exported by ``from <this module> import *``: the two WebSocket
# JSON-RPC client classes defined below.
__all__ = (
14
    'WsJsonRpcClient',
15
    'WsJsonRpcClientForResponse',
16
)
17
18
# Module-level logger; used by the message worker to report unexpected errors.
logger = logging.getLogger(__name__)
19
20
21
class WsJsonRpcClient(BaseJsonRpcClient):
    """JSON-RPC client that exchanges messages over a client-side WebSocket.

    Outgoing payloads are sent as text frames. A background task started by
    ``connect`` reads incoming frames and resolves the future that
    ``send_json`` registered in ``_pending`` for the matching request id.

    NOTE(review): ``self.json_serialize`` is not defined in this class; it is
    presumably provided by ``BaseJsonRpcClient`` -- confirm.
    """

    # Connection object returned by ``session.ws_connect``; stays ``None``
    # until ``connect`` succeeds.
    ws_connect = None
    # Not read anywhere in this class.
    # NOTE(review): presumably an optional hook for callers -- confirm.
    notify_about_result: typing.Optional[typing.Callable] = None
    # Seconds to wait for a response in ``send_json``; ``None`` disables the
    # timeout.
    timeout: typing.Optional[int]
    # Extra keyword arguments forwarded to ``session.ws_connect``.
    ws_connect_kwargs: dict
    # Request id -> future awaited by the ``send_json`` call that sent it.
    # All ids of one batch share a single future.
    _pending: typing.Dict[typing.Any, asyncio.Future]
    # Background task running ``_handle_ws_messages``.
    _message_worker: typing.Optional[asyncio.Future] = None

    def __init__(self,
                 url: str, *,
                 session: typing.Optional[aiohttp.ClientSession] = None,
                 timeout: typing.Optional[int] = 5,
                 **ws_connect_kwargs) -> None:
        """Store connection settings; no I/O happens here.

        Args:
            url: WebSocket URL passed to ``session.ws_connect``.
            session: Existing session to reuse. When given, it is treated as
                owned by the caller and is not closed by ``disconnect``.
            timeout: Seconds to wait for each response, or ``None`` to wait
                without a limit.
            **ws_connect_kwargs: Forwarded to ``session.ws_connect``.
        """
        self.url = url
        self.session = session
        self._is_outer_session = session is not None
        self._pending = {}
        self.timeout = timeout
        self.ws_connect_kwargs = ws_connect_kwargs

    async def connect(self) -> None:
        """Open the WebSocket and start the background message reader.

        Creates a session when none was supplied. If opening the WebSocket
        fails, ``disconnect`` is called for cleanup and the error is re-raised.
        """
        if not self.session:
            self.session = aiohttp.ClientSession(json_serialize=self.json_serialize)

        try:
            self.ws_connect = await self.session.ws_connect(self.url, **self.ws_connect_kwargs)
        except Exception:
            await self.disconnect()
            raise

        self._message_worker = asyncio.ensure_future(self._handle_ws_messages())

    async def disconnect(self) -> None:
        """Close the WebSocket, close the session if this client created it,
        and wait for the message reader to finish."""
        if self.ws_connect:
            await self.ws_connect.close()

        # ``self.session`` is still None when disconnect() is called before
        # connect() and no outer session was supplied.
        if not self._is_outer_session and self.session is not None:
            await self.session.close()

        if self._message_worker:
            await self._message_worker

    async def send_json(self,
                        data: typing.Any, *,
                        without_response: bool = False) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
        """Send ``data`` as a text frame and, unless told otherwise, wait for the reply.

        Args:
            data: JSON-serializable payload (a single request dict or a batch list).
            without_response: When true, send and return immediately.

        Returns:
            ``(None, None)`` when no response is awaited (``without_response``
            is true or the payload carries no ids), otherwise
            ``(decoded_response, None)``.

        Raises:
            asyncio.TimeoutError: No response arrived within ``self.timeout`` seconds.
        """
        if without_response:
            await self.ws_connect.send_str(self.json_serialize(data))
            return None, None

        msg_ids = self._get_msg_ids_from_json(data)
        future = asyncio.Future()

        for msg_id in msg_ids:
            self._pending[msg_id] = future

        try:
            await self.ws_connect.send_str(self.json_serialize(data))

            if not msg_ids:
                return None, None

            if self.timeout is None:
                result = await future
            else:
                result = await asyncio.wait_for(future, timeout=self.timeout)
        finally:
            # Drop whatever this call registered and that was not consumed by
            # the reader (send failure, timeout, ids missing from a batch
            # response). The identity check avoids removing an entry that a
            # different call registered under the same id.
            for msg_id in msg_ids:
                if self._pending.get(msg_id) is future:
                    del self._pending[msg_id]

        return result, None

    def clear_pending(self) -> None:
        """Forget every pending request without resolving its future."""
        self._pending.clear()

    @staticmethod
    def _get_msg_ids_from_json(data: typing.Any) -> list:
        """Return the non-``None`` ``'id'`` values found in a request or batch.

        A dict yields at most one id; a list yields the ids of its dict items
        (non-dict items are skipped); anything else yields an empty list.
        """
        if isinstance(data, dict):
            msg_id = data.get('id')
            return [] if msg_id is None else [msg_id]

        if isinstance(data, list):
            return [
                item['id']
                for item in data
                if isinstance(item, dict) and item.get('id') is not None
            ]

        return []

    async def _handle_ws_messages(self) -> None:
        """Read frames until the WebSocket is closed, dispatching each one.

        On cancellation every pending future is failed with a ``ServerError``
        and the cancellation is re-raised. Any other exception is logged and
        the loop continues.
        """
        while not self.ws_connect.closed:
            try:
                ws_msg = await self.ws_connect.receive()
                self._handle_ws_message(ws_msg)
            except asyncio.CancelledError as e:
                # NOTE(review): ``with_traceback()`` is called without
                # arguments, so it is presumably overridden by the project's
                # error class -- confirm.
                error = errors.ServerError(utils.get_exc_message(e)).with_traceback()
                self._notify_all_about_error(error)
                raise
            except Exception as e:
                logger.exception(e)

    def _handle_ws_message(self, ws_msg: aiohttp.WSMessage) -> None:
        """Decode one text frame and resolve the future(s) waiting on it.

        Non-text frames are ignored. A dict with an ``'id'`` resolves that
        single request; a list resolves the batch its ids belong to.
        """
        if ws_msg.type != aiohttp.WSMsgType.text:
            return

        json_response = json.loads(ws_msg.data)

        if isinstance(json_response, dict) and 'id' in json_response:
            self._notify_about_result(json_response['id'], json_response)
            return

        if isinstance(json_response, list):
            self._notify_about_results(
                [
                    item['id']
                    for item in json_response
                    if isinstance(item, dict) and 'id' in item
                ],
                json_response,
            )

    def _notify_all_about_error(self, error: errors.JsonRpcError) -> None:
        """Fail every pending future with ``error`` and empty ``_pending``."""
        for future in self._pending.values():
            # One future can be registered under several ids (batch) or may
            # already be cancelled by a timeout; setting an exception on a
            # finished future raises InvalidStateError.
            if not future.done():
                future.set_exception(error)

        self.clear_pending()

    def _notify_about_result(self, msg_id: typing.Any, json_response: dict) -> None:
        """Resolve the future registered for ``msg_id``, if there is one."""
        future = self._pending.pop(msg_id, None)

        # Skip futures that already finished (e.g. cancelled after a timeout).
        if future is not None and not future.done():
            future.set_result(json_response)

    def _notify_about_results(self, msg_ids: list, json_response: list) -> None:
        """Resolve a batch: unregister all ``msg_ids`` and deliver the whole
        response list to the first still-waiting future found."""
        is_processed = False

        for msg_id in msg_ids:
            future = self._pending.pop(msg_id, None)

            if future is not None and not is_processed and not future.done():
                future.set_result(json_response)
                is_processed = True
161
162
163
class WsJsonRpcClientForResponse(BaseJsonRpcClient):
    """JSON-RPC client that writes to an already-open server-side WebSocket.

    It only sends messages without waiting for a reply; connecting,
    disconnecting and direct calls are not applicable.

    NOTE(review): ``self.json_serialize`` is not defined in this class; it is
    presumably provided by ``BaseJsonRpcClient`` -- confirm.
    """

    # The WebSocket response object that messages are written to.
    ws_response: web_ws.WebSocketResponse

    def __init__(self, ws_response: web_ws.WebSocketResponse) -> None:
        """Wrap ``ws_response``; the object is stored as-is."""
        self.ws_response = ws_response

    async def connect(self) -> None:
        """Do nothing; this client does not open a connection itself."""
        pass

    async def disconnect(self) -> None:
        """Do nothing; this client does not close ``ws_response``."""
        pass

    async def send_json(self,
                        data: typing.Any, *,
                        without_response: bool = False) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
        """Serialize ``data`` and send it as a text frame.

        Args:
            data: JSON-serializable payload.
            without_response: Must be true; this client cannot wait for replies.

        Returns:
            Always ``(None, None)``.

        Raises:
            AssertionError: ``without_response`` is false.
        """
        # Explicit check instead of ``assert`` so it is not stripped when
        # Python runs with -O; the exception type is kept for compatibility.
        if not without_response:
            raise AssertionError('WsJsonRpcClientForResponse can only send messages without a response.')

        await self.ws_response.send_str(self.json_serialize(data))
        return None, None

    async def direct_call(self, *args, **kwargs):
        """Not supported by this client."""
        raise NotImplementedError

    async def direct_batch(self, *args, **kwargs):
        """Not supported by this client."""
        raise NotImplementedError
188