Passed
Push — master ( 0d356b...652bd4 )
by Michael
11:41 queued 05:19
created

WsJsonRpcServer._handle_ws_request()   A

Complexity

Conditions 3

Size

Total Lines 26
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 26
rs 9.5
c 0
b 0
f 0
cc 3
nop 2
1
import asyncio
2
import json
3
import typing
4
import weakref
5
6
from aiohttp import http_websocket, web, web_ws
7
8
from .base import BaseJsonRpcServer
9
from .. import errors, protocol, utils
10
11
12
__all__ = (
13
    'WsJsonRpcServer',
14
)
15
16
17
class WsJsonRpcServer(BaseJsonRpcServer):
18
    rcp_websockets: weakref.WeakSet
19
    _json_response_handler: typing.Optional[typing.Callable] = None
20
21
    def __init__(self,
22
                 *args,
23
                 json_response_handler: typing.Optional[typing.Callable] = None,
24
                 **kwargs) -> None:
25
        super().__init__(*args, **kwargs)
26
27
        self.rcp_websockets = weakref.WeakSet()
28
        self._json_response_handler = json_response_handler
29
30
    async def handle_http_request(self, http_request: web.Request) -> web.StreamResponse:
31
        if http_request.method != 'GET' or http_request.headers.get('upgrade', '').lower() != 'websocket':
32
            raise web.HTTPMethodNotAllowed(method=http_request.method, allowed_methods=('GET',))
33
34
        return await self._handle_ws_request(http_request)
35
36
    async def on_shutdown(self, app: web.Application) -> None:
37
        # https://docs.aiohttp.org/en/stable/web_advanced.html#graceful-shutdown
38
39
        for ws in self.rcp_websockets:
40
            await ws.close(code=http_websocket.WSCloseCode.GOING_AWAY, message='Server shutdown')
41
42
        self.rcp_websockets.clear()
43
44
    async def _handle_ws_request(self, http_request: web.Request) -> web_ws.WebSocketResponse:
45
        from aiohttp_rpc import WsJsonRpcClient
46
47
        ws_connect = web_ws.WebSocketResponse()
48
        await ws_connect.prepare(http_request)
49
50
        self.rcp_websockets.add(ws_connect)
51
52
        ws_rpc_client = WsJsonRpcClient(ws_connect=ws_connect)
53
54
        async for ws_msg in ws_connect:
55
            if ws_msg.type != http_websocket.WSMsgType.TEXT:
56
                continue
57
58
            coro = self._handle_ws_message(
59
                ws_msg=ws_msg,
60
                ws_connect=ws_connect,
61
                context={
62
                    'http_request': http_request,
63
                    'ws_connect': ws_connect,
64
                    'ws_rpc_client': ws_rpc_client,
65
                },
66
            )
67
            asyncio.ensure_future(coro)  # TODO: asyncio.create_task(coro) in Python 3.7+
68
69
        return ws_connect
70
71
    async def _handle_ws_message(self,
72
                                 ws_msg: web_ws.WSMessage, *,
73
                                 ws_connect: web_ws.WebSocketResponse,
74
                                 context: dict) -> None:
75
        try:
76
            input_data = json.loads(ws_msg.data)
77
        except json.JSONDecodeError as e:
78
            response = protocol.JsonRpcResponse(error=errors.ParseError(utils.get_exc_message(e)))
79
            json_response = response.to_dict()
80
        else:
81
            json_response = await self._process_input_data(input_data, context=context)
82
83
        if json_response is None:
84
            return
85
86
        if ws_connect.closed:
87
            raise errors.ServerError('WS is closed.')
88
89
        await ws_connect.send_str(self.json_serialize(json_response))
90