Passed
Pull Request — master (#5)
by Michael
05:44
created

WsJsonRpcServer.handle_ws_message()   A

Complexity

Conditions 5

Size

Total Lines 22
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 17
dl 0
loc 22
rs 9.0833
c 0
b 0
f 0
cc 5
nop 5
1
import asyncio
2
import json
3
import typing
4
import weakref
5
6
from aiohttp import http_websocket, web, web_ws
7
8
from .base import BaseJsonRpcServer
9
from .. import errors, protocol, utils
10
11
12
__all__ = (
13
    'WsJsonRpcServer',
14
)
15
16
17
class WsJsonRpcServer(BaseJsonRpcServer):
18
    rcp_websockets: weakref.WeakSet
19
20
    def __init__(self, *args, **kwargs) -> None:
21
        super().__init__(*args, **kwargs)
22
23
        self.rcp_websockets = weakref.WeakSet()
24
25
    async def handle_http_request(self, http_request: web.Request) -> web.StreamResponse:
26
        if http_request.method != 'GET' or http_request.headers.get('upgrade', '').lower() != 'websocket':
27
            raise web.HTTPMethodNotAllowed(method=http_request.method, allowed_methods=('GET',))
28
29
        return await self.handle_websocket_request(http_request)
30
31
    async def handle_websocket_request(self, http_request: web.Request) -> web_ws.WebSocketResponse:
32
        ws_connect = web_ws.WebSocketResponse()
33
        await ws_connect.prepare(http_request)
34
35
        self.rcp_websockets.add(ws_connect)
36
37
        async for ws_msg in ws_connect:
38
            if ws_msg.type == http_websocket.WSMsgType.TEXT:
39
                coro = self.handle_ws_message(
40
                    ws_msg=ws_msg,
41
                    ws_connect=ws_connect,
42
                    http_request=http_request,
43
                )
44
                asyncio.ensure_future(coro)  # TODO: asyncio.create_task(coro) in Python 3.7+
45
            elif ws_msg.type == http_websocket.WSMsgType.ERROR:
46
                break
47
48
        return ws_connect
49
50
    async def on_shutdown(self, app: web.Application) -> None:
51
        # https://docs.aiohttp.org/en/stable/web_advanced.html#graceful-shutdown
52
53
        for ws in self.rcp_websockets:
54
            await ws.close(code=http_websocket.WSCloseCode.GOING_AWAY, message='Server shutdown')
55
56
        self.rcp_websockets.clear()
57
58
    async def handle_ws_message(self,
59
                                ws_msg: web_ws.WSMessage, *,
60
                                ws_connect: web_ws.WebSocketResponse,
61
                                http_request: typing.Optional[web.Request] = None) -> None:
62
        try:
63
            input_data = json.loads(ws_msg.data)
64
        except json.JSONDecodeError as e:
65
            response = protocol.JsonRpcResponse(error=errors.ParseError(utils.get_exc_message(e)))
66
            json_response = response.to_dict()
67
        else:
68
            json_response = await self._process_input_data(input_data, context={
69
                'http_request': http_request,
70
                'ws_connect': ws_connect,
71
            })
72
73
        if json_response is None:
74
            return
75
76
        if ws_connect.closed:
77
            raise errors.ServerError('WS is closed.')
78
79
        await ws_connect.send_str(self.json_serialize(json_response))
80