Test Failed
Push — master ( c58336...8dd0a6 )
by Michael
03:57
created

test_ws.test_args()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 10
nop 1
dl 0
loc 13
rs 9.9
c 0
b 0
f 0
1
import os
2
import sys
3
4
import pytest
5
6
import aiohttp_rpc
7
8
9
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
10
from tests import utils
11
12
13
@pytest.mark.asyncio
14
async def test_args(aiohttp_client):
15
    def method(a=1):
16
        return [1, 2, a]
17
18
    rpc_server = aiohttp_rpc.WsJsonRpcServer()
19
    rpc_server.add_method(method)
20
21
    client = await utils.make_ws_client(aiohttp_client, rpc_server)
22
23
    async with aiohttp_rpc.WsJsonRpcClient('/rpc', session=client) as rpc:
24
        assert await rpc.call('method') == [1, 2, 1]
25
        assert await rpc.call('method', 1) == [1, 2, 1]
26
27
28 View Code Duplication
@pytest.mark.asyncio
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
29
async def test_batch(aiohttp_client):
30
    def method_1(a=1):
31
        return [1, 2, a]
32
33
    def method_2():
34
        return [1]
35
36
    rpc_server = aiohttp_rpc.WsJsonRpcServer()
37
    rpc_server.add_methods((
38
        method_1,
39
        method_2,
40
    ))
41
42
    client = await utils.make_ws_client(aiohttp_client, rpc_server)
43
44
    async with aiohttp_rpc.WsJsonRpcClient('/rpc', session=client) as rpc:
45
        assert await rpc.batch(('method_1', 'method_2',)) == [[1, 2, 1], [1]]
46
        assert await rpc.batch((('method_1', 4), ('method_1', [], {'a': 5}),)) == [[1, 2, 4], [1, 2, 5]]
47
48
    async with aiohttp_rpc.WsJsonRpcClient('/rpc', session=client) as rpc:
49
        assert await rpc.batch_notify(('method_1', 'method_2',)) is None
50
        assert await rpc.batch_notify((('method_1', 4), ('method_1', [], {'a': 5}),)) is None
51