Passed
Pull Request — master (#5)
by Michael
05:44
created

WsJsonRpcClient.__init__()   A

Complexity

Conditions 1

Size

Total Lines 21
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 21
rs 9.5
c 0
b 0
f 0
cc 1
nop 9

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
import asyncio
2
import json
3
import logging
4
import typing
5
6
import aiohttp
7
from aiohttp import web_ws
8
9
from .base import BaseJsonRpcClient
10
from .. import errors, utils
11
12
13
# Public names exported by a star-import of this module.
__all__ = ('WsJsonRpcClient', 'WsJsonRpcClientForResponse')


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
19
20
21
class WsJsonRpcClient(BaseJsonRpcClient):
    """JSON-RPC client that exchanges messages over an aiohttp WebSocket.

    ``send_json`` registers a future in ``_pending`` under every message id it
    sends; a background worker started by ``connect`` reads incoming frames and
    resolves the future whose id matches. A session or WebSocket supplied by
    the caller is treated as "outer" and is never closed by this client.
    """

    ws_connect: typing.Optional[aiohttp.ClientWebSocketResponse]
    notify_about_result: typing.Optional[typing.Callable] = None
    timeout: typing.Optional[int]
    ws_connect_kwargs: dict
    _pending: typing.Dict[typing.Any, asyncio.Future]
    _message_worker: typing.Optional[asyncio.Future] = None
    _session_is_outer: bool
    _ws_connect_is_outer: bool
    _json_request_handler: typing.Optional[typing.Callable] = None
    _unprocessed_json_response_handler: typing.Optional[typing.Callable] = None

    def __init__(self,
                 url: str, *,
                 session: typing.Optional[aiohttp.ClientSession] = None,
                 ws_connect: typing.Optional[aiohttp.ClientWebSocketResponse] = None,
                 timeout: typing.Optional[int] = 5,
                 json_request_handler: typing.Optional[typing.Callable] = None,
                 unprocessed_json_response_handler: typing.Optional[typing.Callable] = None,
                 **ws_connect_kwargs) -> None:
        """Store the connection settings; no I/O happens here.

        :param url: address passed to ``session.ws_connect`` in ``connect``.
        :param session: existing client session to reuse; it is not closed
            by ``disconnect``.
        :param ws_connect: existing WebSocket to reuse; it is not closed by
            ``disconnect``.
        :param timeout: how long ``send_json`` waits for a response (passed to
            ``asyncio.wait_for``); ``None`` waits without a limit.
        :param json_request_handler: awaited with ``ws_connect``, ``ws_msg``
            and ``json_request`` when an incoming message carries ``method``.
        :param unprocessed_json_response_handler: called (not awaited) with
            ``ws_connect``, ``ws_msg`` and ``json_response`` for responses
            that match no pending request.
        :param ws_connect_kwargs: extra keyword arguments forwarded to
            ``session.ws_connect``.
        """
        self.url = url
        self.timeout = timeout

        self.session = session
        self._session_is_outer = session is not None

        self.ws_connect = ws_connect
        self.ws_connect_kwargs = ws_connect_kwargs
        self._ws_connect_is_outer = ws_connect is not None

        self._pending = {}
        self._json_request_handler = json_request_handler
        self._unprocessed_json_response_handler = unprocessed_json_response_handler

    async def connect(self) -> None:
        """Create the session/WebSocket if missing and start the reader worker.

        If opening the WebSocket fails, resources created by this client are
        released via ``disconnect`` and the original exception is re-raised.
        """
        if not self.session:
            self.session = aiohttp.ClientSession(json_serialize=self.json_serialize)

        if not self.ws_connect:
            try:
                self.ws_connect = await self.session.ws_connect(self.url, **self.ws_connect_kwargs)
            except Exception:
                await self.disconnect()
                raise

        self._message_worker = asyncio.ensure_future(self._handle_ws_messages())

    async def disconnect(self) -> None:
        """Close resources owned by this client and stop the reader worker."""
        if self.ws_connect and not self._ws_connect_is_outer:
            await self.ws_connect.close()

        if self.session and not self._session_is_outer:
            await self.session.close()

        worker = self._message_worker

        if worker is None:
            return

        # A caller-owned WebSocket is left open, so the worker loop would never
        # end on its own and awaiting it would hang; cancel it in that case.
        ws_still_open = self.ws_connect is not None and not self.ws_connect.closed

        if ws_still_open and not worker.done():
            worker.cancel()

        try:
            await worker
        except asyncio.CancelledError:
            # Swallow only the worker's own cancellation; a cancellation aimed
            # at this coroutine must still propagate.
            if not worker.cancelled():
                raise

    async def send_json(self,
                        data: typing.Any, *,
                        without_response: bool = False) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
        """Send ``data`` and, unless told otherwise, wait for the response.

        :param data: JSON-serializable request, or list of requests (batch).
        :param without_response: send only; do not wait for any reply.
        :return: ``(response, None)``, or ``(None, None)`` when nothing is
            awaited (``without_response`` or no message carries an ``id``).
        :raises asyncio.TimeoutError: no response within ``self.timeout``.
        """
        if without_response:
            await self.ws_connect.send_str(self.json_serialize(data))
            return None, None

        msg_ids = self._get_msg_ids_from_json(data)

        if not msg_ids:
            # Notifications only: there is no id a response could refer to.
            await self.ws_connect.send_str(self.json_serialize(data))
            return None, None

        future = asyncio.Future()

        # All ids of a batch share one future; the batch response resolves it.
        for msg_id in msg_ids:
            self._pending[msg_id] = future

        try:
            await self.ws_connect.send_str(self.json_serialize(data))

            if self.timeout is not None:
                result = await asyncio.wait_for(future, timeout=self.timeout)
            else:
                result = await future
        finally:
            # Drop the registrations on timeout/failure too, otherwise stale
            # (possibly cancelled) futures accumulate in ``_pending``.
            for msg_id in msg_ids:
                if self._pending.get(msg_id) is future:
                    del self._pending[msg_id]

        return result, None

    def clear_pending(self) -> None:
        """Forget every pending request without resolving its future."""
        self._pending.clear()

    @staticmethod
    def _get_msg_ids_from_json(data: typing.Any) -> list:
        """Return the non-``None`` ``id`` values found in a message or batch.

        A dict yields at most one id, a list yields the ids of its dict items,
        anything else (or empty input) yields an empty list.
        """
        if not data:
            return []

        if isinstance(data, dict) and data.get('id') is not None:
            return [data['id']]

        if isinstance(data, list):
            return [
                item['id']
                for item in data
                if isinstance(item, dict) and item.get('id') is not None
            ]

        return []

    @staticmethod
    def _log_message_task_error(task: asyncio.Future) -> None:
        """Log an exception raised by a finished message-handling task."""
        if task.cancelled():
            return

        error = task.exception()

        if error is not None:
            logger.error('Failed to handle a WebSocket message.', exc_info=error)

    async def _handle_ws_messages(self) -> None:
        """Read frames until the WebSocket closes, handling each in its own task.

        On cancellation every pending request is failed with a ``ServerError``
        before the cancellation is re-raised; other errors are logged and the
        loop continues.
        """
        while not self.ws_connect.closed:
            try:
                ws_msg = await self.ws_connect.receive()
                task = asyncio.ensure_future(self._handle_ws_message(ws_msg))
                # Retrieve handler failures instead of leaving them unobserved.
                task.add_done_callback(self._log_message_task_error)
            except asyncio.CancelledError as e:
                error = errors.ServerError(utils.get_exc_message(e)).with_traceback()
                self._notify_all_about_error(error)
                raise
            except Exception as e:
                logger.exception(e)

    async def _handle_ws_message(self, ws_msg: aiohttp.WSMessage) -> None:
        """Dispatch one text frame to the matching handler or pending future.

        Non-text frames and empty JSON payloads are ignored. Messages with a
        ``method`` key go to the request handler, responses with a known id
        resolve the pending future, and everything else goes to the
        unprocessed-response handler when one is configured.
        """
        if ws_msg.type != aiohttp.WSMsgType.text:
            return

        json_response = json.loads(ws_msg.data)

        if not json_response:
            return

        if isinstance(json_response, dict):
            if 'method' in json_response:
                if self._json_request_handler:
                    await self._json_request_handler(
                        ws_connect=self.ws_connect,
                        ws_msg=ws_msg,
                        json_request=json_response,
                    )
            elif 'id' in json_response and json_response['id'] in self._pending:
                self._notify_about_result(json_response['id'], json_response)
            elif self._unprocessed_json_response_handler:
                self._unprocessed_json_response_handler(
                    ws_connect=self.ws_connect,
                    ws_msg=ws_msg,
                    json_response=json_response,
                )

            return

        if isinstance(json_response, list):
            # The list is non-empty here (empty payloads returned above), so
            # the first item decides whether this is a batch of requests.
            if isinstance(json_response[0], dict) and 'method' in json_response[0]:
                if self._json_request_handler:
                    # Pass the parsed batch, as the single-request branch does.
                    await self._json_request_handler(
                        ws_connect=self.ws_connect,
                        ws_msg=ws_msg,
                        json_request=json_response,
                    )
            else:
                msg_ids = self._get_msg_ids_from_json(json_response)

                if msg_ids:
                    self._notify_about_results(msg_ids, json_response)
                elif self._unprocessed_json_response_handler:
                    # The handler is optional; calling ``None`` would raise.
                    self._unprocessed_json_response_handler(
                        ws_connect=self.ws_connect,
                        ws_msg=ws_msg,
                        json_response=json_response,
                    )

    def _notify_all_about_error(self, error: errors.JsonRpcError) -> None:
        """Fail every still-waiting pending future with ``error`` and clear them."""
        for future in self._pending.values():
            # A batch registers one future under several ids and a timed-out
            # future is already cancelled; setting those again would raise
            # ``InvalidStateError``.
            if not future.done():
                future.set_exception(error)

        self.clear_pending()

    def _notify_about_result(self, msg_id: typing.Any, json_response: dict) -> None:
        """Resolve the pending future registered under ``msg_id``, if any."""
        future = self._pending.pop(msg_id, None)

        if future is not None and not future.done():
            future.set_result(json_response)

    def _notify_about_results(self, msg_ids: list, json_response: list) -> None:
        """Resolve a batch: drop all ``msg_ids`` and deliver the response once."""
        is_processed = False

        for msg_id in msg_ids:
            future = self._pending.pop(msg_id, None)

            if future is None or is_processed or future.done():
                continue

            future.set_result(json_response)
            is_processed = True
201
202
203
class WsJsonRpcClientForResponse(BaseJsonRpcClient):
    """Send-only JSON-RPC client wrapping a caller-supplied ``WebSocketResponse``.

    ``connect`` and ``disconnect`` do nothing, ``send_json`` only supports
    ``without_response=True``, and the direct-call API is not implemented.
    """

    ws_response: web_ws.WebSocketResponse

    def __init__(self, ws_response: web_ws.WebSocketResponse) -> None:
        """Keep a reference to the WebSocket response used for sending."""
        self.ws_response = ws_response

    async def connect(self) -> None:
        """Do nothing; this client opens no connection of its own."""
        return None

    async def disconnect(self) -> None:
        """Do nothing; this client closes nothing."""
        return None

    async def send_json(
        self,
        data: typing.Any,
        *,
        without_response: bool = False,
    ) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
        """Serialize ``data`` and send it as a text frame; always returns ``(None, None)``.

        Callers must pass ``without_response=True``; this is checked with an
        ``assert`` because no reply is ever awaited here.
        """
        assert without_response

        payload = self.json_serialize(data)
        await self.ws_response.send_str(payload)
        return None, None

    async def direct_call(self, *args, **kwargs):
        """Not supported by this client."""
        raise NotImplementedError

    async def direct_batch(self, *args, **kwargs):
        """Not supported by this client."""
        raise NotImplementedError
225
226
    async def direct_batch(self, *args, **kwargs):
227
        raise NotImplementedError
228