1
|
|
|
from __future__ import absolute_import |
2
|
|
|
|
3
|
|
|
import websocket |
4
|
|
|
import threading |
5
|
|
|
import logging |
6
|
|
|
import json |
7
|
|
|
import random |
8
|
|
|
import time |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
class WebsocketHandler(object): |
12
|
|
|
"""WebsocketHandler handles websocket connections to a ConnectorDB server. It allows |
13
|
|
|
subscribing and unsubscribing from inputs/outputs. The handler also deals with dropped |
14
|
|
|
connections, repeatedly attempting to reconnect to the server whenever connection is lost.""" |
15
|
|
|
"""The maximum time to wait between reconnection attempts""" |
16
|
|
|
reconnect_time_max_seconds = 8 * 60.0 |
17
|
|
|
"""Multiply the wait time by this factor when a reconnect fails""" |
18
|
|
|
reconnect_time_backoff_multiplier = 1.5 |
19
|
|
|
"""The time in seconds to wait before an initial attempt at reconnecting""" |
20
|
|
|
reconnect_time_starting_seconds = 1.0 |
21
|
|
|
"""The time between pings that results in a connection timeout""" |
22
|
|
|
connection_ping_timeout = 60 * 2 |
23
|
|
|
|
24
|
|
|
def __init__(self, server_url, basic_auth): |
25
|
|
|
""" |
26
|
|
|
The handler is initialized as follows:: |
27
|
|
|
from requests.auth import HTTPBasicAuth |
28
|
|
|
req = HTTPBasicAuth(username,password) |
29
|
|
|
ws = WebsocketHandler("https://connectordb.com",req) |
30
|
|
|
""" |
31
|
|
|
|
32
|
|
|
# The websocket is at /api/v1/websocket, and the server_url includes the /api/v1/ |
33
|
|
|
server_url += "websocket" |
34
|
|
|
|
35
|
|
|
# First we must get the websocket URI from the server URL |
36
|
|
|
self.ws_url = "wss://" + server_url[8:] |
37
|
|
|
if server_url.startswith("http://"): |
38
|
|
|
self.ws_url = "ws://" + server_url[7:] |
39
|
|
|
|
40
|
|
|
self.setauth(basic_auth) |
41
|
|
|
|
42
|
|
|
# Set up the variable which will hold all of the subscriptions |
43
|
|
|
self.subscriptions = {} |
44
|
|
|
self.subscription_lock = threading.Lock() |
45
|
|
|
|
46
|
|
|
# The server periodically sends ping messages during websocket connection. |
47
|
|
|
# we keep track of the pings so that we notice loss of connection |
48
|
|
|
self.lastpingtime = time.time() |
49
|
|
|
self.pingtimer = None |
50
|
|
|
|
51
|
|
|
# Now set up the websocket |
52
|
|
|
self.ws = None |
53
|
|
|
self.ws_thread = None # The thread where the websocket runs |
54
|
|
|
self.ws_openlock = threading.Lock() |
55
|
|
|
self.ws_sendlock = threading.Lock() |
56
|
|
|
|
57
|
|
|
# Set up the websocket status |
58
|
|
|
self._status = "disconnected" |
59
|
|
|
self._status_lock = threading.Lock() |
60
|
|
|
|
61
|
|
|
# Set up the reconnect time |
62
|
|
|
self.reconnect_time = self.reconnect_time_starting_seconds |
63
|
|
|
|
64
|
|
|
# Set up the times that we were connected and disconnected. These allow for |
65
|
|
|
# setting up reconnect delays correctly |
66
|
|
|
self.connected_time = 0 |
67
|
|
|
self.disconnected_time = 0 |
68
|
|
|
|
69
|
|
|
def setauth(self,basic_auth): |
70
|
|
|
""" setauth can be used during runtime to make sure that authentication is reset. |
71
|
|
|
it can be used when changing passwords/apikeys to make sure reconnects succeed """ |
72
|
|
|
self.headers = [] |
73
|
|
|
# If we have auth |
74
|
|
|
if basic_auth is not None: |
75
|
|
|
# we use a cheap hack to get the basic auth header out of the auth object. |
76
|
|
|
# This snippet ends up with us having an array of the necessary headers |
77
|
|
|
# to perform authentication. |
78
|
|
|
class auth_extractor(): |
79
|
|
|
def __init__(self): |
80
|
|
|
self.headers = {} |
81
|
|
|
|
82
|
|
|
extractor = auth_extractor() |
83
|
|
|
basic_auth(extractor) |
84
|
|
|
|
85
|
|
|
for header in extractor.headers: |
86
|
|
|
self.headers.append("%s: %s" % (header, extractor.headers[header])) |
87
|
|
|
|
88
|
|
|
@property |
89
|
|
|
def status(self): |
90
|
|
|
status = "" |
91
|
|
|
with self._status_lock: |
92
|
|
|
status = self._status |
93
|
|
|
return status |
94
|
|
|
|
95
|
|
|
@status.setter |
96
|
|
|
def status(self, newstatus): |
97
|
|
|
with self._status_lock: |
98
|
|
|
self._status = newstatus |
99
|
|
|
logging.debug("ConnectorDB:WS:STATUS: %s", newstatus) |
100
|
|
|
|
101
|
|
|
def send(self, cmd): |
102
|
|
|
"""Send the given command thru the websocket""" |
103
|
|
|
with self.ws_sendlock: |
104
|
|
|
self.ws.send(json.dumps(cmd)) |
105
|
|
|
|
106
|
|
|
def insert(self, stream, data): |
107
|
|
|
"""Insert the given datapoints into the stream""" |
108
|
|
|
self.send({"cmd": "insert", "arg": stream, "d": data}) |
109
|
|
|
|
110
|
|
|
def subscribe(self, stream, callback, transform=""): |
111
|
|
|
"""Given a stream, a callback and an optional transform, sets up the subscription""" |
112
|
|
|
if self.status == "disconnected" or self.status == "disconnecting" or self.status == "connecting": |
113
|
|
|
self.connect() |
114
|
|
|
if self.status is not "connected": |
115
|
|
|
return False |
116
|
|
|
logging.debug("Subscribing to %s", stream) |
117
|
|
|
|
118
|
|
|
self.send({"cmd": "subscribe", "arg": stream, "transform": transform}) |
119
|
|
|
with self.subscription_lock: |
120
|
|
|
self.subscriptions[stream + ":" + transform] = callback |
121
|
|
|
return True |
122
|
|
|
|
123
|
|
|
def unsubscribe(self, stream, transform=""): |
124
|
|
|
"""Unsubscribe from the given stream (with the optional transform)""" |
125
|
|
|
if self.status is not "connected": |
126
|
|
|
return False |
127
|
|
|
logging.debug("Unsubscribing from %s", stream) |
128
|
|
|
self.send( |
129
|
|
|
{"cmd": "unsubscribe", |
130
|
|
|
"arg": stream, |
131
|
|
|
"transform": transform}) |
132
|
|
|
|
133
|
|
|
self.subscription_lock.acquire() |
134
|
|
|
del self.subscriptions[stream + ":" + transform] |
135
|
|
|
if len(self.subscriptions) is 0: |
136
|
|
|
self.subscription_lock.release() |
137
|
|
|
self.disconnect() |
138
|
|
|
else: |
139
|
|
|
self.subscription_lock.release() |
140
|
|
|
|
141
|
|
|
def connect(self): |
142
|
|
|
"""Attempt to connect to the websocket - and returns either True or False depending on if |
143
|
|
|
the connection was successful or not""" |
144
|
|
|
|
145
|
|
|
# Wait for the lock to be available (ie, the websocket is not being used (yet)) |
146
|
|
|
self.ws_openlock.acquire() |
147
|
|
|
self.ws_openlock.release() |
148
|
|
|
|
149
|
|
|
if self.status == "connected": |
150
|
|
|
return True # Already connected |
151
|
|
|
if self.status == "disconnecting": |
152
|
|
|
# If currently disconnecting, wait a moment, and retry connect |
153
|
|
|
time.sleep(0.1) |
154
|
|
|
return self.connect() |
155
|
|
|
if self.status == "disconnected" or self.status == "reconnecting": |
156
|
|
|
self.ws = websocket.WebSocketApp(self.ws_url, |
157
|
|
|
header=self.headers, |
158
|
|
|
on_message=self.__on_message, |
159
|
|
|
on_ping=self.__on_ping, |
160
|
|
|
on_open=self.__on_open, |
161
|
|
|
on_close=self.__on_close, |
162
|
|
|
on_error=self.__on_error) |
163
|
|
|
self.ws_thread = threading.Thread(target=self.ws.run_forever) |
164
|
|
|
self.ws_thread.daemon = True |
165
|
|
|
|
166
|
|
|
self.status = "connecting" |
167
|
|
|
self.ws_openlock.acquire() |
168
|
|
|
self.ws_thread.start() |
169
|
|
|
|
170
|
|
|
self.ws_openlock.acquire() |
171
|
|
|
self.ws_openlock.release() |
172
|
|
|
|
173
|
|
|
return self.status == "connected" |
174
|
|
|
|
175
|
|
|
def disconnect(self): |
176
|
|
|
if self.status == "connected": |
177
|
|
|
self.status = "disconnecting" |
178
|
|
|
with self.subscription_lock: |
179
|
|
|
self.subscriptions = {} |
180
|
|
|
|
181
|
|
|
self.ws.close() |
182
|
|
|
self.__on_close(self.ws) |
183
|
|
|
|
184
|
|
|
def __reconnect(self): |
185
|
|
|
"""This is called when a connection is lost - it attempts to reconnect to the server""" |
186
|
|
|
self.status = "reconnecting" |
187
|
|
|
|
188
|
|
|
# Reset the disconnect time after 15 minutes |
189
|
|
|
if self.disconnected_time - self.connected_time > 15 * 60: |
190
|
|
|
self.reconnect_time = self.reconnect_time_starting_seconds |
191
|
|
|
else: |
192
|
|
|
self.reconnect_time *= self.reconnect_time_backoff_multiplier |
193
|
|
|
|
194
|
|
|
if self.reconnect_time > self.reconnect_time_max_seconds: |
195
|
|
|
self.reconnect_time = self.reconnect_time_max_seconds |
196
|
|
|
|
197
|
|
|
# We want to add some randomness to the reconnect rate - necessary so that we don't pound the server |
198
|
|
|
# if it goes down |
199
|
|
|
self.reconnect_time *= 1 + random.uniform(-0.2, 0.2) |
200
|
|
|
|
201
|
|
|
if self.reconnect_time < self.reconnect_time_starting_seconds: |
202
|
|
|
self.reconnect_time = self.reconnect_time_starting_seconds |
203
|
|
|
|
204
|
|
|
logging.warn("ConnectorDB:WS: Attempting to reconnect in %fs", |
205
|
|
|
self.reconnect_time) |
206
|
|
|
|
207
|
|
|
self.reconnector = threading.Timer(self.reconnect_time, |
208
|
|
|
self.__reconnect_fnc) |
209
|
|
|
self.reconnector.daemon = True |
210
|
|
|
self.reconnector.start() |
211
|
|
|
|
212
|
|
|
def __reconnect_fnc(self): |
213
|
|
|
"""This function is called by reconnect after the time delay""" |
214
|
|
|
if self.connect(): |
215
|
|
|
self.__resubscribe() |
216
|
|
|
else: |
217
|
|
|
self.__reconnect() |
218
|
|
|
|
219
|
|
|
def __resubscribe(self): |
220
|
|
|
"""Send subscribe command for all existing subscriptions. This allows to resume a connection |
221
|
|
|
that was closed""" |
222
|
|
|
with self.subscription_lock: |
223
|
|
|
for sub in self.subscriptions: |
224
|
|
|
logging.debug("Resubscribing to %s", sub) |
225
|
|
|
stream_transform = sub.split(":", 1) |
226
|
|
|
self.send({ |
227
|
|
|
"cmd": "subscribe", |
228
|
|
|
"arg": stream_transform[0], |
229
|
|
|
"transform": stream_transform[1] |
230
|
|
|
}) |
231
|
|
|
|
232
|
|
|
def __on_open(self, ws): |
233
|
|
|
"""Called when the websocket is opened""" |
234
|
|
|
logging.debug("ConnectorDB: Websocket opened") |
235
|
|
|
|
236
|
|
|
# Connection success - decrease the wait time for next connection |
237
|
|
|
self.reconnect_time /= self.reconnect_time_backoff_multiplier |
238
|
|
|
|
239
|
|
|
self.status = "connected" |
240
|
|
|
|
241
|
|
|
self.lastpingtime = time.time() |
242
|
|
|
self.__ensure_ping() |
243
|
|
|
|
244
|
|
|
self.connected_time = time.time() |
245
|
|
|
|
246
|
|
|
# Release the lock that connect called |
247
|
|
|
self.ws_openlock.release() |
248
|
|
|
|
249
|
|
|
def __on_close(self, ws): |
250
|
|
|
"""Called when the websocket is closed""" |
251
|
|
|
if self.status == "disconnected": |
252
|
|
|
return # This can be double-called on disconnect |
253
|
|
|
logging.debug("ConnectorDB:WS: Websocket closed") |
254
|
|
|
|
255
|
|
|
# Turn off the ping timer |
256
|
|
|
if self.pingtimer is not None: |
257
|
|
|
self.pingtimer.cancel() |
258
|
|
|
|
259
|
|
|
self.disconnected_time = time.time() |
260
|
|
|
if self.status == "disconnecting": |
261
|
|
|
self.status = "disconnected" |
262
|
|
|
elif self.status == "connected": |
263
|
|
|
self.__reconnect() |
264
|
|
|
|
265
|
|
|
def __on_error(self, ws, err): |
266
|
|
|
"""Called when there is an error in the websocket""" |
267
|
|
|
logging.debug("ConnectorDB:WS: Connection Error") |
268
|
|
|
|
269
|
|
|
if self.status == "connecting": |
270
|
|
|
self.status = "errored" |
271
|
|
|
self.ws_openlock.release() # Release the lock of connecting |
272
|
|
|
|
273
|
|
|
def __on_message(self, ws, msg): |
274
|
|
|
"""This function is called whenever there is a message received from the server""" |
275
|
|
|
msg = json.loads(msg) |
276
|
|
|
logging.debug("ConnectorDB:WS: Msg '%s'", msg["stream"]) |
277
|
|
|
|
278
|
|
|
# Build the subcription key |
279
|
|
|
stream_key = msg["stream"] + ":" |
280
|
|
|
if "transform" in msg: |
281
|
|
|
stream_key += msg["transform"] |
282
|
|
|
|
283
|
|
|
self.subscription_lock.acquire() |
284
|
|
|
if stream_key in self.subscriptions: |
285
|
|
|
subscription_function = self.subscriptions[stream_key] |
286
|
|
|
self.subscription_lock.release() |
287
|
|
|
|
288
|
|
|
fresult = subscription_function(msg["stream"], msg["data"]) |
289
|
|
|
|
290
|
|
|
if fresult is True: |
291
|
|
|
# This is a special result - if the subscription function of a downlink returns True, |
292
|
|
|
# then the datapoint is acknowledged automatically (ie, reinserted in non-downlink stream) |
293
|
|
|
fresult = msg["data"] |
294
|
|
|
|
295
|
|
|
if fresult is not False and fresult is not None and msg["stream"].endswith( |
296
|
|
|
"/downlink") and msg["stream"].count("/") == 3: |
297
|
|
|
# If the above conditions are true, it means that the datapoints were from a downlink, |
298
|
|
|
# and the subscriber function chooses to acknowledge them, so we reinsert them. |
299
|
|
|
self.insert(msg["stream"][:-9], fresult) |
300
|
|
|
else: |
301
|
|
|
self.subscription_lock.release() |
302
|
|
|
logging.warn( |
303
|
|
|
"ConnectorDB:WS: Msg '%s' not subscribed! Subscriptions: %s", |
304
|
|
|
msg["stream"], list(self.subscriptions.keys())) |
305
|
|
|
|
306
|
|
|
def __on_ping(self, ws, data): |
307
|
|
|
"""The server periodically sends us websocket ping messages to keep the connection alive. To |
308
|
|
|
ensure that the connection to the server is still active, we memorize the most recent ping's time |
309
|
|
|
and we periodically ensure that a ping was received in __ensure_ping""" |
310
|
|
|
logging.debug("ConnectorDB:WS: ping") |
311
|
|
|
self.lastpingtime = time.time() |
312
|
|
|
|
313
|
|
|
def __ensure_ping(self): |
314
|
|
|
"""Each time the server sends a ping message, we record the timestamp. If we haven't received a ping |
315
|
|
|
within the given interval, then we assume that the connection was lost, close the websocket and |
316
|
|
|
attempt to reconnect""" |
317
|
|
|
|
318
|
|
|
logging.debug("ConnectorDB:WS: pingcheck") |
319
|
|
|
if (time.time() - self.lastpingtime > self.connection_ping_timeout): |
320
|
|
|
logging.warn("ConnectorDB:WS: Websocket ping timed out!") |
321
|
|
|
if self.ws is not None: |
322
|
|
|
self.ws.close() |
323
|
|
|
self.__on_close(self.ws) |
324
|
|
|
else: |
325
|
|
|
# reset the ping timer |
326
|
|
|
self.pingtimer = threading.Timer(self.connection_ping_timeout, |
327
|
|
|
self.__ensure_ping) |
328
|
|
|
self.pingtimer.daemon = True |
329
|
|
|
self.pingtimer.start() |
330
|
|
|
|
331
|
|
|
def __del__(self): |
332
|
|
|
"""Make sure that all threads shut down when needed""" |
333
|
|
|
self.disconnect() |
334
|
|
|
|