Completed
Push — main ( 30c9e1...6da681 )
by Switcheolytics
42:03 queued 11s
created

tests.test_ws_subscribe_books   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 66
Duplicated Lines 86.36 %

Importance

Changes 0
Metric Value
eloc 41
dl 57
loc 66
rs 10
c 0
b 0
f 0
wmc 4

1 Method

Rating   Name   Duplication   Size   Complexity  
A TestWSSubscribeBooks.test_subscribe_books_structure() 55 55 4

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
import asyncio
2
import concurrent
3
from typing import Optional, List
4
5
from tests import APITestCase, DEVEL_AND_CO_SENTRY, WALLET_DEVEL, WEBSOCKET_TIMEOUT_SUBSCRIPTION
6
from tradehub.websocket_client import DemexWebsocket
7
8
9 View Code Duplication
class TestWSSubscribeBooks(APITestCase):
    """Websocket test for the 'books' subscription of the 'eth1_usdc1' market."""

    def test_subscribe_books_structure(self):
        """
        Check if response match expected dict structure.

        Subscribes to the books channel, collects every message received until
        the connect call times out, then checks that the first message matches
        the subscription structure and every following message matches the
        book update structure.

        :return:
        :raises RuntimeError: if no message was received at all.
        """
        # Expected structure of the first message (channel subscription).
        expect_subscription: dict = {
            'id': str,
            'result': [str]
        }

        # Expected structure of every message after the first one.
        expect: dict = {
            'channel': str,
            'sequence_number': int,
            'result': [
                {
                    'market': str,
                    'price': str,
                    'quantity': str,
                    'side': str,
                    'type': str
                }
            ]
        }

        # connect to websocket
        client = DemexWebsocket(f"ws://{DEVEL_AND_CO_SENTRY}:5000/ws")
        # little workaround to save the response: the callbacks below append
        # to this list so the messages can be inspected after the loop ran
        self.response: List[dict] = []

        async def on_connect():
            await client.subscribe_books('book', "eth1_usdc1")

        async def on_message(message: dict):
            # save response into self
            self.response.append(message)

        # Fetch the loop once; it is needed both for connecting and for the
        # disconnect in the timeout handler.
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(
                asyncio.wait_for(
                    client.connect(on_connect_callback=on_connect,
                                   on_receive_message_callback=on_message),
                    WEBSOCKET_TIMEOUT_SUBSCRIPTION,
                )
            )
        except asyncio.TimeoutError:
            # wait_for gave up after WEBSOCKET_TIMEOUT_SUBSCRIPTION; close the
            # connection before evaluating what was collected.
            loop.run_until_complete(client.disconnect())

        if not self.response:
            raise RuntimeError("Did not receive a response.")

        # assertGreaterEqual reports the actual message count on failure.
        self.assertGreaterEqual(
            len(self.response),
            2,
            msg="Expected at least 2 messages: channel subscription and an update message",
        )

        channel_subscription: dict = self.response[0]
        self.assertDictStructure(expect_subscription, channel_subscription)

        for message in self.response[1:]:
            self.assertDictStructure(expect, message)
66
67