Rating: D

Complexity

Total Complexity 59

Size/Duplication

Total Lines 323
Duplicated Lines 0 %

Importance

Changes 0

Metric   Value
c        0
b        0
f        0
dl       0
loc      323
rs       4.5454
wmc      59

20 Methods

Rating   Name   Duplication   Size   Complexity
A   WebsocketHandler.disconnect()   0   8   3
B   WebsocketHandler.__on_close()   0   15   5
A   WebsocketHandler.__on_ping()   0   6   1
A   WebsocketHandler.__on_error()   0   7   2
A   WebsocketHandler.insert()   0   3   1
A   WebsocketHandler.__on_open()   0   16   1
A   WebsocketHandler.setauth()   0   18   3
A   WebsocketHandler.send()   0   4   2
B   WebsocketHandler.connect()   0   33   5
A   WebsocketHandler.unsubscribe()   0   17   3
B   WebsocketHandler.__reconnect()   0   27   4
A   WebsocketHandler.status()   0   6   2
B   WebsocketHandler.subscribe()   0   12   6
D   WebsocketHandler.__on_message()   0   32   8
B   WebsocketHandler.__init__()   0   44   2
A   WebsocketHandler.__reconnect_fnc()   0   6   2
A   auth_extractor.__init__()   0   2   1
A   WebsocketHandler.__ensure_ping()   0   17   3
A   WebsocketHandler.__del__()   0   3   1
A   WebsocketHandler.__resubscribe()   0   11   3

How to fix   Complexity   

Complex Class

Complex classes like WebsocketHandler often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
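
As an illustration of Extract Class applied to the listing below, the fields and methods sharing the `reconnect_time` prefix could be moved into a helper of their own. The following is only a sketch: the name `ReconnectBackoff`, its parameters, and the injectable `rng` argument are hypothetical and not part of the original code; the default values are copied from the class attributes of WebsocketHandler.

```python
import random


class ReconnectBackoff(object):
    """Hypothetical helper that owns only the reconnect-delay state and rules."""

    def __init__(self, starting_seconds=1.0, max_seconds=8 * 60.0,
                 multiplier=1.5, jitter=0.2, rng=random.uniform):
        self.starting_seconds = starting_seconds
        self.max_seconds = max_seconds
        self.multiplier = multiplier
        self.jitter = jitter
        self.rng = rng  # injectable so the jitter can be fixed in tests
        self.delay = starting_seconds

    def on_success(self):
        """Shrink the delay after a successful connection."""
        self.delay /= self.multiplier

    def next_delay(self, connection_was_long_lived=False):
        """Return the number of seconds to wait before the next attempt."""
        if connection_was_long_lived:
            # Start over if the previous connection was stable
            self.delay = self.starting_seconds
        else:
            self.delay *= self.multiplier
        # Cap the delay, add jitter, then enforce the lower bound
        self.delay = min(self.delay, self.max_seconds)
        self.delay *= 1 + self.rng(-self.jitter, self.jitter)
        self.delay = max(self.delay, self.starting_seconds)
        return self.delay
```

With such a helper, the handler would keep a single `ReconnectBackoff` instance and ask it for the next delay when scheduling its reconnect timer, which removes the backoff arithmetic and three class attributes from the complex class.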

from __future__ import absolute_import

import websocket
import threading
import logging
import json
import random
import time


class WebsocketHandler(object):
    """WebsocketHandler handles websocket connections to a ConnectorDB server. It allows
    subscribing and unsubscribing from inputs/outputs. The handler also deals with dropped
    connections, repeatedly attempting to reconnect to the server whenever connection is lost."""
    # The maximum time to wait between reconnection attempts
    reconnect_time_max_seconds = 8 * 60.0
    # Multiply the wait time by this factor when a reconnect fails
    reconnect_time_backoff_multiplier = 1.5
    # The time in seconds to wait before an initial attempt at reconnecting
    reconnect_time_starting_seconds = 1.0
    # The time between pings that results in a connection timeout
    connection_ping_timeout = 60 * 2

    def __init__(self, server_url, basic_auth):
        """
        The handler is initialized as follows::
            from requests.auth import HTTPBasicAuth
            req = HTTPBasicAuth(username,password)
            ws = WebsocketHandler("https://connectordb.com",req)
        """

        # The websocket is at /api/v1/websocket, and the server_url includes the /api/v1/
        server_url += "websocket"

        # First we must get the websocket URI from the server URL
        self.ws_url = "wss://" + server_url[8:]
        if server_url.startswith("http://"):
            self.ws_url = "ws://" + server_url[7:]

        self.setauth(basic_auth)

        # Set up the variable which will hold all of the subscriptions
        self.subscriptions = {}
        self.subscription_lock = threading.Lock()

        # The server periodically sends ping messages during websocket connection.
        # we keep track of the pings so that we notice loss of connection
        self.lastpingtime = time.time()
        self.pingtimer = None

        # Now set up the websocket
        self.ws = None
        self.ws_thread = None  # The thread where the websocket runs
        self.ws_openlock = threading.Lock()
        self.ws_sendlock = threading.Lock()

        # Set up the websocket status
        self._status = "disconnected"
        self._status_lock = threading.Lock()

        # Set up the reconnect time
        self.reconnect_time = self.reconnect_time_starting_seconds

        # Set up the times that we were connected and disconnected. These allow for
        # setting up reconnect delays correctly
        self.connected_time = 0
        self.disconnected_time = 0

    def setauth(self, basic_auth):
        """setauth can be used at runtime to reset authentication. Call it after
        changing passwords/apikeys to make sure that reconnects succeed."""
        self.headers = []
        # If we have auth
        if basic_auth is not None:
            # We use a cheap hack to get the basic auth header out of the auth object:
            # the auth object is called on a stand-in request that only has a headers
            # dict. This leaves us with a list of the headers needed for authentication.
            class auth_extractor():
                def __init__(self):
                    self.headers = {}

            extractor = auth_extractor()
            basic_auth(extractor)

            for header in extractor.headers:
                self.headers.append("%s: %s" % (header, extractor.headers[header]))

    @property
    def status(self):
        with self._status_lock:
            return self._status

    @status.setter
    def status(self, newstatus):
        with self._status_lock:
            self._status = newstatus
        logging.debug("ConnectorDB:WS:STATUS: %s", newstatus)

    def send(self, cmd):
        """Send the given command through the websocket"""
        with self.ws_sendlock:
            self.ws.send(json.dumps(cmd))

    def insert(self, stream, data):
        """Insert the given datapoints into the stream"""
        self.send({"cmd": "insert", "arg": stream, "d": data})

    def subscribe(self, stream, callback, transform=""):
        """Given a stream, a callback and an optional transform, sets up the subscription"""
        if self.status in ("disconnected", "disconnecting", "connecting"):
            self.connect()
        # Compare by value: identity checks ("is not") on strings are unreliable
        if self.status != "connected":
            return False
        logging.debug("Subscribing to %s", stream)

        self.send({"cmd": "subscribe", "arg": stream, "transform": transform})
        with self.subscription_lock:
            self.subscriptions[stream + ":" + transform] = callback
        return True

    def unsubscribe(self, stream, transform=""):
        """Unsubscribe from the given stream (with the optional transform)"""
        if self.status != "connected":
            return False
        logging.debug("Unsubscribing from %s", stream)
        self.send(
            {"cmd": "unsubscribe",
             "arg": stream,
             "transform": transform})

        with self.subscription_lock:
            del self.subscriptions[stream + ":" + transform]
            no_subscriptions_left = len(self.subscriptions) == 0
        # disconnect() acquires subscription_lock itself, so call it after releasing the lock
        if no_subscriptions_left:
            self.disconnect()

    def connect(self):
        """Attempt to connect to the websocket. Returns True or False depending on whether
        the connection was successful"""

        # Wait for the lock to be available (ie, the websocket is not being used (yet))
        self.ws_openlock.acquire()
        self.ws_openlock.release()

        if self.status == "connected":
            return True  # Already connected
        if self.status == "disconnecting":
            # If currently disconnecting, wait a moment, and retry connect
            time.sleep(0.1)
            return self.connect()
        if self.status == "disconnected" or self.status == "reconnecting":
            self.ws = websocket.WebSocketApp(self.ws_url,
                                             header=self.headers,
                                             on_message=self.__on_message,
                                             on_ping=self.__on_ping,
                                             on_open=self.__on_open,
                                             on_close=self.__on_close,
                                             on_error=self.__on_error)
            self.ws_thread = threading.Thread(target=self.ws.run_forever)
            self.ws_thread.daemon = True

            self.status = "connecting"
            self.ws_openlock.acquire()
            self.ws_thread.start()

        # Block until __on_open or __on_error releases the lock
        self.ws_openlock.acquire()
        self.ws_openlock.release()

        return self.status == "connected"

    def disconnect(self):
        if self.status == "connected":
            self.status = "disconnecting"
            with self.subscription_lock:
                self.subscriptions = {}

            self.ws.close()
            self.__on_close(self.ws)

    def __reconnect(self):
        """This is called when a connection is lost - it attempts to reconnect to the server"""
        self.status = "reconnecting"

        # Reset the reconnect time if the previous connection lasted more than 15 minutes
        if self.disconnected_time - self.connected_time > 15 * 60:
            self.reconnect_time = self.reconnect_time_starting_seconds
        else:
            self.reconnect_time *= self.reconnect_time_backoff_multiplier

        if self.reconnect_time > self.reconnect_time_max_seconds:
            self.reconnect_time = self.reconnect_time_max_seconds

        # We want to add some randomness to the reconnect rate - necessary so that we don't pound the server
        # if it goes down
        self.reconnect_time *= 1 + random.uniform(-0.2, 0.2)

        if self.reconnect_time < self.reconnect_time_starting_seconds:
            self.reconnect_time = self.reconnect_time_starting_seconds

        logging.warning("ConnectorDB:WS: Attempting to reconnect in %fs",
                        self.reconnect_time)

        self.reconnector = threading.Timer(self.reconnect_time,
                                           self.__reconnect_fnc)
        self.reconnector.daemon = True
        self.reconnector.start()

    def __reconnect_fnc(self):
        """This function is called by reconnect after the time delay"""
        if self.connect():
            self.__resubscribe()
        else:
            self.__reconnect()

    def __resubscribe(self):
        """Send a subscribe command for each existing subscription. This allows resuming
        a connection that was closed"""
        with self.subscription_lock:
            for sub in self.subscriptions:
                logging.debug("Resubscribing to %s", sub)
                stream_transform = sub.split(":", 1)
                self.send({
                    "cmd": "subscribe",
                    "arg": stream_transform[0],
                    "transform": stream_transform[1]
                })

    def __on_open(self, ws):
        """Called when the websocket is opened"""
        logging.debug("ConnectorDB: Websocket opened")

        # Connection success - decrease the wait time for next connection
        self.reconnect_time /= self.reconnect_time_backoff_multiplier

        self.status = "connected"

        self.lastpingtime = time.time()
        self.__ensure_ping()

        self.connected_time = time.time()

        # Release the lock that connect acquired
        self.ws_openlock.release()

    def __on_close(self, ws):
        """Called when the websocket is closed"""
        if self.status == "disconnected":
            return  # This can be double-called on disconnect
        logging.debug("ConnectorDB:WS: Websocket closed")

        # Turn off the ping timer
        if self.pingtimer is not None:
            self.pingtimer.cancel()

        self.disconnected_time = time.time()
        if self.status == "disconnecting":
            self.status = "disconnected"
        elif self.status == "connected":
            self.__reconnect()

    def __on_error(self, ws, err):
        """Called when there is an error in the websocket"""
        logging.debug("ConnectorDB:WS: Connection Error")

        if self.status == "connecting":
            self.status = "errored"
            self.ws_openlock.release()  # Release the lock of connecting

    def __on_message(self, ws, msg):
        """This function is called whenever there is a message received from the server"""
        msg = json.loads(msg)
        logging.debug("ConnectorDB:WS: Msg '%s'", msg["stream"])

        # Build the subscription key
        stream_key = msg["stream"] + ":"
        if "transform" in msg:
            stream_key += msg["transform"]

        self.subscription_lock.acquire()
        if stream_key in self.subscriptions:
            subscription_function = self.subscriptions[stream_key]
            self.subscription_lock.release()

            fresult = subscription_function(msg["stream"], msg["data"])

            if fresult is True:
                # This is a special result - if the subscription function of a downlink returns True,
                # then the datapoint is acknowledged automatically (ie, reinserted in non-downlink stream)
                fresult = msg["data"]

            if fresult is not False and fresult is not None and msg["stream"].endswith(
                    "/downlink") and msg["stream"].count("/") == 3:
                # If the above conditions are true, it means that the datapoints were from a downlink,
                # and the subscriber function chooses to acknowledge them, so we reinsert them.
                self.insert(msg["stream"][:-9], fresult)
        else:
            self.subscription_lock.release()
            logging.warning(
                "ConnectorDB:WS: Msg '%s' not subscribed! Subscriptions: %s",
                msg["stream"], list(self.subscriptions.keys()))

    def __on_ping(self, ws, data):
        """The server periodically sends us websocket ping messages to keep the connection alive. To
        ensure that the connection to the server is still active, we memorize the most recent ping's time
        and we periodically ensure that a ping was received in __ensure_ping"""
        logging.debug("ConnectorDB:WS: ping")
        self.lastpingtime = time.time()

    def __ensure_ping(self):
        """Each time the server sends a ping message, we record the timestamp. If we haven't received a ping
        within the given interval, then we assume that the connection was lost, close the websocket and
        attempt to reconnect"""

        logging.debug("ConnectorDB:WS: pingcheck")
        if time.time() - self.lastpingtime > self.connection_ping_timeout:
            logging.warning("ConnectorDB:WS: Websocket ping timed out!")
            if self.ws is not None:
                self.ws.close()
                self.__on_close(self.ws)
        else:
            # reset the ping timer
            self.pingtimer = threading.Timer(self.connection_ping_timeout,
                                             self.__ensure_ping)
            self.pingtimer.daemon = True
            self.pingtimer.start()

    def __del__(self):
        """Make sure that all threads shut down when needed"""
        self.disconnect()
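
The branching in `__on_message`, the method with the highest complexity in the table above, can be easier to follow when the key construction and the downlink acknowledgement rule are written as pure functions. The sketch below is only an illustration of that rule as it appears in the listing; the function names `subscription_key` and `acknowledgement` are hypothetical and do not exist in the original code.

```python
def subscription_key(msg):
    """Build the "stream:transform" key used to look up a callback."""
    return msg["stream"] + ":" + msg.get("transform", "")


def acknowledgement(msg, callback_result):
    """Return (stream, datapoints) to reinsert, or None if nothing is acknowledged."""
    # A callback returning True acknowledges the received datapoints unchanged
    if callback_result is True:
        callback_result = msg["data"]

    stream = msg["stream"]
    # Downlink streams have the form user/device/stream/downlink
    is_downlink = stream.endswith("/downlink") and stream.count("/") == 3

    if callback_result is False or callback_result is None or not is_downlink:
        return None
    # Strip the "/downlink" suffix to get the stream that receives the data
    return stream[:-len("/downlink")], callback_result
```

For a message on `user/device/stream/downlink`, a callback that returns True leads to the same datapoints being reinserted into `user/device/stream`; a callback that returns None or False leaves them unacknowledged.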