Test Failed
Pull Request — master (#3)
by Michael
11:49
created

WsJsonRpcServer.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
import asyncio
2
import json
3
import weakref
4
5
from aiohttp import http_websocket, web, web_ws
6
7
from .base import BaseJsonRpcServer
8
9
10
# Public API of this module: only the WebSocket server class is exported.
__all__ = ('WsJsonRpcServer',)
13
14
15
class WsJsonRpcServer(BaseJsonRpcServer):
    """JSON-RPC server that exchanges messages over aiohttp WebSockets.

    Every accepted connection is tracked in ``rcp_websockets`` so that
    ``on_shutdown`` can close the ones that are still open.  Each incoming
    text frame is processed in its own asyncio task, so one slow call does
    not hold up the other messages arriving on the same connection.
    """

    # NOTE: the attribute is spelled "rcp" (sic); the name is kept as-is
    # because it is part of the class's public surface.
    rcp_websockets: weakref.WeakSet
    # Strong references to in-flight message tasks (see
    # ``handle_websocket_request`` for why they are needed).
    _background_tasks: set

    def __init__(self, *args, **kwargs) -> None:
        """Forward all arguments to the base server and set up tracking sets."""
        super().__init__(*args, **kwargs)

        self.rcp_websockets = weakref.WeakSet()
        self._background_tasks = set()

    async def handle_http_request(self, http_request: web.Request) -> web.StreamResponse:
        """Entry point for an HTTP request aimed at the WebSocket endpoint.

        :param http_request: the incoming aiohttp request.
        :return: the finished ``WebSocketResponse`` for a ``GET`` request
            carrying ``Upgrade: websocket``; otherwise a 405 response.
        """
        is_ws_upgrade = (
            http_request.method == 'GET'
            and http_request.headers.get('upgrade', '').lower() == 'websocket'
        )

        if not is_ws_upgrade:
            # The only method this endpoint accepts is GET (the WebSocket
            # handshake), so that is what the ``Allow`` header must advertise.
            return web.HTTPMethodNotAllowed(method=http_request.method, allowed_methods=('GET',))

        return await self.handle_websocket_request(http_request)

    async def handle_websocket_request(self, http_request: web.Request) -> web_ws.WebSocketResponse:
        """Perform the WebSocket handshake and dispatch incoming messages.

        Each TEXT frame is handed to ``_handle_ws_msg`` in a separate task.
        An ERROR frame stops the read loop.  Other frame types are ignored.

        :param http_request: the upgrade request to answer.
        :return: the WebSocket response once the read loop has ended.
        """
        ws = web.WebSocketResponse()
        await ws.prepare(http_request)

        self.rcp_websockets.add(ws)

        try:
            async for ws_msg in ws:
                if ws_msg.type == http_websocket.WSMsgType.TEXT:
                    task = asyncio.create_task(self._handle_ws_msg(
                        ws=ws,
                        http_request=http_request,
                        ws_msg=ws_msg,
                    ))
                    # The event loop only keeps weak references to tasks, so
                    # hold a strong one until the task finishes; otherwise it
                    # may be garbage-collected mid-execution.
                    self._background_tasks.add(task)
                    task.add_done_callback(self._background_tasks.discard)
                elif ws_msg.type == http_websocket.WSMsgType.ERROR:
                    break
        finally:
            # The connection is finished; stop tracking it right away instead
            # of waiting for the garbage collector to drop it from the WeakSet.
            self.rcp_websockets.discard(ws)

        return ws

    async def on_shutdown(self, app: web.Application) -> None:
        """Close every tracked WebSocket with the GOING_AWAY close code.

        :param app: the application being shut down (not used here).
        """
        # https://docs.aiohttp.org/en/stable/web_advanced.html#graceful-shutdown

        # Iterate over a snapshot: while ``ws.close()`` is awaited, handlers
        # may add or discard connections, and mutating the set during
        # iteration raises ``RuntimeError``.
        for ws in tuple(self.rcp_websockets):
            await ws.close(code=http_websocket.WSCloseCode.GOING_AWAY, message='Server shutdown')

        self.rcp_websockets.clear()

    async def _handle_ws_msg(self, *,
                             ws: web_ws.WebSocketResponse,
                             http_request: web.Request,
                             ws_msg: web_ws.WSMessage) -> None:
        """Process one text frame and send the result back on the same socket.

        :param ws: the connection the message arrived on.
        :param http_request: the original upgrade request, passed through to
            ``_process_input_data``.
        :param ws_msg: the received message; ``ws_msg.data`` is parsed as JSON.
        """
        try:
            input_data = json.loads(ws_msg.data)
        except json.JSONDecodeError:
            # This coroutine runs as a background task, so an unhandled error
            # here would leave the client without any reply.  Answer with the
            # JSON-RPC 2.0 "Parse error" object instead.
            # NOTE(review): assumes ``json_serialize`` accepts a plain dict -
            # confirm against the base class.
            output_data = {
                'jsonrpc': '2.0',
                'id': None,
                'error': {'code': -32700, 'message': 'Parse error'},
            }
        else:
            output_data = await self._process_input_data(input_data, http_request=http_request)

        # The peer may have disconnected while the request was being processed.
        if not ws.closed:
            await ws.send_str(self.json_serialize(output_data))
64