|
1
|
|
|
from __future__ import absolute_import |
|
2
|
|
|
|
|
3
|
|
|
import websocket |
|
4
|
|
|
import threading |
|
5
|
|
|
import logging |
|
6
|
|
|
import json |
|
7
|
|
|
import random |
|
8
|
|
|
import time |
|
9
|
|
|
|
|
10
|
|
|
|
|
11
|
|
|
class WebsocketHandler(object):
    """WebsocketHandler handles websocket connections to a ConnectorDB server. It allows
    subscribing and unsubscribing from inputs/outputs. The handler also deals with dropped
    connections, repeatedly attempting to reconnect to the server whenever connection is lost.

    The ``status`` property is one of the strings "disconnected", "connecting",
    "connected", "disconnecting", "reconnecting" or "errored".
    """

    # The maximum time (in seconds) to wait between reconnection attempts
    reconnect_time_max_seconds = 8 * 60.0
    # Multiply the wait time by this factor when a reconnect fails
    reconnect_time_backoff_multiplier = 1.5
    # The time in seconds to wait before an initial attempt at reconnecting
    reconnect_time_starting_seconds = 1.0
    # The time (in seconds) without a ping after which the connection is
    # considered timed out. It is also the interval between ping checks.
    connection_ping_timeout = 60 * 2

    def __init__(self, server_url, basic_auth):
        """
        The handler is initialized as follows::

            from requests.auth import HTTPBasicAuth
            req = HTTPBasicAuth(username,password)
            ws = WebsocketHandler("https://connectordb.com/api/v1/",req)

        server_url must end with the "/api/v1/" prefix (including the trailing
        slash), because "websocket" is appended to it verbatim. basic_auth is a
        callable that sets entries in the ``headers`` dict of the object it is
        called with (or None for no authentication) - see setauth.
        """

        # The websocket is at /api/v1/websocket, and the server_url includes the /api/v1/
        server_url += "websocket"

        # First we must get the websocket URI from the server URL:
        # http -> ws, https -> wss. A URL that already uses ws/wss keeps its
        # security level, and a URL without any scheme is assumed to be secure.
        scheme, separator, remainder = server_url.partition("://")
        if not separator:
            self.ws_url = "wss://" + server_url
        elif scheme.lower() in ("http", "ws"):
            self.ws_url = "ws://" + remainder
        else:
            self.ws_url = "wss://" + remainder

        self.setauth(basic_auth)

        # Set up the variable which will hold all of the subscriptions.
        # Keys are "<stream>:<transform>", values are the callbacks.
        self.subscriptions = {}
        self.subscription_lock = threading.Lock()

        # The server periodically sends ping messages during websocket connection.
        # we keep track of the pings so that we notice loss of connection
        self.lastpingtime = time.time()
        self.pingtimer = None

        # Now set up the websocket
        self.ws = None
        self.ws_thread = None  # The thread where the websocket runs
        # Held while a connection attempt is in flight (see connect)
        self.ws_openlock = threading.Lock()
        # Serializes writes to the websocket
        self.ws_sendlock = threading.Lock()

        # Set up the websocket status
        self._status = "disconnected"
        self._status_lock = threading.Lock()

        # Set up the reconnect time
        self.reconnect_time = self.reconnect_time_starting_seconds

        # Set up the times that we were connected and disconnected. These allow for
        # setting up reconnect delays correctly
        self.connected_time = 0
        self.disconnected_time = 0

    def setauth(self, basic_auth):
        """ setauth can be used during runtime to make sure that authentication is reset.
        it can be used when changing passwords/apikeys to make sure reconnects succeed

        basic_auth is either None (no headers are sent) or a callable which, when
        called with an object that has a ``headers`` dict, fills in the headers
        needed for authentication."""
        self.headers = []
        if basic_auth is None:
            return

        # we use a cheap hack to get the basic auth header out of the auth object.
        # This snippet ends up with us having an array of the necessary headers
        # to perform authentication.
        class auth_extractor(object):
            def __init__(self):
                self.headers = {}

        extractor = auth_extractor()
        basic_auth(extractor)

        self.headers.extend("%s: %s" % (name, value)
                            for name, value in extractor.headers.items())

    @property
    def status(self):
        """The current connection status string (read under the status lock)"""
        with self._status_lock:
            return self._status

    @status.setter
    def status(self, newstatus):
        with self._status_lock:
            self._status = newstatus
        logging.debug("ConnectorDB:WS:STATUS: %s", newstatus)

    def send(self, cmd):
        """Send the given command thru the websocket. The command is serialized to JSON."""
        with self.ws_sendlock:
            self.ws.send(json.dumps(cmd))

    def insert(self, stream, data):
        """Insert the given datapoints into the stream"""
        self.send({"cmd": "insert", "arg": stream, "d": data})

    def subscribe(self, stream, callback, transform=""):
        """Given a stream, a callback and an optional transform, sets up the subscription.

        Connects first if there is no active connection. Returns True if the
        subscribe command was sent, and False if the websocket is not connected."""
        if self.status in ("disconnected", "disconnecting", "connecting"):
            self.connect()
        # Compare by value: identity comparison of strings is not reliable
        if self.status != "connected":
            return False
        logging.debug("Subscribing to %s", stream)

        self.send({"cmd": "subscribe", "arg": stream, "transform": transform})
        with self.subscription_lock:
            self.subscriptions[stream + ":" + transform] = callback
        return True

    def unsubscribe(self, stream, transform=""):
        """Unsubscribe from the given stream (with the optional transform).

        Returns False if the websocket is not connected, otherwise None.
        Raises KeyError if there is no such subscription (the unsubscribe
        command has already been sent to the server at that point). The
        websocket is disconnected once the last subscription is removed."""
        if self.status != "connected":
            return False
        logging.debug("Unsubscribing from %s", stream)
        self.send(
            {"cmd": "unsubscribe",
             "arg": stream,
             "transform": transform})

        # The with block guarantees that the lock is released even when the
        # subscription does not exist and del raises KeyError
        with self.subscription_lock:
            del self.subscriptions[stream + ":" + transform]
            no_subscriptions_left = not self.subscriptions

        # disconnect takes the subscription lock itself, so it must be
        # called after the lock is released
        if no_subscriptions_left:
            self.disconnect()

    def connect(self):
        """Attempt to connect to the websocket - and returns either True or False depending on if
        the connection was successful or not"""

        # Wait for the lock to be available (ie, the websocket is not being used (yet))
        self.ws_openlock.acquire()
        self.ws_openlock.release()

        if self.status == "connected":
            return True  # Already connected
        if self.status == "disconnecting":
            # If currently disconnecting, wait a moment, and retry connect
            time.sleep(0.1)
            return self.connect()
        # NOTE(review): a handler left in the "errored" status is not retried
        # here; only __reconnect moves it on to "reconnecting" - confirm that
        # this is intended.
        if self.status == "disconnected" or self.status == "reconnecting":
            self.ws = websocket.WebSocketApp(self.ws_url,
                                             header=self.headers,
                                             on_message=self.__on_message,
                                             on_ping=self.__on_ping,
                                             on_open=self.__on_open,
                                             on_close=self.__on_close,
                                             on_error=self.__on_error)
            self.ws_thread = threading.Thread(target=self.ws.run_forever)
            self.ws_thread.daemon = True

            self.status = "connecting"
            # This lock is released by __on_open or __on_error once the
            # outcome of the connection attempt is known
            self.ws_openlock.acquire()
            self.ws_thread.start()

        # Block until the connection attempt has finished
        self.ws_openlock.acquire()
        self.ws_openlock.release()

        return self.status == "connected"

    def disconnect(self):
        """Close the websocket and drop all subscriptions. Does nothing unless connected."""
        if self.status == "connected":
            self.status = "disconnecting"
            with self.subscription_lock:
                self.subscriptions = {}

            self.ws.close()
            self.__on_close(self.ws)

    def __reconnect(self):
        """This is called when a connection is lost - it attempts to reconnect to the server"""
        self.status = "reconnecting"

        # Reset the reconnect wait time if the lost connection had been up for
        # more than 15 minutes, otherwise back off
        if self.disconnected_time - self.connected_time > 15 * 60:
            self.reconnect_time = self.reconnect_time_starting_seconds
        else:
            self.reconnect_time *= self.reconnect_time_backoff_multiplier

        if self.reconnect_time > self.reconnect_time_max_seconds:
            self.reconnect_time = self.reconnect_time_max_seconds

        # We want to add some randomness to the reconnect rate - necessary so that we don't pound the server
        # if it goes down
        self.reconnect_time *= 1 + random.uniform(-0.2, 0.2)

        if self.reconnect_time < self.reconnect_time_starting_seconds:
            self.reconnect_time = self.reconnect_time_starting_seconds

        logging.warning("ConnectorDB:WS: Attempting to reconnect in %fs",
                        self.reconnect_time)

        self.reconnector = threading.Timer(self.reconnect_time,
                                           self.__reconnect_fnc)
        self.reconnector.daemon = True
        self.reconnector.start()

    def __reconnect_fnc(self):
        """This function is called by reconnect after the time delay"""
        if self.connect():
            self.__resubscribe()
        else:
            self.__reconnect()

    def __resubscribe(self):
        """Send subscribe command for all existing subscriptions. This allows to resume a connection
        that was closed"""
        with self.subscription_lock:
            for sub in self.subscriptions:
                logging.debug("Resubscribing to %s", sub)
                # Keys have the form "<stream>:<transform>"
                stream_transform = sub.split(":", 1)
                self.send({
                    "cmd": "subscribe",
                    "arg": stream_transform[0],
                    "transform": stream_transform[1]
                })

    def __on_open(self, ws):
        """Called when the websocket is opened"""
        logging.debug("ConnectorDB: Websocket opened")

        # Connection success - decrease the wait time for next connection
        self.reconnect_time /= self.reconnect_time_backoff_multiplier

        self.status = "connected"

        self.lastpingtime = time.time()
        self.__ensure_ping()

        self.connected_time = time.time()

        # Release the lock that connect called
        self.ws_openlock.release()

    def __on_close(self, ws):
        """Called when the websocket is closed"""
        if self.status == "disconnected":
            return  # This can be double-called on disconnect
        logging.debug("ConnectorDB:WS: Websocket closed")

        # Turn off the ping timer
        if self.pingtimer is not None:
            self.pingtimer.cancel()

        self.disconnected_time = time.time()
        if self.status == "disconnecting":
            # The close was requested through disconnect
            self.status = "disconnected"
        elif self.status == "connected":
            # The connection was lost unexpectedly
            self.__reconnect()

    def __on_error(self, ws, err):
        """Called when there is an error in the websocket"""
        logging.debug("ConnectorDB:WS: Connection Error")

        if self.status == "connecting":
            self.status = "errored"
            self.ws_openlock.release()  # Release the lock of connecting

    def __on_message(self, ws, msg):
        """This function is called whenever there is a message received from the server.

        The message is JSON with a "stream" key, a "data" key and an optional
        "transform" key. The callback subscribed to that stream/transform is
        called as callback(stream, data)."""
        msg = json.loads(msg)
        stream = msg["stream"]
        logging.debug("ConnectorDB:WS: Msg '%s'", stream)

        # Build the subscription key
        stream_key = stream + ":"
        if "transform" in msg:
            stream_key += msg["transform"]

        # Look up the callback under the lock, but call it after the lock is
        # released
        with self.subscription_lock:
            is_subscribed = stream_key in self.subscriptions
            if is_subscribed:
                subscription_function = self.subscriptions[stream_key]
            else:
                # Snapshot the keys while the lock is still held
                subscribed_keys = list(self.subscriptions)

        if not is_subscribed:
            logging.warning(
                "ConnectorDB:WS: Msg '%s' not subscribed! Subscriptions: %s",
                stream, subscribed_keys)
            return

        fresult = subscription_function(stream, msg["data"])

        if fresult is True:
            # This is a special result - if the subscription function of a downlink returns True,
            # then the datapoint is acknowledged automatically (ie, reinserted in non-downlink stream)
            fresult = msg["data"]

        if (fresult is not False and fresult is not None and
                stream.endswith("/downlink") and stream.count("/") == 3):
            # If the above conditions are true, it means that the datapoints were from a downlink,
            # and the subscriber function chooses to acknowledge them, so we reinsert them.
            # The [:-9] strips the trailing "/downlink"
            self.insert(stream[:-9], fresult)

    def __on_ping(self, ws, data):
        """The server periodically sends us websocket ping messages to keep the connection alive. To
        ensure that the connection to the server is still active, we memorize the most recent ping's time
        and we periodically ensure that a ping was received in __ensure_ping"""
        logging.debug("ConnectorDB:WS: ping")
        self.lastpingtime = time.time()

    def __ensure_ping(self):
        """Each time the server sends a ping message, we record the timestamp. If we haven't received a ping
        within the given interval, then we assume that the connection was lost, close the websocket and
        attempt to reconnect"""

        logging.debug("ConnectorDB:WS: pingcheck")
        if (time.time() - self.lastpingtime > self.connection_ping_timeout):
            logging.warning("ConnectorDB:WS: Websocket ping timed out!")
            if self.ws is not None:
                self.ws.close()
                self.__on_close(self.ws)
        else:
            # reset the ping timer
            self.pingtimer = threading.Timer(self.connection_ping_timeout,
                                             self.__ensure_ping)
            self.pingtimer.daemon = True
            self.pingtimer.start()

    def __del__(self):
        """Make sure that all threads shut down when needed"""
        self.disconnect()
|
334
|
|
|
|