Test Failed
Push — master ( c58336...8dd0a6 )
by Michael
03:57
created

test_batch.test_unlinked_results()   A

Complexity

Conditions 4

Size

Total Lines 38
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 28
nop 2
dl 0
loc 38
rs 9.208
c 0
b 0
f 0
1
import os
2
import sys
3
4
import pytest
5
6
import aiohttp_rpc
7
8
9
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
10
from tests import utils
11
12
13 View Code Duplication
@pytest.mark.asyncio
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
14
async def test_batch(aiohttp_client):
    """Exercise batch calls against a server exposing two simple methods.

    Two local functions are registered on a ``JsonRpcServer``. They are
    first invoked directly through ``rpc_server.call`` and then through
    ``JsonRpcClient.batch`` using the client built by ``utils.make_client``.
    """

    def method_1(a=1):
        return [1, 2, a]

    def method_2():
        return [1]

    rpc_server = aiohttp_rpc.JsonRpcServer()
    # The calls below address these functions as 'method_1' and 'method_2'.
    rpc_server.add_methods((method_1, method_2))

    # Direct server-side calls, without any client in between.
    direct_default = await rpc_server.call('method_1')
    assert direct_default == [1, 2, 1]
    direct_plain = await rpc_server.call('method_2')
    assert direct_plain == [1]

    client = await utils.make_client(aiohttp_client, rpc_server)

    async with aiohttp_rpc.JsonRpcClient('/rpc', session=client) as rpc:
        # Batch given as bare method names: results come back in call order.
        by_name = await rpc.batch(('method_1', 'method_2'))
        assert by_name == [[1, 2, 1], [1]]

        # Batch given as tuples carrying arguments; per the expected values,
        # the first entry sets ``a`` to 4 and the second sets ``a`` to 5.
        with_args = await rpc.batch((
            ('method_1', 4),
            ('method_1', [], {'a': 5}),
        ))
        assert with_args == [[1, 2, 4], [1, 2, 5]]
35
36
37
@pytest.mark.asyncio
38
async def test_unlinked_results(aiohttp_client, mocker):
    """Check how ``batch`` reports responses whose ``id`` is ``None``.

    ``rpc.send_json`` is replaced with fakes that return canned response
    lists in which only the entry for the first request carries that
    request's id; every other entry has ``id`` set to ``None``.

    * With one id-less response, ``batch`` is expected to return it as the
      plain result of the one remaining request.
    * With two id-less responses, ``batch`` is expected to return an
      ``UnlinkedResults`` holding both of them for each remaining request.
    """

    def method_1(a=1):
        return [1, 2, a]

    def method_2():
        return [1]

    rpc_server = aiohttp_rpc.JsonRpcServer()
    rpc_server.add_methods((method_1, method_2))

    client = await utils.make_client(aiohttp_client, rpc_server)

    async def send_one_unlinked(data, **kwargs):
        # Deliberately out of request order: the id-less entry comes first,
        # and the entry tied to the first request (by its id) comes second.
        responses = [
            {'id': None, 'jsonrpc': '2.0', 'result': [1]},
            {'id': data[0]['id'], 'jsonrpc': '2.0', 'result': [1, 2, 1]},
        ]
        return responses, {}

    async def send_two_unlinked(data, **kwargs):
        # Same as above, plus a second id-less entry, so two responses
        # cannot be matched to a request by id.
        responses = [
            {'id': None, 'jsonrpc': '2.0', 'result': [1]},
            {'id': data[0]['id'], 'jsonrpc': '2.0', 'result': [1, 2, 1]},
            {'id': None, 'jsonrpc': '2.0', 'result': [1]},
        ]
        return responses, {}

    async with aiohttp_rpc.JsonRpcClient('/rpc', session=client) as rpc:
        # ``new=`` installs the fake coroutine function itself as the
        # replacement attribute; no factory indirection is needed.
        mocker.patch.object(rpc, 'send_json', new=send_one_unlinked)
        assert await rpc.batch(('method_1', 'method_2',)) == [[1, 2, 1], [1]]

        mocker.patch.object(rpc, 'send_json', new=send_two_unlinked)
        unlinked_results = aiohttp_rpc.UnlinkedResults(data=[[1], [1]])
        assert await rpc.batch(('method_1', 'method_2', 'method_2',)) == [[1, 2, 1], unlinked_results, unlinked_results]
75