aiohttp_rpc.server.http   A
last analyzed

Complexity

Total Complexity 3

Size/Duplication

Total Lines 33
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 3
eloc 21
dl 0
loc 33
rs 10
c 0
b 0
f 0

1 Method

Rating   Name   Duplication   Size   Complexity  
A JsonRpcServer.handle_http_request() 0 13 3
1
import json
2
3
from aiohttp import web
4
5
from .base import BaseJsonRpcServer
6
from .. import errors, middlewares, protocol, utils
7
8
9
# Names exported by ``from <this module> import *``.
__all__ = ('JsonRpcServer', 'rpc_server')
13
14
15
class JsonRpcServer(BaseJsonRpcServer):
    """JSON-RPC server whose requests arrive as HTTP POST bodies."""

    async def handle_http_request(self, http_request: web.Request) -> web.Response:
        """Handle one HTTP request carrying a JSON-RPC payload.

        Only ``POST`` is accepted. The body is read as JSON and passed to
        ``self._process_input_data`` together with a context dict holding the
        request; the result is serialized with ``self.json_serialize`` and
        returned as a JSON response.

        A body that cannot be read as JSON (malformed JSON, or bytes that
        cannot be decoded to text) is answered with a JSON-RPC ``ParseError``
        response instead of propagating the exception.

        Raises:
            web.HTTPMethodNotAllowed: if the request method is not ``POST``.
        """
        if http_request.method != 'POST':
            raise web.HTTPMethodNotAllowed(method=http_request.method, allowed_methods=('POST',))

        try:
            input_data = await http_request.json()
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # ``json()`` decodes the raw body to text before parsing it, so
            # undecodable bytes raise UnicodeDecodeError rather than
            # JSONDecodeError. Both mean "the payload could not be parsed".
            # NOTE(review): assumes utils.get_exc_message accepts any
            # exception instance, not only JSONDecodeError - confirm.
            response = protocol.JsonRpcResponse(error=errors.ParseError(utils.get_exc_message(e)))
            return web.json_response(response.dump(), dumps=self.json_serialize)

        output_data = await self._process_input_data(input_data, context={'http_request': http_request})

        return web.json_response(output_data, dumps=self.json_serialize)
29
30
31
# Module-level server instance built with the package's default middlewares.
rpc_server = JsonRpcServer(middlewares=middlewares.DEFAULT_MIDDLEWARES)
34