1 | import json |
||
2 | import typing |
||
3 | import uuid |
||
4 | from functools import partial |
||
5 | from traceback import format_exception_only |
||
6 | |||
7 | from . import constants, errors |
||
8 | |||
9 | if typing.TYPE_CHECKING: |
||
10 | from . import protocol # NOQA |
||
11 | |||
12 | |||
# Public names exported when this module is star-imported.
__all__ = (
    'convert_params_to_args_and_kwargs',
    'parse_args_and_kwargs',
    'get_exc_message',
    'json_serialize',
    'collect_batch_result',
)
||
20 | |||
21 | |||
def convert_params_to_args_and_kwargs(params: typing.Any) -> typing.Tuple[typing.Sequence, typing.Mapping]:
    """Split a ``params`` value into positional and keyword arguments.

    The checks run in this order:

    * ``constants.NOTHING``                        -> ``(), {}``
    * instance of ``constants.JSON_PRIMITIVE_TYPES`` -> ``(params,), {}``
    * any other sequence                           -> ``params, {}``
    * a mapping                                    -> ``(), params``

    :param params: the value to split.
    :returns: an ``(args, kwargs)`` pair.
    :raises errors.InvalidParams: if ``params`` matches none of the cases.
    """
    positional: typing.Sequence = ()
    named: typing.Mapping = {}

    if params is constants.NOTHING:
        # No params at all: keep both containers empty.
        pass
    elif isinstance(params, constants.JSON_PRIMITIVE_TYPES):
        # Tested before the Sequence check so that a primitive which is also
        # a sequence is wrapped as one argument instead of being unpacked.
        positional = (params,)
    elif isinstance(params, typing.Sequence):
        positional = params
    elif isinstance(params, typing.Mapping):
        named = params
    else:
        raise errors.InvalidParams('Params have unsupported data types.')

    return positional, named
||
36 | |||
37 | |||
def parse_args_and_kwargs(args: typing.Optional[typing.Sequence],
                          kwargs: typing.Optional[typing.Mapping],
                          ) -> typing.Tuple[typing.Any, typing.Sequence, typing.Mapping]:
    """Build a ``(params, args, kwargs)`` triple from call arguments.

    Only one of ``args`` / ``kwargs`` may be non-empty (truthy).

    :param args: positional arguments, or a falsy value when unused.
    :param kwargs: keyword arguments, or a falsy value when unused.
    :returns:
        * neither given      -> ``(constants.NOTHING, (), {})``
        * only ``kwargs``    -> ``(kwargs, (), kwargs)``
        * only ``args``      -> ``(params, args, {})`` where ``params`` is
          ``args[0]`` if ``args`` holds exactly one item that is an instance
          of ``constants.JSON_PRIMITIVE_TYPES``, otherwise ``args`` itself.
    :raises errors.InvalidParams: if both ``args`` and ``kwargs`` are given.
    """
    args_given = bool(args)
    kwargs_given = bool(kwargs)

    if args_given and kwargs_given:
        raise errors.InvalidParams('Need use args or kwargs.')

    if kwargs_given:
        return kwargs, (), kwargs  # type: ignore

    if not args_given:
        return constants.NOTHING, (), {}  # type: ignore

    # A lone primitive argument is passed through unwrapped as the params.
    is_single_primitive = (
        len(args) == 1  # type: ignore
        and isinstance(args[0], constants.JSON_PRIMITIVE_TYPES)  # type: ignore
    )
    params = args[0] if is_single_primitive else args  # type: ignore
    return params, args, {}  # type: ignore
||
57 | |||
58 | |||
def get_random_id() -> str:
    """Return a freshly generated UUID4 in its string form."""
    generated = uuid.uuid4()
    return str(generated)
||
61 | |||
62 | |||
def get_exc_message(exp: BaseException) -> str:
    """Return the exception rendered by ``format_exception_only``.

    The formatted pieces are joined into one string and surrounding
    whitespace (including the trailing newline) is stripped.

    :param exp: the exception instance to describe.
    """
    formatted_parts = format_exception_only(exp.__class__, exp)
    message = ''.join(formatted_parts)
    return message.strip()
||
65 | |||
66 | |||
def validate_jsonrpc(jsonrpc: typing.Any) -> None:
    """Check that ``jsonrpc`` equals ``constants.VERSION_2_0``.

    :param jsonrpc: the version value to check.
    :raises errors.InvalidRequest: if the value differs from
        ``constants.VERSION_2_0``.
    """
    supported = constants.VERSION_2_0
    if jsonrpc != supported:
        raise errors.InvalidRequest(f'Only version "{constants.VERSION_2_0}" is supported.')
||
70 | |||
71 | |||
def collect_batch_result(batch_request: 'protocol.JsonRpcBatchRequest',
                         batch_response: 'protocol.JsonRpcBatchResponse') -> typing.Tuple[typing.Any, ...]:
    """Match the responses of a batch to its requests.

    Each response contributes its ``error`` if one is set, otherwise its
    ``result``. Responses whose ``id`` is ``None`` are gathered into a
    ``JsonRpcUnlinkedResults`` collection. Responses sharing the same ``id``
    are gathered into a ``JsonRpcDuplicatedResults`` collection.

    :param batch_request: the batch whose ``requests`` define the output order.
    :param batch_response: the batch whose ``responses`` are distributed.
    :returns: a tuple with one entry per request, in request order. For a
        notification the entry is the unlinked collection, or ``None`` when
        that collection is falsy. For any other request it is the value
        stored under the request ``id``, with the same fallback when no
        response carries that ``id``.
    """
    # Imported here rather than at module level; the module-level import of
    # ``protocol`` is only performed under ``typing.TYPE_CHECKING``.
    from . import protocol

    orphaned = protocol.JsonRpcUnlinkedResults()
    results_by_id: typing.Dict[typing.Any, typing.Any] = {}

    for item in batch_response.responses:
        payload = item.result if item.error is None else item.error

        if item.id is None:
            orphaned.add(payload)
            continue

        if item.id not in results_by_id:
            results_by_id[item.id] = payload
            continue

        # A second (or later) response with an id that was already seen.
        if isinstance(results_by_id[item.id], protocol.JsonRpcDuplicatedResults):
            results_by_id[item.id].add(payload)
        else:
            results_by_id[item.id] = protocol.JsonRpcDuplicatedResults([
                results_by_id[item.id],
                payload,
            ])

    collected = []
    for request in batch_request.requests:
        if request.is_notification:
            collected.append(orphaned or None)
        else:
            collected.append(results_by_id.get(request.id, orphaned or None))

    return tuple(collected)
||
106 | |||
107 | |||
# JSON encoder: objects that ``json.dumps`` cannot encode natively are
# replaced by their ``repr()`` string.
json_serialize = partial(json.dumps, default=repr)

# JSON decoder: plain alias of ``json.loads``.
json_deserialize = json.loads
||
110 |