Passed
Push — master ( 55adfd...84b7e9 )
by Michael
03:30
created

aiohttp_rpc.server.websocket   A

Complexity

Total Complexity 12

Size/Duplication

Total Lines 67
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 12
eloc 43
dl 0
loc 67
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A WsJsonRpcServer.on_shutdown() 0 10 3
A WsJsonRpcServer._handle_ws_msg() 0 10 2
A WsJsonRpcServer.handle_websocket_request() 0 19 3
A WsJsonRpcServer.handle_http_request() 0 8 4
1
import json
2
import weakref
3
4
from aiohttp import http_websocket, web, web_ws
5
6
from .base import BaseJsonRpcServer
7
8
9
__all__ = (
10
    'WsJsonRpcServer',
11
)
12
13
14
class WsJsonRpcServer(BaseJsonRpcServer):
    """JSON-RPC server that talks to clients over an aiohttp WebSocket.

    Every accepted WebSocket is tracked in a ``weakref.WeakSet`` stored on the
    aiohttp application under ``ws_state_key`` so that ``on_shutdown`` can
    close all of them.
    """

    # Application key under which the WeakSet of open WebSockets is stored.
    # NOTE(review): the spelling 'rcp' (not 'rpc') is kept as-is because the
    # key is runtime state that other code may look up.
    ws_state_key = 'rcp_websockets'

    async def handle_http_request(self, http_request: web.Request) -> web.StreamResponse:
        """Entry point: accept only ``GET`` requests asking for a WebSocket upgrade.

        :param http_request: incoming aiohttp request.
        :return: the finished ``WebSocketResponse``, or an
            ``HTTPMethodNotAllowed`` response for anything that is not a
            WebSocket upgrade request.
        """
        is_get = http_request.method == 'GET'
        wants_upgrade = http_request.headers.get('upgrade', '').lower() == 'websocket'

        if not (is_get and wants_upgrade):
            # This endpoint only serves WebSocket handshakes, which are GET
            # requests, so GET is the method to advertise in the Allow header.
            return web.HTTPMethodNotAllowed(method=http_request.method, allowed_methods=('GET',))

        # Make sure the registry exists before the request is handed over.
        self._ws_registry(http_request.app)

        return await self.handle_websocket_request(http_request)

    async def handle_websocket_request(self, http_request: web.Request) -> web_ws.WebSocketResponse:
        """Upgrade the request to a WebSocket and serve messages until it closes.

        Only ``TEXT`` frames are dispatched to ``_handle_ws_msg``; every other
        frame type is ignored.

        :param http_request: request to upgrade; the created socket is stored
            on it under the ``'ws'`` key.
        :return: the ``WebSocketResponse`` once the connection has closed.
        """
        # Not read anywhere in this class; presumably consumed by the base
        # class or by RPC methods -- TODO confirm.
        http_request.msg_id = 0
        http_request.pending = {}

        ws = web_ws.WebSocketResponse()
        await ws.prepare(http_request)
        http_request['ws'] = ws

        registry = self._ws_registry(http_request.app)
        registry.add(ws)

        try:
            while not ws.closed:
                ws_msg = await ws.receive()

                if ws_msg.type != http_websocket.WSMsgType.TEXT:
                    continue

                await self._handle_ws_msg(http_request, ws_msg)
        finally:
            # Unregister even when message handling raises, so the shutdown
            # hook never tries to close a socket whose handler already died.
            registry.discard(ws)

        return ws

    async def on_shutdown(self, app: web.Application) -> None:
        """Close every tracked WebSocket with a ``GOING_AWAY`` close code.

        See https://docs.aiohttp.org/en/stable/web_advanced.html#graceful-shutdown

        :param app: the aiohttp application holding the WebSocket registry.
        """
        if self.ws_state_key not in app:
            return

        # Iterate over a snapshot: closing a socket lets its handler remove it
        # from the registry while this loop is still running.
        for ws in set(app[self.ws_state_key]):
            await ws.close(code=http_websocket.WSCloseCode.GOING_AWAY, message='Server shutdown')

        app[self.ws_state_key].clear()

    def _ws_registry(self, app: web.Application) -> weakref.WeakSet:
        """Return the app-wide WeakSet of open WebSockets, creating it if missing."""
        if self.ws_state_key not in app:
            app[self.ws_state_key] = weakref.WeakSet()

        return app[self.ws_state_key]

    async def _handle_ws_msg(self, http_request: web.Request, ws_msg: web_ws.WSMessage) -> None:
        """Process one text frame and send the serialized result back.

        A frame that is not valid JSON is answered with a JSON-RPC 2.0
        "Parse error" response instead of raising out of the receive loop.
        If the connection went away while the request was being processed,
        the socket is closed and unregistered and nothing is sent.

        :param http_request: request that owns the WebSocket (under ``'ws'``).
        :param ws_msg: the received ``TEXT`` message.
        """
        try:
            input_data = json.loads(ws_msg.data)
        except json.JSONDecodeError:
            # JSON-RPC 2.0, section 5.1: invalid JSON -> code -32700, id null.
            output_data = {
                'jsonrpc': '2.0',
                'error': {'code': -32700, 'message': 'Parse error'},
                'id': None,
            }
        else:
            output_data = await self._process_input_data(input_data, http_request=http_request)

        ws = http_request['ws']

        # aiohttp exposes no public "is the transport still usable" check, so
        # the private writer is inspected; guard against it being unset.
        writer = getattr(ws, '_writer', None)
        transport = getattr(writer, 'transport', None)

        if ws.closed or transport is None or transport.is_closing():
            await ws.close()
            http_request.app[self.ws_state_key].discard(ws)
            # Sending on a closed socket would raise, so stop here.
            return

        await ws.send_str(self.json_serialize(output_data))
67