Completed
Push — main ( 5e6d02...339f4c )
by
unknown
15s queued 12s
created

tests.test_ws_subscribe_recent_trades   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 82
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 55
dl 0
loc 82
rs 10
c 0
b 0
f 0
wmc 5
1
import asyncio
2
from typing import List
3
4
from tests import APITestCase, MAINNET_WS_URI, WEBSOCKET_TIMEOUT_SUBSCRIPTION
5
from tradehub.websocket_client import DemexWebsocket
6
7
8
class TestWSSubscribeRecentTrades(APITestCase):
    """Websocket test for the recent-trades subscription of ``DemexWebsocket``."""

    def test_subscribe_recent_trades_structure(self):
        """
        Check if response match expected dict structure.

        The test connects to ``MAINNET_WS_URI``, subscribes to the recent
        trades of the ``eth1_usdc1`` market and stores every received message.
        The first message is validated as the subscription confirmation, all
        following messages are validated as recent-trade updates.

        The test is skipped when only one message arrived before
        ``WEBSOCKET_TIMEOUT_SUBSCRIPTION`` elapsed, because then no trade
        update is available to validate.

        :raises RuntimeError: if no message at all was received.
        :return:
        """
        # Expected structure of the first message (subscription confirmation).
        expect_subscription: dict = {
            'id': str,
            'result': [str],
        }

        # Expected structure of every following message (trade updates).
        expect: dict = {
            'channel': str,
            'sequence_number': int,
            'result': [
                {
                    'id': str,
                    'block_created_at': str,
                    'taker_id': str,
                    'taker_address': str,
                    'taker_fee_amount': str,
                    'taker_fee_denom': str,
                    'taker_side': str,
                    'maker_id': str,
                    'maker_address': str,
                    'maker_fee_amount': str,
                    'maker_fee_denom': str,
                    'maker_side': str,
                    'market': str,
                    'price': str,
                    'quantity': str,
                    'liquidation': str,
                    'taker_username': str,
                    'maker_username': str,
                    'block_height': str,
                },
            ],
        }

        # connect to websocket
        client = DemexWebsocket(uri=MAINNET_WS_URI)
        # little work around to save the response
        self.response: List[dict] = []

        async def on_connect():
            # Subscribe to the recent trades of the eth1_usdc1 market.
            # NOTE(review): the first argument is presumably the message id that
            # is echoed in the confirmation - verify against DemexWebsocket.
            await client.subscribe_recent_trades('orders', "eth1_usdc1")

        async def on_message(message: dict):
            # save response into self
            self.response.append(message)

        # Look the loop up once; it is needed for connecting and for the
        # disconnect after the timeout.
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(
                asyncio.wait_for(
                    client.connect(on_connect_callback=on_connect,
                                   on_receive_message_callback=on_message),
                    WEBSOCKET_TIMEOUT_SUBSCRIPTION,
                )
            )
        except asyncio.TimeoutError:
            # The timeout bounds how long messages are collected; close the
            # connection afterwards.
            loop.run_until_complete(client.disconnect())

        if not self.response:
            raise RuntimeError("Did not receive a response.")

        # Only the subscription confirmation arrived, no trade update to check.
        if len(self.response) < 2:
            self.skipTest("Did not receive trades within time, test can not finish.")

        channel_subscription: dict = self.response[0]
        self.assertDictStructure(expect_subscription, channel_subscription)

        for message in self.response[1:]:
            # if this fails, the recent-trades message layout differs from `expect`
            self.assertDictStructure(expect, message)
82