Passed
Pull Request — main (#43)
by Switcheolytics
01:26
created

tests.test_ws_get_open_orders   A

Complexity

Total Complexity 3

Size/Duplication

Total Lines 73
Duplicated Lines 89.04 %

Importance

Changes 0
Metric Value
eloc 52
dl 65
loc 73
rs 10
c 0
b 0
f 0
wmc 3

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
import asyncio
2
from typing import Optional
3
4
from tests import APITestCase, MAINNET_WS_URI, WALLET_SWTH_ETH1_AMM, WEBSOCKET_TIMEOUT_GET_REQUEST
5
from tradehub.websocket_client import DemexWebsocket
6
7
8
class TestWSGetOpenOrders(APITestCase):
9
10
    def test_get_open_orders_structure(self):
11
        """
12
        Check if response match expected dict structure.
13
        :return:
14
        """
15
        expect: dict = {
16
            "id": str,
17
            "result": [
18
                {
19
                    "order_id": str,
20
                    "block_height": int,
21
                    "triggered_block_height": int,
22
                    "address": str,
23
                    "market": str,
24
                    "side": str,
25
                    "price": str,
26
                    "quantity": str,
27
                    "available": str,
28
                    "filled": str,
29
                    "order_status": str,
30
                    "order_type": str,
31
                    "initiator": str,
32
                    "time_in_force": str,
33
                    "stop_price": str,
34
                    "trigger_type": str,
35
                    "allocated_margin_denom": str,
36
                    "allocated_margin_amount": str,
37
                    "is_liquidation": bool,
38
                    "is_post_only": bool,
39
                    "is_reduce_only": bool,
40
                    "type": str,
41
                    "block_created_at": str,
42
                    "username": str,
43
                    "id": str,
44
                }
45
            ]
46
        }
47
48
        # connect to websocket
49
        client = DemexWebsocket(uri=MAINNET_WS_URI)
50
        # little work around to save the response
51
        self.response: Optional[dict] = None
52
53
        async def on_connect():
54
            await client.get_open_orders('order_history', WALLET_SWTH_ETH1_AMM)
55
56
        async def on_message(message: dict):
57
            # save response into self
58
            self.response = message
59
            await client.disconnect()
60
61
        try:
62
            loop = asyncio.get_event_loop()
63
            loop.run_until_complete(asyncio.wait_for(client.connect(on_connect_callback=on_connect,
64
                                                                    on_receive_message_callback=on_message),
65
                                                     WEBSOCKET_TIMEOUT_GET_REQUEST))
66
        except asyncio.TimeoutError:
67
            raise TimeoutError("Test did not complete in time.")
68
69
        if not self.response:
70
            raise RuntimeError("Did not receive a response.")
71
72
        self.assertDictStructure(expect, self.response)
73