GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 40fbfa...d5d9b4 )
by Daniel
51s
created

TestConnectorDB.test_importexport()   B

Complexity

Conditions 2

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 37
rs 8.8571
1
from __future__ import absolute_import
2
3
import unittest
4
import time
5
import json
6
import logging
7
import connectordb
8
import shutil
9
import os
10
11
from jsonschema import SchemaError
12
13
# Allows debugging the websocket
14
#import websocket
15
# websocket.enableTrace(True)
16
17
# Base URL passed as ``url=`` to every ConnectorDB connection opened by these tests.
TEST_URL = "http://localhost:8000"
18
19
20
class subscriber:
    """Callback recorder used to test stream subscriptions.

    ``subscribe_callback`` stores the most recent stream and datapoints it
    was called with, counts its invocations, and returns whatever
    ``returnvalue`` is currently set to.
    """

    def __init__(self):
        # ``returnvalue`` is configured by the test and is deliberately not
        # touched by ``reset``.
        self.returnvalue = None
        self.reset()

    def reset(self):
        """Forget the last delivery and zero the invocation counter."""
        self.msg, self.stream, self.callnumber = None, None, 0

    def subscribe_callback(self, stream, datapoints):
        """Record one delivery and return the configured ``returnvalue``."""
        logging.info("Got message: %s:: %s", stream, json.dumps(datapoints))
        self.stream, self.msg = stream, datapoints
        self.callnumber = self.callnumber + 1

        return self.returnvalue
39
40
41
class TestConnectorDB(unittest.TestCase):
42
43
    def setUp(self):
        """Open the ``test`` connection and (re)create the ``python_test`` user.

        Fixtures left on the test case:

        * ``self.db``    -- connection opened with the ``test``/``test`` login
        * ``self.usr``   -- the ``python_test`` user, accessed through ``self.db``
        * ``self.usrdb`` -- connection opened as ``python_test``/``mypass``
        """
        admin = connectordb.ConnectorDB("test", "test", url=TEST_URL)
        self.db = admin
        admin.user.public = False

        # Start from a clean slate in case an earlier run left the user behind.
        account = admin("python_test")
        self.usr = account
        if account.exists():
            account.delete()
        account.create("pythontest@localhost", "mypass")

        self.usrdb = connectordb.ConnectorDB(
            "python_test", "mypass", url=TEST_URL)
53
54
    def tearDown(self):
        """Close both connections and delete every user the tests may create.

        User deletion is best-effort: a user that a given test never created
        (or already removed) simply fails to delete and is ignored.  Only
        ``Exception`` subclasses are ignored, so ``KeyboardInterrupt`` and
        ``SystemExit`` still propagate.  ``self.db`` is closed even when an
        earlier step raises.
        """
        try:
            self.usrdb.close()

            try:
                self.usr.delete()
            except Exception:
                pass

            # Users created by test_multicreate and test_importexport.
            for leftover in ("myuser", "pyexport"):
                try:
                    self.db(leftover).delete()
                except Exception:
                    pass
        finally:
            self.db.close()
70
71
    def test_authfail(self):
        """Connecting with unknown credentials must raise AuthenticationError.

        The previous try/except form returned on the expected exception but
        also fell through silently when no exception was raised, so the test
        could never fail.  ``assertRaises`` fails the test in that case.
        """
        with self.assertRaises(connectordb.AuthenticationError):
            connectordb.ConnectorDB("notauser", "badpass", url=TEST_URL)
76
77
    def test_basics(self):
        """Check connection identity, server info and the user listing."""
        admin = self.db
        self.assertEqual(admin.path, "test/user")
        self.assertEqual(admin.name, "user")
        self.assertEqual(admin.user.name, "test")

        # Each of these info entries must hold more than one element.
        info = admin.info()
        for key in ("version", "interpolators", "transforms"):
            self.assertTrue(len(info[key]) > 1)

        self.assertTrue(len(admin.users()) > 1)
        # Listing users through the python_test connection is rejected.
        self.assertRaises(connectordb.AuthenticationError, self.usrdb.users)
