Passed
Push — main ( 3b3251...3424c0 )
by Switcheolytics
01:09 queued 11s
created

tradehub.websocket_client.on_connect()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 0
1
from typing import Optional, List, Callable
2
import asyncio
3
import websockets
4
import json
5
6
7
class DemexWebsocket:
8
    """
9
    DemexWebsocket is a high-level async implementation off the official Tradehub Demex websocket and provides all
10
    functionalities described in the documentation.
11
    """
12
13
    def __init__(self, uri: str, ping_interval: Optional[int] = 10, ping_timeout: Optional[int] = 30):
14
        """
15
        Create a websocket which is complaint with the specification provided by the offical documentation.
16
17
        .. see::
18
            https://docs.switcheo.org/#/?id=websocket
19
20
        :param uri: Websocket URI, starting with 'ws://' or 'wss://' e.g. 'ws://85.214.81.155:5000/ws'
21
        :param ping_interval: Interval for pinging the server in seconds.
22
        :param ping_timeout: Time after no response for pings are considered as timeout in seconds.
23
        """
24
        self._uri: str = uri
25
        self._ping_interval: int = ping_interval
26
        self._ping_timeout: int = ping_timeout
27
        self._websocket: Optional[websockets.WebSocketClientProtocol] = None
28
29
    async def subscribe(self, message_id: str, channels: List[str]):
30
        """
31
        Subscribe to one or many channels.
32
33
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
34
                           identify which channel the notification is originated from.
35
        :param channels: List with channels to join.
36
        :return: None
37
        """
38
        await self.send({
39
            "id": message_id,
40
            "method": "subscribe",
41
            "params": {"channels": channels}
42
        })
43
44
    async def unsubscribe(self, message_id: str, channels: List[str]):
45
        """
46
        Unsubscribe to one or many channels.
47
48
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
49
                           identify which channel the notification is originated from.
50
        :param channels: List with channels to leave.
51
        :return: None
52
        """
53
        await self.send({
54
            "id": message_id,
55
            "method": "unsubscribe",
56
            "params": {"channels": channels}
57
        })
58
59
    async def subscribe_leverages(self, message_id: str, swth_address: str):
60
        """
61
        Subscribe to wallet specific leverages channel.
62
63
        .. warning::
64
            This channel has not been tested yet.
65
66
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
67
                           identify which channel the notification is originated from.
68
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
69
        :return: None
70
        """
71
        # TODO not tested yet
72
        channel_name: str = f"leverages.{swth_address}"
73
        await self.subscribe(message_id, [channel_name])
74
75
    async def subscribe_market_stats(self, message_id: str):
76
        """
77
        Subscribe to market stats.
78
79
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
80
                           identify which channel the notification is originated from.
81
        :return: None
82
        """
83
        channel_name: str = "market_stats"
84
        await self.subscribe(message_id, [channel_name])
85
86
    async def subscribe_books(self, message_id: str, market: str):
87
        """
88
        Subscribe to book channel.
89
90
        .. note::
91
            The first message is a snapshot off the current orderbook. The following messages are delta messages to the
92
            snapshot. Each message has a 'sequence_number'.
93
94
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
95
                           identify which channel the notification is originated from.
96
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
97
        :return: None
98
        """
99
        channel_name: str = f"books.{market}"
100
        await self.subscribe(message_id, [channel_name])
101
102
    async def subscribe_orders(self, message_id: str, swth_address: str, market: Optional[str] = None):
103
        """
104
        Subscribe to orders channel.
105
106
        .. note::
107
            The market identifier is optional and acts as a filter.
108
109
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
110
                           identify which channel the notification is originated from.
111
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
112
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
113
        :return: None
114
        """
115
        if market:
116
            channel_name: str = f"orders_by_market.{market}.{swth_address}"
117
        else:
118
            channel_name: str = f"orders.{swth_address}"
119
        await self.subscribe(message_id, [channel_name])
120
121
    async def subscribe_positions(self, message_id: str, swth_address: str, market: Optional[str] = None):
122
        """
123
        Subscribe to positions channel.
124
125
        .. note::
126
            The market identifier is optional and acts as a filter.
127
128
        .. warning::
129
            This channel is not tested yet.
130
131
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
132
                           identify which channel the notification is originated from.
133
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
134
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
135
        :return: None
136
        """
137
        # TODO not tested yet
138
        if market:
139
            channel_name: str = f"positions_by_market.{market}.{swth_address}"
140
        else:
141
            channel_name: str = f"positions.{swth_address}"
142
        await self.subscribe(message_id, [channel_name])
143
144
    async def subscribe_recent_trades(self, message_id: str, market: str):
145
        """
146
        Subscribe to recent trades.
147
148
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
149
                           identify which channel the notification is originated from.
150
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
151
        :return: None
152
        """
