Passed
Push — master ( 863a60...41e8a0 )
by Michael
15:33
created

aiohttp_rpc.utils   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 70
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 16
eloc 45
dl 0
loc 70
rs 10
c 0
b 0
f 0

5 Functions

Rating   Name   Duplication   Size   Complexity  
A validate_jsonrpc() 0 3 2
A get_exc_message() 0 2 1
A get_random_id() 0 2 1
B parse_args_and_kwargs() 0 20 7
A convert_params_to_args_and_kwargs() 0 14 5
1
import json
2
import typing
3
import uuid
4
from functools import partial
5
from traceback import format_exception_only
6
7
from . import constants, errors
8
9
10
# Names exported by ``from <this module> import *``.
__all__ = (
    'convert_params_to_args_and_kwargs',
    'parse_args_and_kwargs',
    'get_exc_message',
    'json_serialize',
)
16
17
18
def convert_params_to_args_and_kwargs(params: typing.Any) -> typing.Tuple[list, dict]:
    """Split a ``params`` value into positional and keyword arguments.

    Mapping of input to ``(args, kwargs)``:

    * ``constants.NOTHING``                           -> ``([], {})``
    * instance of ``constants.JSON_PRIMITIVE_TYPES``  -> ``([params], {})``
    * ``list``                                        -> ``(params, {})``
    * ``dict``                                        -> ``([], params)``

    The list/dict is returned as-is (not copied).

    Raises:
        errors.InvalidParams: if ``params`` matches none of the cases above.
    """
    # Identity check: NOTHING is compared with ``is``, never by value.
    if params is constants.NOTHING:
        return [], {}

    # A lone primitive becomes a single positional argument.
    if isinstance(params, constants.JSON_PRIMITIVE_TYPES):
        return [params], {}

    if isinstance(params, list):
        return params, {}

    if isinstance(params, dict):
        return [], params

    raise errors.InvalidParams('Params have unsupported data types.')
32
33
34
def parse_args_and_kwargs(args: typing.Any, kwargs: typing.Any) -> typing.Tuple:
    """Normalise call arguments into a ``(params, args, kwargs)`` triple.

    ``args`` / ``kwargs`` count as "supplied" only when they are truthy and
    are not ``constants.NOTHING``.

    Returns:
        * ``(constants.NOTHING, [], {})`` when neither is supplied;
        * ``(args[0], args, {})`` when exactly one positional argument is
          supplied and it is an instance of ``constants.JSON_PRIMITIVE_TYPES``;
        * ``(args, args, {})`` for any other positional arguments;
        * ``(kwargs, [], kwargs)`` for keyword arguments.

        In the results, ``args`` is ``list(args)`` and ``kwargs`` is
        ``dict(kwargs)`` (fresh copies of the inputs).

    Raises:
        errors.InvalidParams: if both ``args`` and ``kwargs`` are supplied.
    """
    has_args = bool(args and args is not constants.NOTHING)
    has_kwargs = bool(kwargs and kwargs is not constants.NOTHING)

    if not has_args and not has_kwargs:
        return constants.NOTHING, [], {}

    # The "neither" case returned above, so the only remaining invalid
    # combination is "both at once".
    if has_args and has_kwargs:
        raise errors.InvalidParams('Need use args or kwargs.')

    if has_args:
        args = list(args)

        # A single primitive is passed through unwrapped as the first element.
        if len(args) == 1 and isinstance(args[0], constants.JSON_PRIMITIVE_TYPES):
            return args[0], args, {}

        return args, args, {}

    kwargs = dict(kwargs)
    return kwargs, [], kwargs
54
55
56
def get_random_id() -> str:
    """Return a freshly generated random UUID (version 4) as a string."""
    return str(uuid.uuid4())
58
59
60
def get_exc_message(exp: Exception) -> str:
    """Return the one-line ``"ExcType: message"`` text for ``exp``.

    Uses ``traceback.format_exception_only`` (no stack trace), joins the
    resulting lines and strips surrounding whitespace.
    """
    return ''.join(format_exception_only(exp.__class__, exp)).strip()
62
63
64
def validate_jsonrpc(jsonrpc: typing.Any) -> None:
    """Check that ``jsonrpc`` equals ``constants.VERSION_2_0``.

    Raises:
        errors.InvalidRequest: if the value differs from the supported version.
    """
    if jsonrpc != constants.VERSION_2_0:
        raise errors.InvalidRequest(f'Only version "{constants.VERSION_2_0}" is supported.')
