Passed
Pull Request — master (#5)
by Michael
05:44
created

WsJsonRpcClient._handle_ws_message()   F

Complexity

Conditions 14

Size

Total Lines 42
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 32
dl 0
loc 42
rs 3.6
c 0
b 0
f 0
cc 14
nop 2

How to fix   Complexity   

Complexity

Complex classes like aiohttp_rpc.client.websocket.WsJsonRpcClient._handle_ws_message() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import asyncio
2
import json
3
import logging
4
import typing
5
6
import aiohttp
7
from aiohttp import web_ws
8
9
from .base import BaseJsonRpcClient
10
from .. import errors, utils
11
12
13
__all__ = (
14
    'WsJsonRpcClient',
15
    'WsJsonRpcClientForResponse',
16
)
17
18
logger = logging.getLogger(__name__)
19
20
21
class WsJsonRpcClient(BaseJsonRpcClient):
22
    ws_connect: typing.Optional[aiohttp.ClientWebSocketResponse]
23
    notify_about_result: typing.Optional[typing.Callable] = None
24
    timeout: typing.Optional[int]
25
    ws_connect_kwargs: dict
26
    _pending: typing.Dict[typing.Any, asyncio.Future]
27
    _message_worker: typing.Optional[asyncio.Future] = None
28
    _session_is_outer: bool
29
    _ws_connect_is_outer: bool
30
    _json_request_handler: typing.Optional[typing.Callable] = None
31
    _unprocessed_json_response_handler: typing.Optional[typing.Callable] = None
32
33
    def __init__(self,
34
                 url: str, *,
35
                 session: typing.Optional[aiohttp.ClientSession] = None,
36
                 ws_connect: typing.Optional[aiohttp.ClientWebSocketResponse] = None,
37
                 timeout: typing.Optional[int] = 5,
38
                 json_request_handler: typing.Optional[typing.Callable] = None,
39
                 unprocessed_json_response_handler: typing.Optional[typing.Callable] = None,
40
                 **ws_connect_kwargs) -> None:
41
        self.url = url
42
        self.timeout = timeout
43
44
        self.session = session
45
        self._session_is_outer = session is not None
46
47
        self.ws_connect = ws_connect
48
        self.ws_connect_kwargs = ws_connect_kwargs
49
        self._ws_connect_is_outer = ws_connect is not None
50
51
        self._pending = {}
52
        self._json_request_handler = json_request_handler
53
        self._unprocessed_json_response_handler = unprocessed_json_response_handler
54
55
    async def connect(self) -> None:
56
        if not self.session:
57
            self.session = aiohttp.ClientSession(json_serialize=self.json_serialize)
58
59
        if not self.ws_connect:
60
            try:
61
                self.ws_connect = await self.session.ws_connect(self.url, **self.ws_connect_kwargs)
62
            except Exception:
63
                await self.disconnect()
64
                raise
65
66
        self._message_worker = asyncio.ensure_future(self._handle_ws_messages())
67
68
    async def disconnect(self) -> None:
69
        if self.ws_connect and not self._ws_connect_is_outer:
70
            await self.ws_connect.close()
71
72
        if self.session and not self._session_is_outer:
73
            await self.session.close()
74
75
        if self._message_worker:
76
            await self._message_worker
77
78
    async def send_json(self,
79
                        data: typing.Any, *,
80
                        without_response: bool = False) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
81
        if without_response:
82
            await self.ws_connect.send_str(self.json_serialize(data))
83
            return None, None
84
85
        msg_ids = self._get_msg_ids_from_json(data)
86
        future = asyncio.Future()
87
88
        for msg_id in msg_ids:
89
            self._pending[msg_id] = future
90
91
        await self.ws_connect.send_str(self.json_serialize(data))
92
93
        if not msg_ids:
94
            return None, None
95
96
        if self.timeout is not None:
97
            future = asyncio.wait_for(future, timeout=self.timeout)
98
99
        result = await future
100
101
        return result, None
102
103
    def clear_pending(self) -> None:
104
        self._pending.clear()
105
106
    @staticmethod
107
    def _get_msg_ids_from_json(data: typing.Any) -> typing.Optional[list]:
108
        if not data:
109
            return []
110
111
        if isinstance(data, dict) and data.get('id') is not None:
112
            return [data['id']]
113
114
        if isinstance(data, list):
115
            return [
116
                item['id']
117
                for item in data
118
                if isinstance(item, dict) and item.get('id') is not None
119
            ]
120
121
        return []
122
123
    async def _handle_ws_messages(self) -> typing.NoReturn:
124
        while not self.ws_connect.closed:
125
            try:
126
                ws_msg = await self.ws_connect.receive()
127
                asyncio.ensure_future(self._handle_ws_message(ws_msg))
128
            except asyncio.CancelledError as e:
129
                error = errors.ServerError(utils.get_exc_message(e)).with_traceback()
130
                self._notify_all_about_error(error)
131
                raise
132
            except Exception as e:
133
                logger.exception(e)
134
135
    async def _handle_ws_message(self, ws_msg: aiohttp.WSMessage) -> None:
136
        if ws_msg.type != aiohttp.WSMsgType.text:
137
            return
138
139
        json_response = json.loads(ws_msg.data)
140
141
        if not json_response:
142
            return
143
144
        if isinstance(json_response, dict):
145
            if 'method' in json_response:
146
                if self._json_request_handler:
147
                    await self._json_request_handler(
148
                        ws_connect=self.ws_connect,
149
                        ws_msg=ws_msg,
150
                        json_request=json_response,
151
                    )
152
            elif 'id' in json_response and json_response['id'] in self._pending:
153
                self._notify_about_result(json_response['id'], json_response)
154
            elif self._unprocessed_json_response_handler:
155
                self._unprocessed_json_response_handler(
156
                    ws_connect=self.ws_connect,
157
                    ws_msg=ws_msg,
158
                    json_response=json_response,
159
                )
160
161
            return
162
163
        if isinstance(json_response, list):
164
            if isinstance(json_response[0], dict) and 'method' in json_response[0]:
165
                if self._json_request_handler:
166
                    await self._json_request_handler(ws_connect=self.ws_connect, ws_msg=ws_msg)
167
            else:
168
                msg_ids = self._get_msg_ids_from_json(json_response)
169
170
                if msg_ids:
171
                    self._notify_about_results(msg_ids, json_response)
172
                else:
173
                    self._unprocessed_json_response_handler(
174
                        ws_connect=self.ws_connect,
175
                        ws_msg=ws_msg,
176
                        json_response=json_response,
177
                    )
178
179
180
    def _notify_all_about_error(self, error: errors.JsonRpcError) -> None:
181
        for future in self._pending.values():
182
            future.set_exception(error)
183
184
        self.clear_pending()
185
186
    def _notify_about_result(self, msg_id: typing.Any, json_response: dict) -> None:
187
        future = self._pending.pop(msg_id, None)
188
189
        if future:
190
            future.set_result(json_response)
191
192
    def _notify_about_results(self, msg_ids: list, json_response: list) -> None:
193
        is_processed = False
194
195
        for msg_id in msg_ids:
196
            future = self._pending.pop(msg_id, None)
197
198
            if future and not is_processed:
199
                future.set_result(json_response)
200
                is_processed = True
201
202
203
class WsJsonRpcClientForResponse(BaseJsonRpcClient):
204
    ws_response: web_ws.WebSocketResponse
205
206
    def __init__(self, ws_response: web_ws.WebSocketResponse) -> None:
207
        self.ws_response = ws_response
208
209
    async def connect(self) -> None:
210
        pass
211
212
    async def disconnect(self) -> None:
213
        pass
214
215
    async def send_json(self,
216
                        data: typing.Any, *,
217
                        without_response: bool = False) -> typing.Tuple[typing.Any, typing.Optional[dict]]:
218
        assert without_response
219
220
        await self.ws_response.send_str(self.json_serialize(data))
221
        return None, None
222
223
    async def direct_call(self, *args, **kwargs):
224
        raise NotImplementedError
225
226
    async def direct_batch(self, *args, **kwargs):
227
        raise NotImplementedError
228