Passed
Pull Request — main (#21)
by
unknown
02:37
created

tests.test_ws_unsubscribe   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 55
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 35
dl 0
loc 55
rs 10
c 0
b 0
f 0
wmc 5

1 Method

Rating   Name   Duplication   Size   Complexity  
B TestWSUnsubscribe.test_unsubscribe_structure() 0 44 5
1
import asyncio
2
import concurrent
3
from typing import Optional, List
4
5
from tests import APITestCase, DEVEL_AND_CO_SENTRY, WALLET_SWTH_ETH1_AMM, WEBSOCKET_TIMEOUT_GET_REQUEST
6
from tradehub.websocket_client import DemexWebsocket
7
8
9
class TestWSUnsubscribe(APITestCase):

    def test_unsubscribe_structure(self):
        """
        Check if the subscribe and unsubscribe responses match the expected dict structure.

        The test subscribes to the balances channel of a wallet, unsubscribes again using the
        channel list returned in the subscription response, and validates both responses
        against the same expected structure.

        :raises RuntimeError: if no response carrying an "id" was received before the timeout.
        :return:
        """
        expect_subscription: dict = {
            'id': str,
            'result': [str]
        }

        # connect to websocket
        client = DemexWebsocket(f"ws://{DEVEL_AND_CO_SENTRY}:5000/ws")
        # little work around to save the response
        self.response: List[dict] = []

        async def on_connect():
            # use AMM to be sure deterministic of which tokens the wallet holds
            await client.subscribe_balances('balance', WALLET_SWTH_ETH1_AMM)

        async def on_message(message: dict):
            # Only messages carrying an "id" are recorded; anything else is ignored.
            # NOTE(review): presumably channel update messages carry no "id" - confirm against the API.
            if "id" not in message:
                return
            self.response.append(message)
            # Send the unsubscribe request exactly once, right after the first recorded response.
            # Checking this only after an append prevents messages without an "id" from
            # triggering duplicate unsubscribe requests while the list still holds one entry.
            if len(self.response) == 1:
                await client.unsubscribe("unsubscribe", self.response[0]["result"])

        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(asyncio.wait_for(client.connect(on_connect_callback=on_connect,
                                                                    on_receive_message_callback=on_message),
                                                     WEBSOCKET_TIMEOUT_GET_REQUEST))
        except asyncio.TimeoutError:
            # asyncio.wait_for raises asyncio.TimeoutError. On Python 3.8-3.10 this is a different
            # class than concurrent.futures.TimeoutError, so catching the latter would let the
            # timeout escape and skip the disconnect.
            loop.run_until_complete(client.disconnect())

        if not self.response:
            raise RuntimeError("Did not receive a response.")

        self.assertGreaterEqual(len(self.response), 2,
                                msg="Expected at least 2 messages: channel subscription and unsubscription response")

        channel_subscription: dict = self.response[0]
        self.assertDictStructure(expect_subscription, channel_subscription)
        channel_unsubscription: dict = self.response[1]
        self.assertDictStructure(expect_subscription, channel_unsubscription)
55
56