67
68
69
# ``json.dumps`` with a fallback: objects the encoder cannot serialize are
# emitted as their ``repr()`` string instead of raising ``TypeError``.
json_serialize = partial(json.dumps, default=repr)
70
71
72
# def is_json_rpc_request(data: typing.Any) -> bool:
73
#     if isinstance(data, dict) and 'method' in data:
74
#         return True
75
#
76
#     if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict) and 'method' in data[0]:
77
#         return True
78
#
79
#     return False
80
81
82
# def is_json_rpc_response(data: typing.Any) -> bool:
83
#     if isinstance(data, dict) and ('result' in data or 'error' in data):
84
#         return True
85
#
86
#     if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict) and ('result' in data[0] or 'error' in data[0]):
87
#         return True
88
#
89
#     return False
90
91
92
# def is_json_batch(data: typing.Any) -> bool:
93
#     return isinstance(data, list)
94
95
96
# class WebSocketConnection:
97
#     _ws_connect: WSConnectType
98
#     _message_worker: typing.Optional[asyncio.Future] = None
99
#     _handlers: typing.List[typing.Callable]
100
#
101
#     def __init__(self, ws_connect: WSConnectType) -> None:
102
#         self._ws_connect = ws_connect
103
#         self._handlers = []
104
#
105
#     def add_handler(self, handler: typing.Callable) -> None:
106
#         self._handlers.append(handler)
107
#
108
#     async def open(self) -> None:
109
#         self._message_worker = asyncio.ensure_future(self._handle_ws_messages())
110
#
111
#     async def close(self) -> None:
112
#         await self._ws_connect.close()
113
#
114
#         if self._message_worker:
115
#             await self._message_worker
116
#
117
#     @property
118
#     def closed(self) -> bool:
119
#         return self._ws_connect.closed
120
#
121
#     async def send_str(self, message: typing.Union[str, bytes]) -> None:
122
#         await self._ws_connect.send_str(message)
123
#
124
#     async def _handle_ws_messages(self):
125
#         async for ws_msg in self._ws_connect:
126
#             if ws_msg.type == http_websocket.WSMsgType.BINARY:
127
#                 raw_data = ws_msg.data.decode()
128
#             elif ws_msg.type == http_websocket.WSMsgType.TEXT:
129
#                 raw_data = ws_msg.data
130
#             else:
131
#                 continue
132
#
133
#             parsed_data, exception = self._parse_raw_data(raw_data)
134
#
135
#             if exception is not None:
136
#                 await self._call_handlers(
137
#                     parsed_data=parsed_data,
138
#                     prepared_data=None,
139
#                     exception=exception,
140
#                 )
141
#                 return
142
#
143
#             try:
144
#                 prepared_data = self._prepare_data(parsed_data)
145
#             except Exception as e:
146
#                 await self._call_handlers(
147
#                     parsed_data=parsed_data,
148
#                     prepared_data=None,
149
#                     exception=e,
150
#                 )
151
#                 return
152
#
153
#             await self._call_handlers(
154
#                 parsed_data=parsed_data,
155
#                 prepared_data=prepared_data,
156
#                 exception=None,
157
#             )
158
#
159
#     async def _call_handlers(self, **kwargs) -> None:
160
#         for handler in self._handlers:
161
#             await handler(**kwargs)
162
#
163
#     @staticmethod
164
#     def _parse_raw_data(raw_data: str) -> typing.Tuple:
165
#         try:
166
#             input_data = json.loads(raw_data)
167
#         except json.JSONDecodeError as e:
168
#             exception = e
169
#             input_data = None
170
#         else:
171
#             exception = None
172
#
173
#         return input_data, exception
174
#
175
#     @staticmethod
176
#     def _prepare_data(parsed_data):
177
#         from . import protocol, errors
178
#
179
#         if is_json_rpc_response(parsed_data):
180
#             if is_json_batch(parsed_data):
181
#                 return protocol.JsonRpcBatchResponse.from_list(parsed_data)
182
#             else:
183
#                 return protocol.JsonRpcResponse.from_dict(parsed_data)
184
#         elif is_json_rpc_request(parsed_data):
185
#             if is_json_batch(parsed_data):
186
#                 return protocol.JsonRpcBatchRequest.from_list(parsed_data)
187
#             else:
188
#                 return protocol.JsonRpcRequest.from_dict(parsed_data)
189
#         else:
190
#             raise errors.ParseError
191