88
89
    def test_counting(self):
        """Check that the user/device/stream counters follow new creations.

        A temporary ``python_counting_test`` user is created for the user
        counter; it is deleted in a ``finally`` clause so that a failing
        assertion does not leave it behind.
        """
        db = self.db
        self.assertGreaterEqual(db.count_users(), 1)
        self.assertGreaterEqual(db.count_devices(), 1)
        self.assertGreaterEqual(db.count_streams(), 1)

        usr = db("python_counting_test")
        if usr.exists():
            usr.delete()
        self.assertFalse(usr.exists())
        curusers = db.count_users()
        # NOTE(review): this address looks like an e-mail obfuscation
        # placeholder rather than the value originally written -- confirm.
        usr.create("[email protected]", "mypass")
        try:
            curusers += 1
            self.assertEqual(curusers, db.count_users())

            # The counters are rejected on the python_test connection.
            # NOTE(review): the original asserted count_streams twice and
            # never count_users, which looks like a copy-paste slip; confirm
            # that count_users is rejected for this connection as well.
            self.assertRaises(connectordb.AuthenticationError,
                              self.usrdb.count_users)
            self.assertRaises(connectordb.AuthenticationError,
                              self.usrdb.count_devices)
            self.assertRaises(connectordb.AuthenticationError,
                              self.usrdb.count_streams)

            curdevices = db.count_devices()
            curstreams = db.count_streams()

            # A new stream bumps only the stream counter.
            self.usrdb["counting_stream"].create({"type": "string"})
            curstreams += 1

            self.assertEqual(curusers, db.count_users())
            self.assertEqual(curdevices, db.count_devices())
            self.assertEqual(curstreams, db.count_streams())

            # A new device bumps only the device counter.
            self.usrdb.user["countdevice"].create()
            curdevices += 1

            self.assertEqual(curusers, db.count_users())
            self.assertEqual(curdevices, db.count_devices())
            self.assertEqual(curstreams, db.count_streams())
        finally:
            usr.delete()
129
130
    def test_user(self):
        """Exercise user existence, roles, profile fields and password change.

        A second connection is opened with the changed password.  It is
        registered with ``addCleanup`` so it gets closed even when an
        assertion fails; previously it was never closed at all.
        """
        self.assertEqual(self.db.user.exists(), True)
        self.assertEqual(self.db(self.usrdb.user.name).exists(), True)
        self.assertEqual(self.usrdb.exists(), True)
        self.assertEqual(self.db("baduser").exists(), False)
        # Through the python_test connection the "test" user reports as
        # not existing.
        self.assertEqual(self.usrdb("test").exists(), False)

        self.assertEqual(self.db.user.role, "admin")
        self.assertEqual(self.usrdb.user.role, "user")

        self.assertEqual(self.usrdb.user.name, "python_test")
        self.usrdb.user.email = "testemail@change"
        self.assertEqual(self.usrdb.user.email, "testemail@change")
        self.usrdb.user.icon = "material:person"
        self.assertEqual(self.usrdb.user.icon, "material:person")

        self.usrdb.user.set_password("newpass")
        usrdb = connectordb.ConnectorDB("python_test", "newpass", url=TEST_URL)
        # tearDown only closes self.db and self.usrdb, so close this one here.
        self.addCleanup(usrdb.close)
        self.assertEqual(usrdb.path, "python_test/user")

        # Promote the user through the admin-side handle.
        self.usr.set({"role": "admin"})

        usrdb.refresh()
        # The connection's own role stays "user"; only the user's role changed.
        self.assertEqual(usrdb.role, "user")
        self.assertEqual(usrdb.user.role, "admin")

        usrdb.user.nickname = "myuser"
        self.assertEqual(self.db("python_test").nickname, "myuser")

        usrdb.user.description = "Hello World!"
        self.assertEqual(self.db("python_test").description, "Hello World!")

        self.assertRaises(connectordb.AuthenticationError, self.usr.set,
                          {"admin": "Hello"})
