Code Duplication: 27 lines duplicated in 2 locations

tests/test_protocol_for_ws.py 1 location

@@ 71-97 (lines=27) @@
68
        assert result[0] == {'jsonrpc': '2.0', 'result': 19, 'id': 4}
69
70
71
async def test_notification(aiohttp_client):
    """
    Check that notifications sent over the WebSocket transport yield no result.

    Scenario (requests carry no "id" member):

    --> {"jsonrpc": "2.0", "method": "update", "params": [1,2,3,4,5]}
    --> {"jsonrpc": "2.0", "method": "foobar"}
    """

    def update(*args):
        return args

    def foobar(*args):
        return 'ok'

    rpc_server = aiohttp_rpc.WsJsonRpcServer()
    rpc_server.add_method(update)
    rpc_server.add_method(foobar)

    client = await utils.make_ws_client(aiohttp_client, rpc_server)

    async with aiohttp_rpc.WsJsonRpcClient('/rpc', session=client) as rpc:
        # Both handlers are declared with ``*args`` only, so keyword params
        # cannot be bound to them. Use the positional params from the
        # scenario above so the notification exercises a successful call.
        assert await rpc.notify('update', 1, 2, 3, 4, 5) is None
        assert await rpc.notify('foobar') is None

        # The same two notifications sent as raw JSON payloads (no "id").
        result = await rpc.send_json({'jsonrpc': '2.0', 'method': 'update', 'params': [1, 2, 3, 4, 5]})
        assert result[0] is None

        result = await rpc.send_json({'jsonrpc': '2.0', 'method': 'foobar'})
        assert result[0] is None
98
99
100
async def test_rpc_call_of_non_existent_method(aiohttp_client):

tests/test_protocol_for_http.py 1 location

@@ 70-96 (lines=27) @@
67
        assert result[0] == {'jsonrpc': '2.0', 'result': 19, 'id': 4}
68
69
70
async def test_notification(aiohttp_client):
    """
    Check that notifications sent over the HTTP transport yield no result.

    Scenario (requests carry no "id" member):

    --> {"jsonrpc": "2.0", "method": "update", "params": [1,2,3,4,5]}
    --> {"jsonrpc": "2.0", "method": "foobar"}
    """

    def update(*args):
        return args

    def foobar(*args):
        return 'ok'

    rpc_server = aiohttp_rpc.JsonRpcServer()
    rpc_server.add_method(update)
    rpc_server.add_method(foobar)

    client = await utils.make_client(aiohttp_client, rpc_server)

    async with aiohttp_rpc.JsonRpcClient('/rpc', session=client) as rpc:
        # Both handlers are declared with ``*args`` only, so keyword params
        # cannot be bound to them. Use the positional params from the
        # scenario above so the notification exercises a successful call.
        assert await rpc.notify('update', 1, 2, 3, 4, 5) is None
        assert await rpc.notify('foobar') is None

        # The same two notifications sent as raw JSON payloads (no "id").
        result = await rpc.send_json({'jsonrpc': '2.0', 'method': 'update', 'params': [1, 2, 3, 4, 5]})
        assert result[0] is None

        result = await rpc.send_json({'jsonrpc': '2.0', 'method': 'foobar'})
        assert result[0] is None
97
98
99
async def test_rpc_call_of_non_existent_method(aiohttp_client):