WsJsonRpcClient.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 28
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 28
rs 9.304
c 0
b 0
f 0
cc 1
nop 11

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
import asyncio
2
import logging
3
import typing
4
5
from aiohttp import ClientSession, http_websocket, web_ws
6
7
from .base import BaseJsonRpcClient
8
from .. import errors, typedefs, utils
9
10
11
__all__ = (
12
    'WsJsonRpcClient',
13
)
14
15
logger = logging.getLogger(__name__)
16
17
18
class WsJsonRpcClient(BaseJsonRpcClient):
19
    url: typing.Optional[str]
20
    ws_connect: typing.Optional[typedefs.WSConnectType]
21
    ws_connect_kwargs: dict
22
    _timeout: typing.Optional[float]
23
    _timeout_for_data_receiving: typing.Optional[float]
24
    _connection_check_interval: typing.Optional[float]
25
    _pending: typing.Dict[typing.Any, asyncio.Future]
26
    _message_worker: typing.Optional[asyncio.Future] = None
27
    _check_worker: typing.Optional[asyncio.Future] = None
28
    _session_is_outer: bool
29
    _ws_connect_is_outer: bool
30
    _json_request_handler: typing.Optional[typing.Callable] = None
31
    _unprocessed_json_response_handler: typing.Optional[typing.Callable] = None
32
    _background_tasks: typing.Set
33
    _is_closed: bool = True
34
35
    def __init__(self,
36
                 url: typing.Optional[str] = None, *,
37
                 session: typing.Optional[ClientSession] = None,
38
                 ws_connect: typing.Optional[typedefs.WSConnectType] = None,
39
                 timeout: typing.Optional[float] = 60,
40
                 timeout_for_data_receiving: typing.Optional[float] = None,
41
                 connection_check_interval: typing.Optional[float] = 5,
42
                 json_request_handler: typing.Optional[typing.Callable] = None,
43
                 unprocessed_json_response_handler: typing.Optional[typing.Callable] = None,
44
                 **ws_connect_kwargs) -> None:
45
        assert (session is not None) or (url is not None and session is None) or (ws_connect is not None)
46
47
        self.url = url
48
        self._timeout = timeout
49
        self._timeout_for_data_receiving = timeout_for_data_receiving
50
        self._connection_check_interval = connection_check_interval
51
52
        self.session = session
53
        self._session_is_outer = session is not None  # We don't close an outer session.
54
55
        self.ws_connect = ws_connect
56
        self.ws_connect_kwargs = ws_connect_kwargs
57
        self._ws_connect_is_outer = ws_connect is not None  # We don't close an outer WS connection.
58
59
        self._pending = {}
60
        self._json_request_handler = json_request_handler
61
        self._unprocessed_json_response_handler = unprocessed_json_response_handler
62
        self._background_tasks = set()
63
64
    async def connect(self) -> None:
65
        self._is_closed = False
66
67
        if self.session is None and self.ws_connect is None:
68
            self.session = ClientSession(json_serialize=self.json_serialize)
69
70
        if self.ws_connect is None:
71
            assert self.url is not None and self.session is not None
72
73
            try:
74
                self.ws_connect = await self.session.ws_connect(self.url, **self.ws_connect_kwargs)
75
            except Exception:
76
                await self.disconnect()
77
                raise
78
79
        self._message_worker = asyncio.create_task(self._handle_ws_messages())
80
81
        if self._connection_check_interval is not None:
82
            self._check_worker = asyncio.create_task(self._check_ws_connection())
83
84
    async def disconnect(self) -> None:
85
        self._is_closed = True
86
87
        if self.ws_connect is not None and not self._ws_connect_is_outer:
88
            await self.ws_connect.close()
89
90
        if self.session is not None and not self._session_is_outer:
91
            await self.session.close()
92
93
        if self._message_worker is not None:
94
            if self._ws_connect_is_outer:
95
                await asyncio.wait_for(self._message_worker, timeout=60)
96
            else:
97
                await self._message_worker
98
99
        if self._check_worker is not None:
100
            self._check_worker.cancel()
101
            await self._check_worker
102
103
    async def send_json(self,
104
                        data: typing.Any, *,
105
                        without_response: bool = False,
106
                        **kwargs) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
107
        assert self.ws_connect is not None
108
109
        if without_response:
110
            try:
111
                await self.ws_connect.send_str(self.json_serialize(data), **kwargs)
112
            except ConnectionResetError as e:
113
                error = errors.ServerError(utils.get_exc_message(e)).with_traceback()
114
                self._notify_all_about_error(error)
115
                raise error
116
117
            return None, None
118
119
        request_ids = self._get_ids_from_json(data)
120
        future: asyncio.Future = asyncio.Future()
121
122
        for request_id in request_ids:
123
            self._pending[request_id] = future
124
125
        try:
126
            await self.ws_connect.send_str(self.json_serialize(data), **kwargs)
127
        except ConnectionResetError as e:
128
            error = errors.ServerError(utils.get_exc_message(e)).with_traceback()
129
            self._notify_all_about_error(error)
130
            raise error
131
132
        if not request_ids:
133
            return None, None
134
135
        if self._timeout is not None:
136
            future = asyncio.wait_for(future, timeout=self._timeout)  # type: ignore
137
138
        result = await future
139
140
        return result, None
141
142
    @staticmethod
143
    def _get_ids_from_json(data: typing.Any) -> typing.Tuple[typedefs.JsonRpcIdType, ...]:
144
        if not data:
145
            return ()