164
165
    def test_device(self):
        """Create a device, log in with its API key and edit its properties.

        The connection opened with the device's API key is registered with
        ``addCleanup`` so it is closed when the test ends; previously it was
        left open.
        """
        db = self.usrdb
        self.assertEqual(len(db.user.devices()), 2)
        self.assertFalse(db.user["mydevice"].exists())
        db.user["mydevice"].create()
        self.assertTrue(db.user["mydevice"].exists())
        self.assertEqual(3, len(db.user.devices()))

        dev = connectordb.ConnectorDB(db.user["mydevice"].apikey, url=TEST_URL)
        self.addCleanup(dev.close)
        # Device has no access to other devices
        self.assertRaises(connectordb.AuthenticationError, dev.user.devices)

        dev.nickname = "test nickname"
        self.assertEqual(db("python_test/mydevice").nickname, "test nickname")
        self.assertEqual(dev.enabled, True)
        dev.enabled = False
        self.assertEqual(db("python_test/mydevice").enabled, False)

        # Resetting the key must yield a different key and keep the
        # connection usable.
        apikey = dev.apikey
        newkey = dev.reset_apikey()
        self.assertNotEqual(apikey, newkey)
        self.assertEqual(dev.nickname, "test nickname")

        db.user["mydevice"].role = "reader"
        self.assertEqual(dev.user.name, "python_test")
190
191
    def test_stream(self):
        """Create a stream, toggle its properties, then delete it."""
        conn = self.usrdb

        # An invalid JSON schema is rejected before anything is created.
        self.assertRaises(SchemaError, conn["mystream"].create,
                          {"type": "blah blah"})
        self.assertEqual(conn.role, "user")

        count_before = len(conn.streams())
        stream = conn["mystream"]
        self.assertFalse(stream.exists())
        stream.create(datatype="string.text")
        self.assertTrue(stream.exists())
        self.assertEqual(len(conn.streams()), count_before + 1)

        self.assertEqual(stream.user.name, "python_test")
        self.assertEqual(stream.device.name, "user")

        # Defaults right after creation.
        self.assertEqual(stream.ephemeral, False)
        self.assertEqual(stream.downlink, False)
        self.assertEqual(stream.datatype, "string.text")

        # Each flag can be flipped without disturbing the other.
        stream.ephemeral = True
        self.assertEqual(stream.ephemeral, True)
        self.assertEqual(stream.downlink, False)
        stream.downlink = True
        self.assertEqual(stream.ephemeral, True)
        self.assertEqual(stream.downlink, True)

        stream.datatype = "lol.lol"
        self.assertEqual(stream.datatype, "lol.lol")

        # ``sschema`` reads back without the space that was written.
        self.assertEqual(stream.sschema, '{}')
        stream.schema = '{"type": "string"}'
        self.assertEqual(stream.sschema, '{"type":"string"}')

        stream.delete()
        self.assertFalse(stream.exists())
223
224
    def test_streamio(self):
        """Insert datapoints and read them back by index, call and slice."""
        stream = self.usrdb("python_test/user/mystream")

        stream.create({"type": "string"})
        self.assertEqual(0, len(stream))

        stream.insert("Hello World!")
        self.assertEqual(1, len(stream))

        self.assertEqual("Hello World!", stream[0]["d"])
        self.assertEqual("Hello World!", stream(0)[0]["d"])

        # While the stream is ephemeral, inserts do not change its length.
        stream.ephemeral = True

        stream.insert("another hello!")
        stream.insert("yet another hello!")

        self.assertEqual(1, len(stream))

        stream.ephemeral = False

        stream.insert("1")
        time.sleep(0.1)
        # First point carries an explicit timestamp just before "now";
        # the second one has no timestamp.
        batch = [{"d": "2", "t": time.time() - 0.01}, {"d": "3"}]
        stream.insert_array(batch)

        self.assertEqual("3", stream[-1]["d"])
        self.assertEqual(3, len(stream[1:]))
        self.assertEqual(4, len(stream[:]))

        self.assertEqual(stream.schema["type"], "string")
254
255
    def test_struct(self):
        """Regression test: object-valued datapoints must round-trip intact.

        Structs were once returned incorrectly by connectordb, so insert one
        and check both of its fields on read-back.
        """
        schema = {
            "type": "object",
            "properties": {
                "test": {"type": "string"},
                "t2": {"type": "number"}
            }
        }
        stream = self.usrdb["mystream"]
        stream.create(schema)

        stream.insert({"test": "hi!", "t2": -1337.1})

        latest = stream[-1]["d"]
        self.assertEqual(latest["test"], "hi!")
        self.assertEqual(latest["t2"], -1337.1)
