|
1
|
|
|
import json |
|
2
|
|
|
import weakref |
|
3
|
|
|
|
|
4
|
|
|
from aiohttp import http_websocket, web, web_ws |
|
5
|
|
|
|
|
6
|
|
|
from .base import BaseJsonRpcServer |
|
7
|
|
|
|
|
8
|
|
|
|
|
9
|
|
|
__all__ = ( |
|
10
|
|
|
'WsJsonRpcServer', |
|
11
|
|
|
) |
|
12
|
|
|
|
|
13
|
|
|
|
|
14
|
|
|
class WsJsonRpcServer(BaseJsonRpcServer): |
|
15
|
|
|
ws_state_key = 'rcp_websockets' |
|
16
|
|
|
|
|
17
|
|
|
async def handle_http_request(self, http_request: web.Request) -> web.StreamResponse: |
|
18
|
|
|
if http_request.method != 'GET' or http_request.headers.get('upgrade', '').lower() != 'websocket': |
|
19
|
|
|
return web.HTTPMethodNotAllowed(method=http_request.method, allowed_methods=('POST',)) |
|
20
|
|
|
|
|
21
|
|
|
if self.ws_state_key not in http_request.app: |
|
22
|
|
|
http_request.app[self.ws_state_key] = weakref.WeakSet() |
|
23
|
|
|
|
|
24
|
|
|
return await self.handle_websocket_request(http_request) |
|
25
|
|
|
|
|
26
|
|
|
async def handle_websocket_request(self, http_request: web.Request) -> web_ws.WebSocketResponse: |
|
27
|
|
|
http_request.msg_id = 0 |
|
28
|
|
|
http_request.pending = {} |
|
29
|
|
|
|
|
30
|
|
|
ws = web_ws.WebSocketResponse() |
|
31
|
|
|
await ws.prepare(http_request) |
|
32
|
|
|
http_request['ws'] = ws |
|
33
|
|
|
http_request.app[self.ws_state_key].add(ws) |
|
34
|
|
|
|
|
35
|
|
|
while not ws.closed: |
|
36
|
|
|
ws_msg = await ws.receive() |
|
37
|
|
|
|
|
38
|
|
|
if ws_msg.type != http_websocket.WSMsgType.TEXT: |
|
39
|
|
|
continue |
|
40
|
|
|
|
|
41
|
|
|
await self._handle_ws_msg(http_request, ws_msg) |
|
42
|
|
|
|
|
43
|
|
|
http_request.app[self.ws_state_key].discard(ws) |
|
44
|
|
|
return ws |
|
45
|
|
|
|
|
46
|
|
|
async def on_shutdown(self, app: web.Application) -> None: |
|
47
|
|
|
# https://docs.aiohttp.org/en/stable/web_advanced.html#graceful-shutdown |
|
48
|
|
|
|
|
49
|
|
|
if self.ws_state_key not in app: |
|
50
|
|
|
return |
|
51
|
|
|
|
|
52
|
|
|
for ws in set(app[self.ws_state_key]): |
|
53
|
|
|
await ws.close(code=http_websocket.WSCloseCode.GOING_AWAY, message='Server shutdown') |
|
54
|
|
|
|
|
55
|
|
|
app[self.ws_state_key].clear() |
|
56
|
|
|
|
|
57
|
|
|
async def _handle_ws_msg(self, http_request: web.Request, ws_msg: web_ws.WSMessage) -> None: |
|
58
|
|
|
input_data = json.loads(ws_msg.data) |
|
59
|
|
|
output_data = await self._process_input_data(input_data, http_request=http_request) |
|
60
|
|
|
ws = http_request['ws'] |
|
61
|
|
|
|
|
62
|
|
|
if ws._writer.transport.is_closing(): |
|
63
|
|
|
await ws.close() |
|
64
|
|
|
http_request.app[self.ws_state_key].discard(ws) |
|
65
|
|
|
|
|
66
|
|
|
await http_request['ws'].send_str(self.json_serialize(output_data)) |
|
67
|
|
|
|