Passed
Push — master ( 135e2c...0076f8 )
by Michael
09:26 queued 05:17
created

WsJsonRpcServer.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
import asyncio
2
import json
3
import weakref
4
5
from aiohttp import http_websocket, web, web_ws
6
7
from .base import BaseJsonRpcServer
8
9
10
__all__ = (
11
    'WsJsonRpcServer',
12
)
13
14
15
class WsJsonRpcServer(BaseJsonRpcServer):
16
    rcp_websockets: weakref.WeakSet
17
18
    def __init__(self, *args, **kwargs) -> None:
19
        super().__init__(*args, **kwargs)
20
21
        self.rcp_websockets = weakref.WeakSet()
22
23
    async def handle_http_request(self, http_request: web.Request) -> web.StreamResponse:
24
        if http_request.method != 'GET' or http_request.headers.get('upgrade', '').lower() != 'websocket':
25
            raise web.HTTPMethodNotAllowed(method=http_request.method, allowed_methods=('GET',))
26
27
        return await self.handle_websocket_request(http_request)
28
29
    async def handle_websocket_request(self, http_request: web.Request) -> web_ws.WebSocketResponse:
30
        ws = web_ws.WebSocketResponse()
31
        await ws.prepare(http_request)
32
33
        self.rcp_websockets.add(ws)
34
35
        async for ws_msg in ws:
36
            if ws_msg.type == http_websocket.WSMsgType.TEXT:
37
                coro = self._handle_ws_msg(
38
                    ws=ws,
39
                    http_request=http_request,
40
                    ws_msg=ws_msg,
41
                )
42
                asyncio.ensure_future(coro)  # asyncio.create_task(coro) in Python 3.7+
43
            elif ws_msg.type == http_websocket.WSMsgType.ERROR:
44
                break
45
46
        return ws
47
48
    async def on_shutdown(self, app: web.Application) -> None:
49
        # https://docs.aiohttp.org/en/stable/web_advanced.html#graceful-shutdown
50
51
        for ws in self.rcp_websockets:
52
            await ws.close(code=http_websocket.WSCloseCode.GOING_AWAY, message='Server shutdown')
53
54
        self.rcp_websockets.clear()
55
56
    async def _handle_ws_msg(self, *,
57
                             ws: web_ws.WebSocketResponse,
58
                             http_request: web.Request,
59
                             ws_msg: web_ws.WSMessage) -> None:
60
        input_data = json.loads(ws_msg.data)
61
        output_data = await self._process_input_data(input_data, http_request=http_request)
62
63
        if not ws.closed:
64
            await ws.send_str(self.json_serialize(output_data))
65