146
147
        if isinstance(data, typing.Mapping) and data.get('id') is not None:
148
            return (
149
                data['id'],
150
            )
151
152
        if isinstance(data, typing.Sequence):
153
            return tuple(
154
                item['id']
155
                for item in data
156
                if isinstance(item, typing.Mapping) and item.get('id') is not None
157
            )
158
159
        return ()
160
161
    async def _handle_ws_messages(self) -> None:
162
        assert self.ws_connect is not None
163
164
        while True:
165
            try:
166
                ws_msg: http_websocket.WSMessage = await self.ws_connect.receive(
167
                    timeout=self._timeout_for_data_receiving,
168
                )
169
            except asyncio.TimeoutError:
170
                if self._is_closed:
171
                    break
172
                else:
173
                    continue
174
175
            if ws_msg.type in (
176
                http_websocket.WSMsgType.CLOSE,
177
                http_websocket.WSMsgType.CLOSING,
178
                http_websocket.WSMsgType.CLOSED,
179
            ):
180
                break
181
182
            if ws_msg.type != http_websocket.WSMsgType.TEXT:
183
                continue
184
185
            try:
186
                task = asyncio.create_task(self._handle_single_ws_message(ws_msg))
187
            except asyncio.CancelledError as e:
188
                error = errors.InternalError(utils.get_exc_message(e)).with_traceback()
189
                self._notify_all_about_error(error)
190
                break
191
            except Exception:
192
                logger.warning('Can\'t process WS message.', exc_info=True)
193
            else:
194
                # To avoid a task disappearing mid execution:
195
                self._background_tasks.add(task)
196
                task.add_done_callback(self._background_tasks.discard)
197
198
            if self._is_closed:
199
                break
200
201
    async def _check_ws_connection(self) -> None:
202
        assert self.ws_connect is not None
203
204
        try:
205
            while not self._is_closed:
206
                if self.ws_connect.closed:
207
                    error = errors.ServerError('Connection is closed')
208
                    self._notify_all_about_error(error)
209
                    break
210
211
                await asyncio.sleep(self._connection_check_interval)  # type: ignore
212
        except asyncio.CancelledError:
213
            pass
214
215
    async def _handle_single_ws_message(self, ws_msg: http_websocket.WSMessage) -> None:
216
        if ws_msg.type != http_websocket.WSMsgType.text:
217
            return
218
219
        try:
220
            json_response = self.json_deserialize(ws_msg.data)
221
        except Exception:
222
            logger.warning('Can\'t parse json.', exc_info=True)
223
            return
224
225
        if not json_response:
226
            return
227
228
        if isinstance(json_response, typing.Mapping):
229
            await self._handle_single_json_response(json_response, ws_msg=ws_msg)
230
            return
231
232
        if isinstance(json_response, typing.Sequence):
233
            await self._handle_json_responses(json_response, ws_msg=ws_msg)
234
            return
235
236
        logger.warning('Couldn\'t process the response.', extra={
237
            'json_response': json_response,
238
        })
239
240
    async def _handle_single_json_response(self, json_response: typing.Mapping, *, ws_msg: web_ws.WSMessage) -> None:
241
        if 'method' in json_response:
242
            if self._json_request_handler is not None:
243
                await self._json_request_handler(
244
                    ws_connect=self.ws_connect,
245
                    ws_msg=ws_msg,
246
                    json_request=json_response,
247
                )
248
        elif 'id' in json_response and json_response['id'] in self._pending:
249
            self._notify_about_result(json_response['id'], json_response)
250
        elif self._unprocessed_json_response_handler is not None:
251
            self._unprocessed_json_response_handler(
252
                ws_connect=self.ws_connect,
253
                ws_msg=ws_msg,
254
                json_response=json_response,
255
            )
256
257
    async def _handle_json_responses(self, json_responses: typing.Sequence, *, ws_msg: web_ws.WSMessage) -> None:
258
        if isinstance(json_responses[0], typing.Mapping) and 'method' in json_responses[0]:
259
            if self._json_request_handler is not None:
260
                await self._json_request_handler(ws_connect=self.ws_connect, ws_msg=ws_msg)
261
        else:
262
            response_ids = self._get_ids_from_json(json_responses)
263
264
            if response_ids:
265
                self._notify_about_results(response_ids, json_responses)
266
            elif self._unprocessed_json_response_handler is not None:
267
                self._unprocessed_json_response_handler(
268
                    ws_connect=self.ws_connect,
269
                    ws_msg=ws_msg,
270
                    json_response=json_responses,
271
                )
272
273
    def _notify_all_about_error(self, error: Exception) -> None:
274
        for future in self._pending.values():
275
            try:
276
                future.set_exception(error)
277
            except asyncio.InvalidStateError:
278
                pass
279
280
        self._pending.clear()
281
282
    def _notify_about_result(self, response_id: typedefs.JsonRpcIdType, json_response: typing.Mapping) -> None:
283
        future = self._pending.pop(response_id, None)
284
285
        if future is not None:
286
            future.set_result(json_response)
287
288
    def _notify_about_results(self,
289
                              response_ids: typing.Sequence[typedefs.JsonRpcIdType],
290
                              json_response: typing.Sequence) -> None:
291
        is_processed = False
292
293
        for response_id in response_ids:
294
            future = self._pending.pop(response_id, None)
295
296
            if future is not None and not is_processed:
297
                # We suppose that a batch result has the same ids that we sent.
298
                # And these ids have the same future.
299
300
                future.set_result(json_response)
301
                is_processed = True
302