273
274
    def test_call(self):
        """Query a stream through ``__call__`` with a transform applied."""
        stream = self.usrdb["teststream"]
        stream.create({"type": "number"})

        values = (3, 10, 4, 35, 9)
        stream.insert_array([{"d": value} for value in values])

        dp = stream(i1=0, i2=0, transform="if $>5 | $<20")
        self.assertEqual(3, len(dp))
        # Expected values of the three returned datapoints, in order.
        for index, expected in enumerate((True, False, True)):
            self.assertEqual(expected, dp[index]["d"])
285
286
    def test_subscribe(self):
        """Subscribe, unsubscribe and use transform subscriptions on a stream.

        Delivery is asynchronous, so the test sleeps briefly after each
        subscribe/insert before inspecting the recorders.  Assertions use
        ``assertEqual``/``assertIsNone`` so a failure reports the actual
        value instead of just ``False is not true``.
        """
        s = self.usrdb["teststream"]
        s.create({"type": "number"})

        subs = subscriber()
        s.subscribe(subs.subscribe_callback)

        time.sleep(0.1)  # Give it some time to set up the subscription

        s.insert(1337)
        time.sleep(0.1)

        self.assertEqual(subs.msg[0]["d"], 1337)
        s.unsubscribe()

        # Make sure unsubscribe worked
        subs.reset()
        s.insert(1338)
        time.sleep(0.1)

        self.assertIsNone(subs.msg)

        subs2 = subscriber()

        # Plain subscription plus one that only passes values above 200.
        s.subscribe(subs.subscribe_callback)
        s.subscribe(subs2.subscribe_callback, transform="if $ > 200")

        time.sleep(0.3)

        # 100 is filtered out by the transform, so only ``subs`` sees it.
        s.insert(100)
        time.sleep(0.1)

        self.assertEqual(subs.msg[0]["d"], 100)
        self.assertEqual(subs.callnumber, 1)
        self.assertIsNone(subs2.msg)
        self.assertEqual(subs2.callnumber, 0)

        # 3000 passes the transform, so both recorders see it.
        s.insert(3000)
        time.sleep(0.1)

        self.assertEqual(subs.msg[0]["d"], 3000)
        self.assertEqual(subs.callnumber, 2)
        self.assertEqual(subs2.msg[0]["d"], 3000)
        self.assertEqual(subs2.callnumber, 1)
        subs2.reset()
        subs.reset()

        # Dropping only the transform subscription leaves the plain one active.
        s.unsubscribe(transform="if $ > 200")
        time.sleep(0.1)
        s.insert(900)
        time.sleep(0.1)

        self.assertIsNone(subs2.msg)
        self.assertEqual(subs.msg[0]["d"], 900)

        # Subscriptions still deliver when the stream is ephemeral.
        s.ephemeral = True
        subs.reset()
        s.insert(101)
        time.sleep(0.1)
        self.assertEqual(subs.msg[0]["d"], 101)
346
347
    def test_downlink(self):
        """Write to another device's stream through its downlink.

        Fixes over the previous version:

        * the last two checks used ``assertTrue(expected, actual)``, which
          treats ``actual`` as the failure message and therefore always
          passes; they are now real equality assertions;
        * the connection opened with the device's API key is closed via
          ``addCleanup``.
        """
        mydevice = self.usrdb.user["mydevice"]

        mydevice.create()
        # ``s`` is the stream seen through the user connection; ``mds`` is
        # the same stream seen through the device's own connection.
        s = mydevice["mystream"]
        mdconn = connectordb.ConnectorDB(mydevice.apikey, url=TEST_URL)
        self.addCleanup(mdconn.close)

        mds = mdconn["mystream"]
        mds.create({"type": "string"})

        with self.assertRaises(connectordb.AuthenticationError):
            s.insert("devices can not write streams they don't own")

        # Unless it is a downlink
        mds.downlink = True

        # The insert lands in the downlink, not in the stream itself.
        s.insert("hi!")
        self.assertEqual(0, len(s))
        self.assertEqual(1, s.length(downlink=True))

        self.assertEqual(s(downlink=True)[0]["d"], "hi!")

        subs = subscriber()
        # The downlink callback returns True.
        # NOTE(review): presumably this acknowledges the datapoint so it is
        # written to the stream itself -- confirm against the client docs.
        subs.returnvalue = True
        subs2 = subscriber()

        mds.subscribe(subs.subscribe_callback, downlink=True)
        s.subscribe(subs2.subscribe_callback)

        time.sleep(0.2)
        s.append("hello!")
        time.sleep(0.2)

        self.assertEqual(subs.msg[0]["d"], "hello!")
        self.assertEqual(subs2.msg[0]["d"], "hello!")

        mds.unsubscribe(downlink=True)
        s.unsubscribe()

        # "hello!" reached the stream; the downlink holds "hi!" and "hello!".
        self.assertEqual(1, len(s))
        self.assertEqual(2, s.length(downlink=True))
