Completed
Push — main ( 30c9e1...6da681 )
by Switcheolytics
42:03 queued 11s
created

tests.test_ws_subscribe_orders   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 169
Duplicated Lines 92.31 %

Importance

Changes 0
Metric Value
eloc 115
dl 156
loc 169
rs 10
c 0
b 0
f 0
wmc 10

2 Methods

Rating   Name   Duplication   Size   Complexity  
B TestWSSubscribeOrders.test_subscribe_orders_structure() 78 78 5
B TestWSSubscribeOrders.test_subscribe_market_orders_structure() 78 78 5

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
import asyncio
2
import concurrent
3
from typing import Optional, List
4
5
from tests import APITestCase, DEVEL_AND_CO_SENTRY, WALLET_SWTH_ETH1_AMM, WEBSOCKET_TIMEOUT_SUBSCRIPTION
6
from tradehub.websocket_client import DemexWebsocket
7
8
9
class TestWSSubscribeOrders(APITestCase):
    """Structure checks for the websocket ``orders`` subscription."""

    def test_subscribe_orders_structure(self):
        """
        Check if response match expected dict structure.

        Subscribes to the orders of the AMM wallet without passing a market.
        :return:
        """
        self._check_orders_subscription()

    def test_subscribe_market_orders_structure(self):
        """
        Check if response match expected dict structure.

        Subscribes to the orders of the AMM wallet and additionally passes
        ``"swth_eth1"`` as third argument to ``subscribe_orders``.
        :return:
        """
        self._check_orders_subscription("swth_eth1")

    def _check_orders_subscription(self, market: Optional[str] = None):
        """
        Subscribe to the orders channel, collect messages until the timeout hits and verify their structure.

        Shared implementation of both tests in this class; they only differ in
        whether an extra argument is handed to ``subscribe_orders``.

        :param market: optional third positional argument for ``subscribe_orders``
                       (presumably the market to filter on, e.g. ``"swth_eth1"``);
                       when ``None`` the call is made with two arguments only.
        :raises RuntimeError: if not a single message was received.
        :return:
        """
        # expected structure of the first message (subscription confirmation)
        expect_subscription: dict = {
            'id': str,
            'result': [str]
        }

        # expected structure of every following message (order updates)
        expect: dict = {
            'channel': str,
            'result': [
                {
                    'order_id': str,
                    'block_height': int,
                    'triggered_block_height': int,
                    'address': str,
                    'market': str,
                    'side': str,
                    'price': str,
                    'quantity': str,
                    'available': str,
                    'filled': str,
                    'order_status': str,
                    'order_type': str,
                    'initiator': str,
                    'time_in_force': str,
                    'stop_price': str,
                    'trigger_type': str,
                    'allocated_margin_denom': str,
                    'allocated_margin_amount': str,
                    'is_liquidation': bool,
                    'is_post_only': bool,
                    'is_reduce_only': bool,
                    'type': str,
                    'block_created_at': str,
                    'username': str,
                    'id': str
                }
            ]
        }

        # connect to websocket
        client = DemexWebsocket(f"ws://{DEVEL_AND_CO_SENTRY}:5000/ws")
        # little work around to save the response
        self.response: List[dict] = []

        # use AMM to be sure deterministic of which tokens the wallet holds;
        # the market is only appended when given so the two-argument call stays untouched
        subscribe_args: tuple = ('orders', WALLET_SWTH_ETH1_AMM)
        if market is not None:
            subscribe_args += (market,)

        async def on_connect():
            await client.subscribe_orders(*subscribe_args)

        async def on_message(message: dict):
            # save response into self
            self.response.append(message)

        # the same loop is used for connecting and for the disconnect after the timeout
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(asyncio.wait_for(client.connect(on_connect_callback=on_connect,
                                                                    on_receive_message_callback=on_message),
                                                     WEBSOCKET_TIMEOUT_SUBSCRIPTION))
        except asyncio.TimeoutError:
            # the timeout is the regular way to stop listening; close the connection afterwards
            loop.run_until_complete(client.disconnect())

        if not self.response:
            raise RuntimeError("Did not receive a response.")

        # only the subscription confirmation arrived, there is no order message to verify
        if len(self.response) < 2:
            self.skipTest("Did not receive orders within time, test can not finish.")

        channel_subscription: dict = self.response[0]
        self.assertDictStructure(expect_subscription, channel_subscription)

        for message in self.response[1:]:
            # if this fails, check if the AMM wallet own other tokens as expected
            self.assertDictStructure(expect, message)