Completed: Push to master ( 3d6413...7ed725 ) by Daniel, created 01:08

Logger.addStream(): A

Complexity: Conditions 3
Size: Total Lines 16
Duplication: Lines 0, Ratio 0 %
Importance: Changes 1, Bugs 0, Features 0

Metric   Value
cc       3
c        1
b        0
f        0
dl       0
loc      16
rs       9.4285

from __future__ import absolute_import

import apsw  # The default Python sqlite implementation is not threadsafe, so we use apsw
import logging
import time
import threading
import os

import json
from jsonschema import validate

from ._connectordb import ConnectorDB, CONNECTORDB_URL

# The maximum number of datapoints to insert in one query
DATAPOINT_INSERT_LIMIT = 5000

class Logger(object):
    """Logger enables logging datapoints with periodic synchronization to a ConnectorDB database.
    The logged datapoints are cached in a sqlite database, along with the necessary connection data,
    so that no data is lost, and settings don't need to be reloaded from the database after the initial connection.
    """

    def __init__(self, database_file_path, on_create=None, apikey=None, onsync=None, onsyncfail=None, syncraise=False):
        """Logger is started by passing its database file, and an optional callback which is run if the database
        is not initialized, allowing setup code to be run only once.

        The on_create callback can optionally be used to initialize the necessary api keys and such.
        If on_create returns False or raises an error, the uninitialized database file will be removed."""
        self.database = apsw.Connection(database_file_path)
        c = self.database.cursor()

        # Create the tables which will make up the cache if they don't exist yet
        c.execute(
            "CREATE TABLE IF NOT EXISTS cache (stream TEXT, timestamp REAL, jsondata TEXT);")
        c.execute(
            "CREATE TABLE IF NOT EXISTS streams (stream TEXT PRIMARY KEY, jsonschema TEXT);")
        c.execute(
            "CREATE TABLE IF NOT EXISTS metadata (apikey TEXT, serverurl TEXT, lastsynctime REAL, syncperiod REAL, userdatajson TEXT);")

        # Now check if there is already metadata in the table, and if not, insert new metadata,
        # and run the on_create callback
        c.execute("SELECT COUNT(*) FROM metadata;")
        row_number = next(c)[0]
        if row_number == 0:
            logging.debug("Logger: Creating new database")
            # The default values are as follows:
            # apikey: '' (needs to be set by user)
            # serverurl: connectordb.com
            # lastsynctime: 0 (never synced)
            # syncperiod: 600 (10 minutes)
            # user data: {} (empty dict)
            c.execute("INSERT INTO metadata VALUES ('',?,0,600,'{}')",
                      (CONNECTORDB_URL, ))

        # Load the database metadata into variables
        c.execute(
            "SELECT apikey,serverurl,lastsynctime,syncperiod FROM metadata;")
        self.__apikey, self.__serverurl, self.__lastsync, self.__syncperiod = next(c)

        # Load the streams that are being logged
        c.execute("SELECT * FROM streams;")
        self.streams = {}
        for row in c.fetchall():
            self.streams[row[0]] = json.loads(row[1])

        if apikey is not None:
            self.apikey = apikey

        self.synclock = threading.Lock()
        self.syncthread = None
        self.__cdb = None

        # Callbacks that are called for synchronization
        self.onsync = onsync
        self.onsyncfail = onsyncfail

        # Whether or not failed synchronization raises an error
        self.syncraise = syncraise

        # Run the create callback which is for the user to set up the necessary
        # values to ensure a connection - only if the database was just created
        if on_create is not None and row_number == 0:
            try:
                if on_create(self) is False:
                    raise Exception("on_create returned False - logger is invalid")
            except:
                # If there was a failure to run on_create, delete the database file,
                # so that running the program again will not use the invalid file.
                self.database.close()
                os.remove(database_file_path)
                raise

    @property
    def connectordb(self):
        """Returns the ConnectorDB object that the logger uses. Raises an error if Logger isn't able to connect"""
        if self.__cdb is None:
            logging.debug("Logger: Connecting to " + self.serverurl)
            self.__cdb = ConnectorDB(self.apikey, url=self.serverurl)
        return self.__cdb

    def ping(self):
        """Attempts to ping the currently connected ConnectorDB database. Raises an error if it fails to connect"""
        self.connectordb.ping()

    def cleardata(self):
        """Deletes all cached data without syncing it to the server"""
        c = self.database.cursor()
        c.execute("DELETE FROM cache;")

    def close(self):
        """Closes the database connections and stops all synchronization."""
        self.stop()
        with self.synclock:
            self.database.close()

    def addStream(self, streamname, schema=None):
        """Adds the given stream to the logger. Requires an active connection to the ConnectorDB database.

        If a schema is not specified, loads the stream from the database. If a schema is specified, and the stream
        does not exist, creates the stream."""

        stream = self.connectordb[streamname]

        if not stream.exists():
            if schema is not None:
                stream.create(schema)
            else:
                raise Exception(
                    "The stream '%s' was not found" % (streamname, ))

        self.addStream_force(streamname, stream.schema)

    def addStream_force(self, streamname, schema=None):
        """This function adds the given stream to the logger, but does not check with a ConnectorDB database
        to make sure that the stream exists. Use at your own risk."""

        c = self.database.cursor()
        c.execute("INSERT OR REPLACE INTO streams VALUES (?,?);",
                  (streamname, json.dumps(schema)))

        self.streams[streamname] = schema

    def insert(self, streamname, value):
        """Insert the datapoint into the logger for the given stream name. The logger caches the datapoint
        and eventually synchronizes it with ConnectorDB"""
        if streamname not in self.streams:
            raise Exception("The stream '%s' was not found" % (streamname, ))

        # Validate the datapoint against the stream's schema
        validate(value, self.streams[streamname])

        # Insert the datapoint - it fits the schema
        value = json.dumps(value)
        logging.debug("Logger: %s <= %s" % (streamname, value))
        c = self.database.cursor()
        c.execute("INSERT INTO cache VALUES (?,?,?);",
                  (streamname, time.time(), value))

    def insert_many(self, data_dict):
        """Inserts data into the cache, where data_dict is of the form {streamname: [{"t": timestamp, "d": data}, ...]}"""
        c = self.database.cursor()
        c.execute("BEGIN TRANSACTION;")
        try:
            for streamname in data_dict:
                if streamname not in self.streams:
                    raise Exception("The stream '%s' was not found" % (streamname, ))
                for dp in data_dict[streamname]:
                    validate(dp["d"], self.streams[streamname])
                    # Store the datapoint as JSON text, consistent with insert()
                    c.execute("INSERT INTO cache VALUES (?,?,?);",
                              (streamname, dp["t"], json.dumps(dp["d"])))
        except:
            c.execute("ROLLBACK;")
            raise
        c.execute("COMMIT;")

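    # For reference, the data_dict argument to insert_many has this shape
    # (the stream name and values here are hypothetical):
    #
    #   {"temperature": [{"t": 1428783986.34, "d": 21.5},
    #                    {"t": 1428783987.12, "d": 21.7}]}
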
    def sync(self):
        """Attempt to sync with the ConnectorDB server"""
        logging.debug("Logger: Syncing...")
        try:
            # Get the connectordb object
            cdb = self.connectordb

            # Ping the database - most connection errors will happen here
            cdb.ping()

            with self.synclock:
                c = self.database.cursor()
                for stream in self.streams:
                    s = cdb[stream]

                    c.execute(
                        "SELECT * FROM cache WHERE stream=? ORDER BY timestamp ASC;",
                        (stream, ))
                    datapointArray = []
                    for dp in c.fetchall():
                        datapointArray.append(
                            {"t": dp[1],
                             "d": json.loads(dp[2])})
                    if len(datapointArray) > 0:
                        logging.debug("%s: syncing %i datapoints" %
                                      (stream, len(datapointArray)))

                        while len(datapointArray) > DATAPOINT_INSERT_LIMIT:
                            # Insert datapoints in chunks of a few thousand so that they
                            # fit within the insert size limit of ConnectorDB
                            s.insert_array(datapointArray[:DATAPOINT_INSERT_LIMIT])

                            # Clear the written datapoints
                            datapointArray = datapointArray[DATAPOINT_INSERT_LIMIT:]

                            # If there was no error inserting, delete the datapoints from the cache
                            c.execute(
                                "DELETE FROM cache WHERE stream=? AND timestamp < ?",
                                (stream, datapointArray[0]["t"]))

                        s.insert_array(datapointArray)

                        # If there was no error inserting, delete the datapoints from the cache
                        c.execute(
                            "DELETE FROM cache WHERE stream=? AND timestamp <= ?",
                            (stream, datapointArray[-1]["t"]))
                self.lastsynctime = time.time()

                if self.onsync is not None:
                    self.onsync()
        except Exception as e:
            # Handle the sync failure callback
            reraise = self.syncraise
            if self.onsyncfail is not None:
                reraise = self.onsyncfail(e)
            if reraise:
                raise

    def __setsync(self):
        with self.synclock:
            logging.debug("Next sync attempt in " + str(self.syncperiod))
            if self.syncthread is not None:
                self.syncthread.cancel()
            self.syncthread = threading.Timer(self.syncperiod,
                                              self.__runsyncer)
            self.syncthread.daemon = True
            self.syncthread.start()

    def __runsyncer(self):
        try:
            self.sync()
        except Exception as e:
            logging.warning("ConnectorDB sync failed: " + str(e))
        self.__setsync()

    def start(self):
        """Start the logger background synchronization service. This allows you to not have to
        worry about syncing with ConnectorDB - you just insert into the Logger, and the Logger
        will be synced every syncperiod."""

        with self.synclock:
            if self.syncthread is not None:
                logging.warning(
                    "Logger: Start called on a syncer that is already running")
                return

        self.sync()  # Attempt a sync right away
        self.__setsync()  # Set up background sync

    def stop(self):
        """Stops the background synchronization thread"""
        with self.synclock:
            if self.syncthread is not None:
                self.syncthread.cancel()
                self.syncthread = None

    def __len__(self):
        """Returns the number of datapoints currently cached"""
        c = self.database.cursor()
        c.execute("SELECT COUNT(*) FROM cache;")
        return next(c)[0]

    def __contains__(self, streamname):
        """Returns whether the logger is caching the given stream name"""
        return streamname in self.streams

    @property
    def syncperiod(self):
        """Syncperiod is the time in seconds between attempts to synchronize with ConnectorDB.
        The Logger will gather all data in its sqlite database between sync periods, and every syncperiod
        seconds, it will attempt to connect and write the data to ConnectorDB."""
        return self.__syncperiod

    @syncperiod.setter
    def syncperiod(self, value):
        resync = False
        with self.synclock:
            self.__syncperiod = value
            resync = self.syncthread is not None
        c = self.database.cursor()
        c.execute("UPDATE metadata SET syncperiod=?", (value, ))

        if resync:
            self.__setsync()  # If the sync period changes during runtime, update immediately

    @property
    def lastsynctime(self):
        """The timestamp of the most recent successful synchronization with the server"""
        return self.__lastsync

    @lastsynctime.setter
    def lastsynctime(self, value):
        self.__lastsync = value
        c = self.database.cursor()
        c.execute("UPDATE metadata SET lastsynctime=?", (value, ))

    @property
    def apikey(self):
        """The API key used to connect to ConnectorDB. This needs to be set before the logger can do anything!
        The apikey only needs to be set once, since it is stored in the Logger database.

        Note that changing the api key is not supported during logger runtime (after start is called).
        The Logger must be recreated for a changed apikey to take effect."""
        return self.__apikey

    @apikey.setter
    def apikey(self, value):
        self.__apikey = value
        c = self.database.cursor()
        c.execute("UPDATE metadata SET apikey=?", (value, ))

    @property
    def serverurl(self):
        """The URL of the ConnectorDB server that Logger is using. By default this is connectordb.com, but it can
        be set with this property. Note that the property only takes effect if set before runtime."""
        return self.__serverurl

    @serverurl.setter
    def serverurl(self, value):
        self.__serverurl = value
        c = self.database.cursor()
        c.execute("UPDATE metadata SET serverurl=?", (value, ))

    @property
    def data(self):
        """The data property allows the user to save settings/data in the database, so that
        there does not need to be extra code messing around with settings.

        Use this property to save things that can be converted to JSON inside the logger database,
        so that you don't have to mess with configuration files or saving settings otherwise::

            from connectordb.logger import Logger

            l = Logger("log.db")

            l.data = {"hi": 56}

            # prints the data dictionary
            print(l.data)
        """
        c = self.database.cursor()
        c.execute("SELECT userdatajson FROM metadata;")
        return json.loads(next(c)[0])

    @data.setter
    def data(self, value):
        c = self.database.cursor()
        c.execute("UPDATE metadata SET userdatajson=?;", (json.dumps(value), ))

    @property
    def name(self):
        """Gets the name of the currently logged in device"""
        return self.connectordb.ping()
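
For reference, here is a minimal usage sketch of this Logger, following the docstrings above; the database filename, API key, stream name, schema, and sync period are hypothetical placeholders:

    from connectordb.logger import Logger

    def on_create(l):
        # Runs only the first time, when log.db does not yet exist
        l.apikey = "your-device-apikey"  # hypothetical API key
        l.syncperiod = 60  # sync every minute instead of the default 600 seconds
        l.addStream("temperature", {"type": "number"})  # hypothetical stream and schema

    l = Logger("log.db", on_create=on_create)
    l.start()  # begin background synchronization

    # Datapoints are cached in sqlite and written to ConnectorDB every syncperiod
    l.insert("temperature", 21.5)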
371