388
389
    def test_multicreate(self):
        """Create devices and users together with their nested children."""
        # A device created together with one stream.
        device_streams = {
            "stream1": {
                "nickname": "My Train",
                "schema": "{\"type\":\"string\"}"
            }
        }
        mydevice = self.usrdb.user["mydevice"]
        mydevice.create(streams=device_streams)
        self.assertTrue(mydevice.exists())
        self.assertTrue(mydevice["stream1"].exists())
        self.assertEqual(mydevice["stream1"].nickname, "My Train")

        # A user created together with a device (holding a stream) and a
        # stream that is later looked up under its "user" device.
        user_devices = {
            "device1": {
                "streams": {
                    "stream1": {
                        "nickname": "My Train",
                        "schema": "{\"type\":\"string\"}"
                    }
                }
            }
        }
        user_streams = {
            "mstream1": {
                "nickname": "My Train2",
                "schema": "{\"type\":\"string\"}"
            }
        }
        self.db("myuser").create("my@email", "mypass",
                                 description="choo choo",
                                 devices=user_devices,
                                 streams=user_streams)

        usr = self.db("myuser")
        self.assertTrue(usr.exists())
        self.assertTrue(usr["user"]["mstream1"].exists())
        self.assertTrue(usr["device1"].exists())
        self.assertTrue(usr["device1"]["stream1"].exists())

        usr.delete()
424
425
    def test_importexport(self):
        """Round-trip a user through export, delete and import.

        The user is created with a device holding a downlink stream, one
        datapoint is written to the stream and one to its downlink, and both
        must be readable again after the import.

        Fixes over the previous version: the ``pyexport`` directory is
        removed when the test ends (it used to stay on disk), and the
        connection opened with the device's API key is closed as soon as it
        is no longer needed.
        """
        if os.path.exists("pyexport"):
            shutil.rmtree("pyexport")
        # Remove the export directory afterwards, whether or not the test
        # got far enough to create it.
        self.addCleanup(shutil.rmtree, "pyexport", ignore_errors=True)

        u = self.db("pyexport")
        u.create("pyexport@test", "usr", devices={
            "mydevice": {
                "streams": {
                    "mystream": {
                        "schema": "{\"type\":\"string\"}",
                        "downlink": True
                    }
                }
            }
        })

        # Written through the device's own connection.
        mdconn = connectordb.ConnectorDB(u["mydevice"].apikey, url=TEST_URL)
        try:
            mdconn("pyexport/mydevice/mystream").insert("Hello World!")
        finally:
            mdconn.close()
        # Written through the admin connection; read back below with
        # ``downlink=True``.
        self.db("pyexport/mydevice/mystream").insert("in da downlink")

        # Export the user
        u.export("pyexport")

        # Delete the User
        u.delete()

        self.assertFalse(u.exists())

        # Import the user!
        self.db.import_users("pyexport")

        self.assertTrue(u.exists())

        # Make sure the stream data was created correctly
        self.assertEqual(
            self.db("pyexport/mydevice/mystream")[0]["d"], "Hello World!")
        self.assertEqual(
            self.db("pyexport/mydevice/mystream")(
                i1=0, i2=1, downlink=True)[0]["d"],
            "in da downlink")
462
463
464
if __name__ == "__main__":
    # Run every test in this module when the file is executed as a script.
    unittest.main()
466