aiohttp_rpc.client.websocket   F
last analyzed

Complexity

Total Complexity 68

Size/Duplication

Total Lines 302
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 68
eloc 224
dl 0
loc 302
rs 2.96
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
B WsJsonRpcClient.connect() 0 19 6
A WsJsonRpcClient.__init__() 0 28 1
B WsJsonRpcClient.send_json() 0 38 7
A WsJsonRpcClient._notify_all_about_error() 0 8 3
B WsJsonRpcClient.disconnect() 0 18 8
C WsJsonRpcClient._handle_ws_messages() 0 39 10
B WsJsonRpcClient._handle_single_json_response() 0 15 6
B WsJsonRpcClient._handle_json_responses() 0 14 6
B WsJsonRpcClient._handle_single_ws_message() 0 23 6
A WsJsonRpcClient._notify_about_results() 0 14 4
A WsJsonRpcClient._notify_about_result() 0 5 2
A WsJsonRpcClient._get_ids_from_json() 0 18 5
A WsJsonRpcClient._check_ws_connection() 0 13 4

How to fix   Complexity   

Complexity

Complex classes like aiohttp_rpc.client.websocket often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import asyncio
2
import logging
3
import typing
4
5
from aiohttp import ClientSession, http_websocket, web_ws
6
7
from .base import BaseJsonRpcClient
8
from .. import errors, typedefs, utils
9
10
11
# Public API of this module: only the WebSocket JSON-RPC client is exported.
__all__ = ('WsJsonRpcClient',)
14
15
# Module-level logger, named after this module; used for warnings about
# WebSocket messages that cannot be parsed or processed.
logger = logging.getLogger(__name__)
16
17
18
class WsJsonRpcClient(BaseJsonRpcClient):
    """JSON-RPC client that exchanges messages over one WebSocket connection.

    Requests are sent as text frames.  Every request that carries an ``id``
    registers a future in ``_pending``; a background task
    (``_handle_ws_messages``) reads incoming frames and resolves the matching
    future.  A second, optional background task (``_check_ws_connection``)
    fails all pending requests once the connection is reported as closed.

    ``json_serialize`` and ``json_deserialize`` are not defined in this class;
    presumably they come from ``BaseJsonRpcClient`` (verify against the base).
    """

    url: typing.Optional[str]
    session: typing.Optional[ClientSession]
    ws_connect: typing.Optional[typedefs.WSConnectType]
    ws_connect_kwargs: dict
    _timeout: typing.Optional[float]
    _timeout_for_data_receiving: typing.Optional[float]
    _connection_check_interval: typing.Optional[float]
    # Maps a request id to the future awaited by ``send_json``.  All ids of
    # one batch share the same future.
    _pending: typing.Dict[typing.Any, asyncio.Future]
    _message_worker: typing.Optional[asyncio.Future] = None
    _check_worker: typing.Optional[asyncio.Future] = None
    _session_is_outer: bool
    _ws_connect_is_outer: bool
    _json_request_handler: typing.Optional[typing.Callable] = None
    _unprocessed_json_response_handler: typing.Optional[typing.Callable] = None
    # Strong references to per-message tasks so they are not garbage
    # collected while still running.
    _background_tasks: typing.Set
    _is_closed: bool = True

    def __init__(self,
                 url: typing.Optional[str] = None, *,
                 session: typing.Optional[ClientSession] = None,
                 ws_connect: typing.Optional[typedefs.WSConnectType] = None,
                 timeout: typing.Optional[float] = 60,
                 timeout_for_data_receiving: typing.Optional[float] = None,
                 connection_check_interval: typing.Optional[float] = 5,
                 json_request_handler: typing.Optional[typing.Callable] = None,
                 unprocessed_json_response_handler: typing.Optional[typing.Callable] = None,
                 **ws_connect_kwargs) -> None:
        """Store the configuration; no I/O happens until ``connect()``.

        At least one of ``url``, ``session`` or ``ws_connect`` must be given.

        :param url: address passed to ``session.ws_connect`` in ``connect()``.
        :param session: session used to open the WebSocket.  A session passed
            in here is never closed by this client.
        :param ws_connect: an already established WebSocket connection.  A
            connection passed in here is never closed by this client.
        :param timeout: seconds ``send_json`` waits for a response;
            ``None`` waits without a limit.
        :param timeout_for_data_receiving: forwarded as ``timeout`` to every
            ``ws_connect.receive`` call.
        :param connection_check_interval: seconds between connection checks;
            ``None`` disables the check worker.
        :param json_request_handler: awaited for incoming messages that
            contain a ``'method'`` key.
        :param unprocessed_json_response_handler: called (not awaited) for
            responses that match no pending request.
        :param ws_connect_kwargs: extra keyword arguments forwarded to
            ``session.ws_connect``.
        """
        # Equivalent to the former
        # ``session or (url and not session) or ws_connect`` condition.
        assert session is not None or url is not None or ws_connect is not None

        self.url = url
        self._timeout = timeout
        self._timeout_for_data_receiving = timeout_for_data_receiving
        self._connection_check_interval = connection_check_interval

        self.session = session
        self._session_is_outer = session is not None  # We don't close an outer session.

        self.ws_connect = ws_connect
        self.ws_connect_kwargs = ws_connect_kwargs
        self._ws_connect_is_outer = ws_connect is not None  # We don't close an outer WS connection.

        self._pending = {}
        self._json_request_handler = json_request_handler
        self._unprocessed_json_response_handler = unprocessed_json_response_handler
        self._background_tasks = set()

    async def connect(self) -> None:
        """Open the WebSocket (unless one was supplied) and start the workers.

        If opening the connection fails, ``disconnect()`` is awaited to
        release whatever was created and the original exception is re-raised.
        """
        self._is_closed = False

        if self.ws_connect is None:
            if self.session is None:
                self.session = ClientSession(json_serialize=self.json_serialize)

            assert self.url is not None and self.session is not None

            try:
                self.ws_connect = await self.session.ws_connect(self.url, **self.ws_connect_kwargs)
            except Exception:
                await self.disconnect()
                raise

        self._message_worker = asyncio.create_task(self._handle_ws_messages())

        if self._connection_check_interval is not None:
            self._check_worker = asyncio.create_task(self._check_ws_connection())

    async def disconnect(self) -> None:
        """Close owned resources and wait for the background workers to end.

        A connection or session that was passed to ``__init__`` is left open.
        """
        self._is_closed = True

        if self.ws_connect is not None and not self._ws_connect_is_outer:
            await self.ws_connect.close()

        if self.session is not None and not self._session_is_outer:
            await self.session.close()

        if self._message_worker is not None:
            if self._ws_connect_is_outer:
                # The outer connection is not closed here, so the worker may
                # keep waiting for a frame; bound the wait.
                await asyncio.wait_for(self._message_worker, timeout=60)
            else:
                await self._message_worker

        if self._check_worker is not None:
            self._check_worker.cancel()
            await self._check_worker

    async def send_json(self,
                        data: typing.Any, *,
                        without_response: bool = False,
                        **kwargs) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
        """Send ``data`` as a JSON text frame and wait for the response.

        :param data: a single request (mapping) or a batch (sequence).
        :param without_response: send only, do not wait for a response.
        :param kwargs: forwarded to ``ws_connect.send_str``.
        :return: ``(response, None)``; ``(None, None)`` when no response is
            awaited (``without_response`` is set or ``data`` has no ids).
        :raises errors.ServerError: the connection was reset while sending.
        :raises asyncio.TimeoutError: no response arrived within ``timeout``.
        """
        assert self.ws_connect is not None

        request_ids = () if without_response else self._get_ids_from_json(data)

        if not request_ids:
            # Notifications (no ids) never get a response, so nothing is
            # registered in ``_pending``.
            await self._send_serialized(data, **kwargs)
            return None, None

        future: asyncio.Future = asyncio.get_running_loop().create_future()

        for request_id in request_ids:
            self._pending[request_id] = future

        try:
            await self._send_serialized(data, **kwargs)

            if self._timeout is None:
                result = await future
            else:
                result = await asyncio.wait_for(future, timeout=self._timeout)
        finally:
            # Without this, a timeout, a cancellation or a failed send would
            # leave the ids registered forever.
            self._release_pending(request_ids, future)

        return result, None

    async def _send_serialized(self, data: typing.Any, **kwargs) -> None:
        """Serialize ``data`` and send it; fail all pending requests on reset."""
        assert self.ws_connect is not None

        try:
            await self.ws_connect.send_str(self.json_serialize(data), **kwargs)
        except ConnectionResetError as e:
            error = errors.ServerError(utils.get_exc_message(e)).with_traceback()
            self._notify_all_about_error(error)
            raise error from e

    def _release_pending(self,
                         request_ids: typing.Iterable[typedefs.JsonRpcIdType],
                         future: asyncio.Future) -> None:
        """Drop the ``_pending`` entries that still point at ``future``."""
        for request_id in request_ids:
            # The identity check protects an entry that another call has
            # registered under the same id in the meantime.
            if self._pending.get(request_id) is future:
                del self._pending[request_id]

        if future.done() and not future.cancelled():
            # Mark a stored exception as retrieved: when the send itself
            # failed the future was never awaited, and asyncio would
            # otherwise log "Future exception was never retrieved".
            future.exception()

    @staticmethod
    def _get_ids_from_json(data: typing.Any) -> typing.Tuple[typedefs.JsonRpcIdType, ...]:
        """Return the non-``None`` ``'id'`` values found in ``data``.

        ``data`` may be a single message (mapping) or a batch (sequence of
        mappings); anything else yields an empty tuple.
        """
        if not data:
            return ()

        if isinstance(data, typing.Mapping) and data.get('id') is not None:
            return (data['id'],)

        if isinstance(data, typing.Sequence):
            return tuple(
                item['id']
                for item in data
                if isinstance(item, typing.Mapping) and item.get('id') is not None
            )

        return ()

    async def _handle_ws_messages(self) -> None:
        """Read frames until the connection closes and dispatch text frames.

        Each text frame is processed in its own task so that a slow handler
        does not block reading.
        """
        assert self.ws_connect is not None

        closing_types = (
            http_websocket.WSMsgType.CLOSE,
            http_websocket.WSMsgType.CLOSING,
            http_websocket.WSMsgType.CLOSED,
        )

        while True:
            try:
                ws_msg: http_websocket.WSMessage = await self.ws_connect.receive(
                    timeout=self._timeout_for_data_receiving,
                )
            except asyncio.TimeoutError:
                # A receive timeout only ends the loop after ``disconnect()``.
                if self._is_closed:
                    break

                continue

            if ws_msg.type in closing_types:
                break

            if ws_msg.type != http_websocket.WSMsgType.TEXT:
                continue

            try:
                task = asyncio.create_task(self._handle_single_ws_message(ws_msg))
            except asyncio.CancelledError as e:
                error = errors.InternalError(utils.get_exc_message(e)).with_traceback()
                self._notify_all_about_error(error)
                break
            except Exception:
                logger.warning('Can\'t process WS message.', exc_info=True)
            else:
                # To avoid a task disappearing mid execution:
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)

            if self._is_closed:
                break

    async def _check_ws_connection(self) -> None:
        """Periodically check the connection; fail pending requests if closed."""
        assert self.ws_connect is not None

        try:
            while not self._is_closed:
                if self.ws_connect.closed:
                    error = errors.ServerError('Connection is closed')
                    self._notify_all_about_error(error)
                    break

                await asyncio.sleep(self._connection_check_interval)  # type: ignore
        except asyncio.CancelledError:
            # ``disconnect()`` cancels this worker; end quietly.
            pass

    async def _handle_single_ws_message(self, ws_msg: http_websocket.WSMessage) -> None:
        """Parse one text frame and route it by the JSON value's shape."""
        if ws_msg.type != http_websocket.WSMsgType.TEXT:
            return

        try:
            json_response = self.json_deserialize(ws_msg.data)
        except Exception:
            logger.warning('Can\'t parse json.', exc_info=True)
            return

        if not json_response:
            return

        if isinstance(json_response, typing.Mapping):
            await self._handle_single_json_response(json_response, ws_msg=ws_msg)
            return

        if isinstance(json_response, typing.Sequence):
            await self._handle_json_responses(json_response, ws_msg=ws_msg)
            return

        logger.warning('Couldn\'t process the response.', extra={
            'json_response': json_response,
        })

    async def _handle_single_json_response(self, json_response: typing.Mapping, *, ws_msg: web_ws.WSMessage) -> None:
        """Route a single message: server request, known response, or other."""
        if 'method' in json_response:
            # A message with 'method' is a request coming from the server.
            if self._json_request_handler is not None:
                await self._json_request_handler(
                    ws_connect=self.ws_connect,
                    ws_msg=ws_msg,
                    json_request=json_response,
                )
        elif 'id' in json_response and json_response['id'] in self._pending:
            self._notify_about_result(json_response['id'], json_response)
        elif self._unprocessed_json_response_handler is not None:
            # NOTE(review): the handler is called without ``await``; confirm
            # that it is expected to be a plain function.
            self._unprocessed_json_response_handler(
                ws_connect=self.ws_connect,
                ws_msg=ws_msg,
                json_response=json_response,
            )

    async def _handle_json_responses(self, json_responses: typing.Sequence, *, ws_msg: web_ws.WSMessage) -> None:
        """Route a batch; only its first item decides request vs. response.

        The caller has already skipped empty values, so indexing ``[0]`` is
        safe here.
        """
        if isinstance(json_responses[0], typing.Mapping) and 'method' in json_responses[0]:
            if self._json_request_handler is not None:
                # NOTE(review): unlike the single-message path, no
                # ``json_request`` argument is passed here; confirm whether
                # that is intended.
                await self._json_request_handler(ws_connect=self.ws_connect, ws_msg=ws_msg)
        else:
            response_ids = self._get_ids_from_json(json_responses)

            if response_ids:
                self._notify_about_results(response_ids, json_responses)
            elif self._unprocessed_json_response_handler is not None:
                self._unprocessed_json_response_handler(
                    ws_connect=self.ws_connect,
                    ws_msg=ws_msg,
                    json_response=json_responses,
                )

    def _notify_all_about_error(self, error: Exception) -> None:
        """Fail every pending request with ``error`` and forget them all."""
        for future in self._pending.values():
            # Ids of one batch share a future, and a future may already be
            # cancelled by a timeout; a done future must not be set again.
            if not future.done():
                future.set_exception(error)

        self._pending.clear()

    def _notify_about_result(self, response_id: typedefs.JsonRpcIdType, json_response: typing.Mapping) -> None:
        """Resolve the request registered under ``response_id``, if any."""
        future = self._pending.pop(response_id, None)

        # A future that was cancelled (e.g. by the ``send_json`` timeout)
        # would raise ``InvalidStateError`` on ``set_result``.
        if future is not None and not future.done():
            future.set_result(json_response)

    def _notify_about_results(self,
                              response_ids: typing.Sequence[typedefs.JsonRpcIdType],
                              json_response: typing.Sequence) -> None:
        """Resolve the batch request that the given response ids belong to.

        All ids are removed from ``_pending``; the whole batch response is
        delivered once, to the first future found.
        """
        delivered = False

        for response_id in response_ids:
            future = self._pending.pop(response_id, None)

            if future is None or delivered:
                continue

            # We suppose that a batch result has the same ids that we sent.
            # And these ids have the same future.
            if not future.done():
                future.set_result(json_response)

            delivered = True
302