153
        channel_name: str = f"recent_trades.{market}"
154
        await self.subscribe(message_id, [channel_name])
155
156
    async def subscribe_account_trades(self, message_id: str, swth_address: str, market: Optional[str] = None):
157
        """
158
        Subscribe to account trades.
159
160
        .. note::
161
            The market identifier is optional and acts as a filter.
162
163
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
164
                           identify which channel the notification is originated from.
165
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
166
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
167
        :return: None
168
        """
169
        if market:
170
            channel_name: str = f"account_trades_by_market.{market}.{swth_address}"
171
        else:
172
            channel_name: str = f"account_trades.{swth_address}"
173
        await self.subscribe(message_id, [channel_name])
174
175
    async def subscribe_balances(self, message_id: str, swth_address: str):
176
        """
177
        Subscribe to wallet specific balance channel.
178
179
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
180
                           identify which channel the notification is originated from.
181
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
182
        :return: None
183
        """
184
        channel_name: str = f"balances.{swth_address}"
185
        await self.subscribe(message_id, [channel_name])
186
187
    async def subscribe_candlesticks(self, message_id: str, market: str, granularity: int):
188
        """
189
        Subscribe to candlesticks channel.
190
191
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
192
                           identify which channel the notification is originated from.
193
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
194
        :param granularity: Define the candlesticks granularity. Allowed values: 1, 5, 15, 30, 60, 1360, 1440.
195
        :return: None
196
        """
197
        if granularity not in [1, 5, 15, 30, 60, 1360, 1440]:
198
            raise ValueError(f"Granularity '{granularity}' not supported. Allowed values: 1, 5, 15, 30, 60, 1360, 1440")
199
        channel_name: str = f"candlesticks.{market}.{granularity}"
200
        await self.subscribe(message_id, [channel_name])
201
202
    async def get_order_history(self, message_id: str, swth_address: str, market: Optional[str] = None):
203
        """
204
        Request order history.
205
206
        .. note::
207
            The market identifier is optional and acts as a filter.
208
209
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
210
                           identify which channel the notification is originated from.
211
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
212
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
213
        :return: None
214
        """
215
        await self.send({
216
            "id": message_id,
217
            "method": "get_order_history",
218
            "params": {
219
                "address": swth_address,
220
                "market": market
221
            }
222
        })
223
224
    async def get_recent_trades(self, message_id: str, market: str):
225
        """
226
        Request recent trades.
227
228
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
229
                           identify which channel the notification is originated from.
230
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
231
        :return: None
232
        """
233
        await self.send({
234
            "id": message_id,
235
            "method": "get_recent_trades",
236
            "params": {
237
                "market": market
238
            }
239
        })
240
241
    async def get_candlesticks(self, message_id: str, market: str, granularity: int,
242
                               from_epoch: Optional[int] = None, to_epoch: Optional[int] = None):
243
        """
244
        Requests candlesticks for market with granularity.
245
246
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
247
                           identify which channel the notification is originated from.
248
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
249
        :param granularity: Define the candlesticks granularity. Allowed values: 1, 5, 15, 30, 60, 1360, 1440.
250
        :param from_epoch: Starting from epoch seconds.
251
        :param to_epoch: Ending to epoch seconds.
252
        :return: None
253
        """
254
        if granularity not in [1, 5, 15, 30, 60, 1360, 1440]:
255
            raise ValueError(f"Granularity '{granularity}' not supported. Allowed values: 1, 5, 15, 30, 60, 1360, 1440")
256
        await self.send({
257
            "id": message_id,
258
            "method": "get_candlesticks",
259
            "params": {
260
                "market": market,
261
                "resolution": granularity,
262
                "from": from_epoch,
263
                "to": to_epoch
264
            }
265
        })
266
267
    async def get_open_orders(self, message_id: str, swth_address: str, market: Optional[str] = None):
268
        """
269
        Request open orders.
270
271
        .. note::
272
            The market identifier is optional and acts as a filter.
273
274
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
275
                           identify which channel the notification is originated from.
276
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
277
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
278
        :return: None
279
        """
280
        await self.send({
281
            "id": message_id,
282
            "method": "get_open_orders",
283
            "params": {
284
                "address": swth_address,
285
                "market": market
286
            }
287
        })
288
289
    async def get_account_trades(self, message_id: str, swth_address: str,
290
                                 market: Optional[str] = None, page: Optional[int] = None):
291
        """
292
        Request account trades.
293
294
        .. note::
295
            The market identifier is optional and acts as a filter.
296
297
        .. warning::
298
            The parameter page results in an error. Do not use it(yet).
299
300
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
301
                           identify which channel the notification is originated from.
302
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
303
        :param market: Tradehub market identifier, e.g. 'swth_eth1'.
304
        :param page: Used for pagination.
305
        :return: None
306
        """
