Completed: Push to master ( 700e8b...3b44da ) by Daniel (59s, created)

Logger.sync() (rated F)

Complexity:  Conditions 13
Size:        Total Lines 74
Duplication: Lines 0, Ratio 0%
Importance:  Changes 2, Bugs 0, Features 0
Metric                  Value
cc (conditions)         13
c (changes)             2
b (bugs)                0
f (features)            0
dl (duplicated lines)   0
loc (total lines)       74
rs                      2.2665

How to fix

Long Method

Small methods make your code easier to understand, in particular when combined with a good name. Moreover, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that the commented part should be extracted into a new method, with the comment as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, sketched below.
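
As an illustration only (the functions and names below are hypothetical, not taken from the code under review), extracting a commented block into a well-named method might look like this:

TAX_RATE = 0.08

# Before: one function mixes formatting with a commented-off computation.
def receipt_before(prices):
    # compute the total including tax
    total = sum(prices)
    total = total * (1 + TAX_RATE)
    return "Total: %.2f" % total

# After: the commented part becomes its own function, named after the comment.
def total_including_tax(prices):
    return sum(prices) * (1 + TAX_RATE)

def receipt_after(prices):
    return "Total: %.2f" % total_including_tax(prices)

The caller now reads as a sentence, and the comment is no longer needed.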

Complexity

Complex methods like Logger.sync() often do a lot of different things. To break the surrounding class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
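
In Logger itself, for instance, synclock, syncthread, syncperiod, __setsync and __runsyncer all share the sync prefix. As a hedged sketch (SyncScheduler is an invented name; this is one possible direction, not the project's actual design), that component could be extracted like so:

import logging
import threading


class SyncScheduler(object):
    """Owns the periodic-sync state that currently lives on Logger."""

    def __init__(self, period, callback):
        self.period = period      # seconds between sync attempts
        self.callback = callback  # e.g. a Logger instance's sync method
        self._timer = None
        self._lock = threading.Lock()

    def start(self):
        # (Re)arm the timer, mirroring what Logger.__setsync does today.
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
            self._timer = threading.Timer(self.period, self._run)
            self._timer.daemon = True
            self._timer.start()

    def stop(self):
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None

    def _run(self):
        # One sync attempt, then reschedule - the same flow as Logger.__runsyncer.
        try:
            self.callback()
        except Exception as e:
            logging.warning("sync failed: %s", e)
        self.start()

Logger would then hold a single SyncScheduler(self.syncperiod, self.sync) in place of its sync-prefixed fields and the two private scheduling methods.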

from __future__ import absolute_import

import apsw  # The default python sqlite implementation is not threadsafe, so we use apsw
import logging
import time
import threading
import os

import json
from jsonschema import validate

from ._connectordb import ConnectorDB, CONNECTORDB_URL, DATAPOINT_INSERT_LIMIT


class Logger(object):
    """Logger enables logging datapoints with periodic synchronization to a ConnectorDB database.
    The logged datapoints are cached in a sqlite database, as well as the necessary connection data,
    so that no data is lost, and settings don't need to be reloaded from the database after initial connection.
    """

    def __init__(self, database_file_path, on_create=None, apikey=None, onsync=None, onsyncfail=None, syncraise=False):
        """Logger is started by passing its database file, and an optional callback which is run if the database
        is not initialized, allowing setup code to be run only once.

        The on_create callback can optionally be used to initialize the necessary api keys and such.
        If on_create returns False or raises an error, the uninitialized database file will be removed."""
        self.database = apsw.Connection(database_file_path)
        c = self.database.cursor()

        # Create the tables which will make up the cache if they don't exist yet
        c.execute(
            "CREATE TABLE IF NOT EXISTS cache (stream TEXT, timestamp REAL, jsondata TEXT);")
        c.execute(
            "CREATE TABLE IF NOT EXISTS streams (stream TEXT PRIMARY KEY, jsonschema TEXT);")
        c.execute(
            "CREATE TABLE IF NOT EXISTS metadata (apikey TEXT, serverurl TEXT, lastsynctime REAL, syncperiod REAL, userdatajson TEXT);")

        # Now check if there is already metadata in the table, and if not, insert new metadata,
        # and run the on_create callback
        c.execute("SELECT COUNT(*) FROM metadata;")
        row_number = next(c)[0]
        if row_number == 0:
            logging.debug("Logger: Creating new database")
            # The default values are as follows:
            # apikey: '' (needs to be set by user)
            # serverurl: connectordb.com
            # lastsynctime: 0 (never synced)
            # syncperiod: 600 (10 minutes)
            # user data: {} (empty dict)
            c.execute("INSERT INTO metadata VALUES ('',?,0,600,'{}')",
                      (CONNECTORDB_URL, ))

        # Load the database metadata into variables
        c.execute(
            "SELECT apikey,serverurl,lastsynctime,syncperiod FROM metadata;")
        self.__apikey, self.__serverurl, self.__lastsync, self.__syncperiod = next(c)

        # Load the streams that are being logged
        c.execute("SELECT * FROM streams;")
        self.streams = {}
        for row in c.fetchall():
            self.streams[row[0]] = json.loads(row[1])

        if apikey is not None:
            self.apikey = apikey

        self.synclock = threading.Lock()
        self.syncthread = None
        self.__cdb = None

        # Callbacks that are called for synchronization
        self.onsync = onsync
        self.onsyncfail = onsyncfail

        # Whether or not failed synchronization raises an error
        self.syncraise = syncraise

        # Run the create callback which is for the user to set up the necessary
        # values to ensure a connection - only if the database was just created
        if on_create is not None and row_number == 0:
            try:
                if on_create(self) is False:
                    raise Exception(
                        "on_create returned False - logger is invalid")
            except:
                # If there was a failure to run on_create, delete the database file,
                # so that running the program again will not use the invalid file.
                self.database.close()
                os.remove(database_file_path)
                raise

    @property
    def connectordb(self):
        """Returns the ConnectorDB object that the logger uses. Raises an error if Logger isn't able to connect"""
        if self.__cdb is None:
            logging.debug("Logger: Connecting to " + self.serverurl)
            self.__cdb = ConnectorDB(self.apikey, url=self.serverurl)
        return self.__cdb

    def ping(self):
        """Attempts to ping the currently connected ConnectorDB database. Raises an error if it fails to connect"""
        self.connectordb.ping()

    def cleardata(self):
        """Deletes all cached data without syncing it to the server"""
        c = self.database.cursor()
        c.execute("DELETE FROM cache;")

    def close(self):
        """Closes the database connections and stops all synchronization."""
        self.stop()
        with self.synclock:
            self.database.close()

    def addStream(self, streamname, schema=None, **kwargs):
        """Adds the given stream to the logger. Requires an active connection to the ConnectorDB database.

        If a schema is not specified, loads the stream from the database. If a schema is specified, and the stream
        does not exist, creates the stream. You can also pass stream properties such as description or nickname to
        be set during creation."""

        stream = self.connectordb[streamname]

        if not stream.exists():
            if schema is not None:
                stream.create(schema, **kwargs)
            else:
                raise Exception(
                    "The stream '%s' was not found" % (streamname, ))

        self.addStream_force(streamname, stream.schema)

    def addStream_force(self, streamname, schema=None):
        """This function adds the given stream to the logger, but does not check with a ConnectorDB database
        to make sure that the stream exists. Use at your own risk."""

        c = self.database.cursor()
        c.execute("INSERT OR REPLACE INTO streams VALUES (?,?);",
                  (streamname, json.dumps(schema)))

        self.streams[streamname] = schema

    def insert(self, streamname, value):
        """Insert the datapoint into the logger for the given stream name. The logger caches the datapoint
        and eventually synchronizes it with ConnectorDB"""
        if streamname not in self.streams:
            raise Exception("The stream '%s' was not found" % (streamname, ))

        # Validate the datapoint against the stream's schema
        validate(value, self.streams[streamname])

        # Insert the datapoint - it fits the schema
        value = json.dumps(value)
        logging.debug("Logger: %s <= %s" % (streamname, value))
        c = self.database.cursor()
        c.execute("INSERT INTO cache VALUES (?,?,?);",
                  (streamname, time.time(), value))

    def insert_many(self, data_dict):
        """Inserts data into the cache, given a dict of the form {streamname: [{"t": timestamp, "d": data}, ...]}"""
        c = self.database.cursor()
        c.execute("BEGIN TRANSACTION;")
        try:
            for streamname in data_dict:
                if streamname not in self.streams:
                    raise Exception(
                        "The stream '%s' was not found" % (streamname, ))
                for dp in data_dict[streamname]:
                    validate(dp["d"], self.streams[streamname])
                    # JSON-encode the data, matching what insert() writes and sync() reads
                    c.execute("INSERT INTO cache VALUES (?,?,?);",
                              (streamname, dp["t"], json.dumps(dp["d"])))
        except:
            c.execute("ROLLBACK;")
            raise
        c.execute("COMMIT;")

    def sync(self):
        """Attempt to sync with the ConnectorDB server"""
        logging.debug("Logger: Syncing...")
        try:
            # Get the connectordb object
            cdb = self.connectordb

            # Ping the database - most connection errors will happen here
            cdb.ping()

            with self.synclock:
                c = self.database.cursor()
                for stream in self.streams:
                    s = cdb[stream]

                    c.execute(
                        "SELECT * FROM cache WHERE stream=? ORDER BY timestamp ASC;",
                        (stream, ))
                    datapointArray = []
                    for dp in c.fetchall():
                        datapointArray.append(
                            {"t": dp[1],
                             "d": json.loads(dp[2])})

                    # First, check if the data already inserted has newer timestamps,
                    # and in that case, assume that there was an error, and remove the datapoints
                    # with an older timestamp, so that we don't have an error when syncing
                    if len(s) > 0:
                        newtime = s[-1]["t"]
                        while len(datapointArray) > 0 and datapointArray[0]["t"] < newtime:
                            logging.debug(
                                "Datapoint exists with older timestamp. Removing the datapoint.")
                            datapointArray = datapointArray[1:]

                    if len(datapointArray) > 0:
                        logging.debug("%s: syncing %i datapoints" %
                                      (stream, len(datapointArray)))

                        while len(datapointArray) > DATAPOINT_INSERT_LIMIT:
                            # We insert datapoints in chunks of a couple
                            # thousand so that they fit in the insert size
                            # limit of ConnectorDB
                            s.insert_array(
                                datapointArray[:DATAPOINT_INSERT_LIMIT])

                            # Clear the written datapoints
                            datapointArray = datapointArray[
                                DATAPOINT_INSERT_LIMIT:]

                            # If there was no error inserting, delete the
                            # datapoints from the cache
                            c.execute(
                                "DELETE FROM cache WHERE stream=? AND timestamp <?",
                                (stream, datapointArray[0]["t"]))

                        s.insert_array(datapointArray)

                        # If there was no error inserting, delete the
                        # datapoints from the cache
                        c.execute(
                            "DELETE FROM cache WHERE stream=? AND timestamp <=?",
                            (stream, datapointArray[-1]["t"]))
                self.lastsynctime = time.time()

                if self.onsync is not None:
                    self.onsync()
        except Exception as e:
            # Handle the sync failure callback
            reraise = self.syncraise
            if self.onsyncfail is not None:
                reraise = self.onsyncfail(e)
            if reraise:
                raise

    def __setsync(self):
        with self.synclock:
            logging.debug("Next sync attempt in " + str(self.syncperiod))
            if self.syncthread is not None:
                self.syncthread.cancel()
            self.syncthread = threading.Timer(self.syncperiod,
                                              self.__runsyncer)
            self.syncthread.daemon = True
            self.syncthread.start()

    def __runsyncer(self):
        try:
            self.sync()
        except Exception as e:
            logging.warning("ConnectorDB sync failed: " + str(e))
        self.__setsync()

    def start(self):
        """Start the logger background synchronization service. This allows you to not need to
        worry about syncing with ConnectorDB - you just insert into the Logger, and the Logger
        will be synced every syncperiod."""

        with self.synclock:
            if self.syncthread is not None:
                logging.warning(
                    "Logger: Start called on a syncer that is already running")
                return

        self.sync()  # Attempt a sync right away
        self.__setsync()  # Set up background sync

    def stop(self):
        """Stops the background synchronization thread"""
        with self.synclock:
            if self.syncthread is not None:
                self.syncthread.cancel()
                self.syncthread = None

    def __len__(self):
        """Returns the number of datapoints currently cached"""
        c = self.database.cursor()
        c.execute("SELECT COUNT(*) FROM cache;")
        return next(c)[0]

    def __contains__(self, streamname):
        """Returns whether the logger is caching the given stream name"""
        return streamname in self.streams

    @property
    def syncperiod(self):
        """Syncperiod is the time in seconds between attempts to synchronize with ConnectorDB.
        The Logger will gather all data in its sqlite database between sync periods, and every syncperiod
        seconds it will attempt to connect and write the data to ConnectorDB."""
        return self.__syncperiod

    @syncperiod.setter
    def syncperiod(self, value):
        resync = False
        with self.synclock:
            self.__syncperiod = value
            resync = self.syncthread is not None
        c = self.database.cursor()
        c.execute("UPDATE metadata SET syncperiod=?", (value, ))

        if resync:
            self.__setsync()  # If we change the sync period during runtime, immediately update

    @property
    def lastsynctime(self):
        """The timestamp of the most recent successful synchronization with the server"""
        return self.__lastsync

    @lastsynctime.setter
    def lastsynctime(self, value):
        self.__lastsync = value
        c = self.database.cursor()
        c.execute("UPDATE metadata SET lastsynctime=?", (value, ))

    @property
    def apikey(self):
        """The API key used to connect to ConnectorDB. This needs to be set before the logger can do anything!
        The apikey only needs to be set once, since it is stored in the Logger database.

        Note that changing the api key is not supported during logger runtime (after start is called).
        Logger must be recreated for a changed apikey to take effect."""
        return self.__apikey

    @apikey.setter
    def apikey(self, value):
        self.__apikey = value
        c = self.database.cursor()
        c.execute("UPDATE metadata SET apikey=?", (value, ))

    @property
    def serverurl(self):
        """The URL of the ConnectorDB server that Logger is using. By default this is connectordb.com, but it
        can be set with this property. Note that the change only takes effect if made before runtime"""
        return self.__serverurl

    @serverurl.setter
    def serverurl(self, value):
        self.__serverurl = value
        c = self.database.cursor()
        c.execute("UPDATE metadata SET serverurl=?", (value, ))

    @property
    def data(self):
        """The data property allows the user to save settings/data in the database, so that
        there does not need to be extra code messing around with settings.

        Use this property to save things that can be converted to JSON inside the logger database,
        so that you don't have to mess with configuration files or save settings some other way::

            from connectordb.logger import Logger

            l = Logger("log.db")

            l.data = {"hi": 56}

            # prints the data dictionary
            print(l.data)
        """
        c = self.database.cursor()
        c.execute("SELECT userdatajson FROM metadata;")
        return json.loads(next(c)[0])

    @data.setter
    def data(self, value):
        c = self.database.cursor()
        c.execute("UPDATE metadata SET userdatajson=?;", (json.dumps(value), ))

    @property
    def name(self):
        """Gets the name of the currently logged in device"""
        return self.connectordb.ping()
390