307
        # TODO page does not work and causes an error
308
        await self.send({
309
            "id": message_id,
310
            "method": "get_account_trades",
311
            "params": {
312
                "address": swth_address,
313
                "market": market,
314
                "page": page
315
            }
316
        })
317
318
    async def get_market_stats(self, message_id: str, market: Optional[str] = None):
319
        """
320
        Request market stats.
321
322
        .. warning::
323
            Parameter 'market' has no effect. Maybe not intended as parameter. Request will result in all market stats.
324
325
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
326
                           identify which channel the notification is originated from.
327
        :param market: Tradehub market identifier, e.g. 'swth_eth1'
328
        :return: None
329
        """
330
        # TODO market has no effect
331
        await self.send({
332
            "id": message_id,
333
            "method": "get_market_stats",
334
            "params": {
335
                "market": market
336
            }
337
        })
338
339
    async def get_leverages(self, message_id: str, swth_address: str, market: Optional[str] = None):
340
        """
341
        Request leverages.
342
343
        .. note::
344
            The market identifier is optional and acts as a filter.
345
346
        .. warning::
347
            The request method has not been tested yet.
348
349
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
350
                           identify which channel the notification is originated from.
351
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
352
        :param market: Tradehub market identifier, e.g. 'swth_eth1'.
353
        :return: None
354
        """
355
        # TODO not tested yet
356
        await self.send({
357
            "id": message_id,
358
            "method": "get_leverages",
359
            "params": {
360
                "address": swth_address,
361
                "market": market
362
            }
363
        })
364
365
    async def get_open_positions(self, message_id: str, swth_address: str, market: Optional[str] = None):
366
        """
367
        Request open positions.
368
369
        .. note::
370
            The market identifier is optional and acts as a filter.
371
372
        .. warning::
373
            The request method has not been tested yet.
374
375
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
376
                           identify which channel the notification is originated from.
377
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
378
        :param market: Tradehub market identifier, e.g. 'swth_eth1'.
379
        :return: None
380
        """
381
        # TODO not tested yet
382
        await self.send({
383
            "id": message_id,
384
            "method": "get_open_positions",
385
            "params": {
386
                "address": swth_address,
387
                "market": market
388
            }
389
        })
390
391
    async def get_closed_positions(self, message_id: str, swth_address: str, market: Optional[str] = None):
392
        """
393
        Request closed positions.
394
395
        .. note::
396
            The market identifier is optional and acts as a filter.
397
398
        .. warning::
399
            The request method has not been tested yet.
400
401
        :param message_id: Identifier that will be included in the websocket message response to allow the subscriber to
402
                           identify which channel the notification is originated from.
403
        :param swth_address: Tradehub wallet address starting with 'swth1' for mainnet and 'tswth1' for testnet.
404
        :param market: Tradehub market identifier, e.g. 'swth_eth1'.
405
        :return: None
406
        """
407
        # TODO not tested yet
408
        await self.send({
409
            "id": message_id,
410
            "method": "get_closed_positions",
411
            "params": {
412
                "address": swth_address,
413
                "market": market
414
            }
415
        })
416
417
    async def send(self, data: dict):
418
        """
419
        Send data to websocket server. Provided data will be translated to json.
420
421
        :param data: data as dictionary.
422
        :return:
423
        """
424
        await self._websocket.send(json.dumps(data))
425
426
    async def connect(self,
427
                      on_receive_message_callback: Callable,
428
                      on_connect_callback: Optional[Callable] = None,
429
                      on_error_callback: Optional[Callable] = None):
430
        """
431
        Connect to websocket server.
432
433
        .. warning::
434
            Callbacks need to be NON-BLOCKING! Otherwise the PING-PONG coroutine is blocked and the server will close
435
            the connection. You will not receive any notification about this.
436
437
        :param on_receive_message_callback: async callback which is called with the received message as dict.
438
        :param on_connect_callback: async callback which is called if websocket is connected.
439
        :param on_error_callback: async callback which is called if websocket has an error.
440
        :return: None
441
        """
442
        try:
443
            async with websockets.connect(self._uri,
444
                                          ping_interval=self._ping_interval,
445
                                          ping_timeout=self._ping_timeout) as websocket:
446
                self._websocket = websocket
447
448
                if on_connect_callback:
449
                    await on_connect_callback()
450
451
                async for message in websocket:
452
                    data = json.loads(message)
453
                    await on_receive_message_callback(data)
454
        except Exception as e:
455
            if on_error_callback:
456
                await on_error_callback(e)
457
            else:
458
                raise e
459