GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( d5d9b4...ed39b3 )
by Daniel
59s
created

TestConnectorDB.tearDown()   A

Complexity

Conditions 4

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 4
c 3
b 0
f 0
dl 0
loc 16
rs 9.2
1
from __future__ import absolute_import
2
3
import unittest
4
import time
5
import json
6
import logging
7
import connectordb
8
import shutil
9
import os
10
11
from jsonschema import SchemaError
12
13
# Allows debugging the websocket
14
#import websocket
15
# websocket.enableTrace(True)
16
17
# Base URL passed as ``url=`` to every connectordb.ConnectorDB connection
# opened by the tests in this file.
TEST_URL = "http://localhost:8000"
18
19
20
class subscriber(object):
    """Callback recorder that allows tests of subscriptions.

    An instance remembers the stream and datapoints handed to the most
    recent ``subscribe_callback`` call and counts the calls made since the
    last ``reset()``. ``returnvalue`` is returned from every callback and is
    not touched by ``reset()``.
    """

    def __init__(self):
        self.reset()
        # Value handed back to the caller of subscribe_callback.
        self.returnvalue = None

    def reset(self):
        """Forget the last recorded message and zero the call counter."""
        self.msg = None
        self.stream = None
        self.callnumber = 0

    def subscribe_callback(self, stream, datapoints):
        """Record one subscription message.

        Args:
            stream: identifier of the stream the message belongs to.
            datapoints: JSON-serialisable payload of the message.

        Returns:
            The current ``returnvalue`` attribute.
        """
        logging.info("Got message: %s:: %s", stream, json.dumps(datapoints))
        self.msg = datapoints
        self.stream = stream
        self.callnumber += 1

        return self.returnvalue
39
40
41
class TestConnectorDB(unittest.TestCase):
    """Integration tests for the connectordb client.

    Every test talks to the server at TEST_URL. ``setUp`` logs in as the
    "test" user (``self.db``) and creates a fresh "python_test" user with its
    own connection (``self.usrdb``); ``tearDown`` removes the users again.
    """

    # Users that tests in this class may create; tearDown removes them.
    _SCRATCH_USERS = ("python_test", "myuser", "pyexport",
                      "python_counting_test")

    def _connect(self, *credentials):
        """Open an extra connection that is closed once the test is over.

        Args:
            *credentials: positional arguments forwarded to
                connectordb.ConnectorDB (user name and password, or a single
                API key).

        Returns:
            The new ConnectorDB connection.
        """
        conn = connectordb.ConnectorDB(*credentials, url=TEST_URL)
        # Cleanups run even when an assertion fails, so the connection is
        # never left open.
        self.addCleanup(conn.close)
        return conn

    def _delete_user_quietly(self, name):
        """Delete the named user; a failure is logged instead of raised."""
        try:
            self.db(name).delete()
        except Exception:
            # Best effort: a failed delete (for instance because this test
            # never created the user) must not fail the test itself.
            logging.debug("Cleanup could not delete user %r", name,
                          exc_info=True)

    def setUp(self):
        """Connect as "test" and create a fresh "python_test" user."""
        self.db = connectordb.ConnectorDB("test", "test", url=TEST_URL)
        self.db.user.public = False
        self.usr = self.db("python_test")
        if self.usr.exists():
            self.usr.delete()
        self.usr.create("pythontest@localhost", "mypass")
        self.usrdb = connectordb.ConnectorDB("python_test",
                                             "mypass",
                                             url=TEST_URL)

    def tearDown(self):
        """Close both connections and delete the scratch users."""
        try:
            self.usrdb.close()
        finally:
            # Keep cleaning up even if closing the user connection failed.
            try:
                for name in self._SCRATCH_USERS:
                    self._delete_user_quietly(name)
            finally:
                self.db.close()

    def test_authfail(self):
        """Connecting with bad credentials raises AuthenticationError."""
        self.assertRaises(connectordb.AuthenticationError,
                          connectordb.ConnectorDB,
                          "notauser", "badpass", url=TEST_URL)

    def test_basics(self):
        """Check path/name of the connection, server info and user listing."""
        self.assertEqual(self.db.path, "test/user")
        self.assertEqual(self.db.name, "user")
        self.assertEqual(self.db.user.name, "test")
        i = self.db.info()
        self.assertGreater(len(i["version"]), 1)
        self.assertGreater(len(i["interpolators"]), 1)
        self.assertGreater(len(i["transforms"]), 1)

        self.assertGreater(len(self.db.users()), 1)
        self.assertRaises(connectordb.AuthenticationError, self.usrdb.users)

    def test_counting(self):
        """User/device/stream counts follow creations made during the test."""
        db = self.db
        self.assertGreaterEqual(db.count_users(), 1)
        self.assertGreaterEqual(db.count_devices(), 1)
        self.assertGreaterEqual(db.count_streams(), 1)

        usr = db("python_counting_test")
        if usr.exists():
            usr.delete()
        self.assertFalse(usr.exists())
        curusers = db.count_users()
        usr.create("[email protected]", "mypass")
        curusers += 1
        self.assertEqual(curusers, db.count_users())

        # The python_test connection may not use any of the three counters.
        # NOTE(review): the original asserted count_streams twice and never
        # count_users; count_users is assumed to be restricted like
        # usrdb.users in test_basics -- confirm against the server.
        self.assertRaises(connectordb.AuthenticationError,
                          self.usrdb.count_users)
        self.assertRaises(connectordb.AuthenticationError,
                          self.usrdb.count_devices)
        self.assertRaises(connectordb.AuthenticationError,
                          self.usrdb.count_streams)

        curdevices = db.count_devices()
        curstreams = db.count_streams()

        self.usrdb["counting_stream"].create({"type": "string"})
        curstreams += 1

        self.assertEqual(curusers, db.count_users())
        self.assertEqual(curdevices, db.count_devices())
        self.assertEqual(curstreams, db.count_streams())

        self.usrdb.user["countdevice"].create()
        curdevices += 1

        self.assertEqual(curusers, db.count_users())
        self.assertEqual(curdevices, db.count_devices())
        self.assertEqual(curstreams, db.count_streams())

        usr.delete()

    def test_user(self):
        """Exercise user existence, roles, properties and password change."""
        self.assertTrue(self.db.user.exists())
        self.assertTrue(self.db(self.usrdb.user.name).exists())
        self.assertTrue(self.usrdb.exists())
        self.assertFalse(self.db("baduser").exists())
        self.assertFalse(self.usrdb("test").exists())

        self.assertEqual(self.db.user.role, "admin")
        self.assertEqual(self.usrdb.user.role, "user")

        self.assertEqual(self.usrdb.user.name, "python_test")
        self.usrdb.user.email = "testemail@change"
        self.assertEqual(self.usrdb.user.email, "testemail@change")
        self.usrdb.user.icon = "material:person"
        self.assertEqual(self.usrdb.user.icon, "material:person")

        self.usrdb.user.set_password("newpass")
        # Logging in with the new password must work.
        usrdb = self._connect("python_test", "newpass")
        self.assertEqual(usrdb.path, "python_test/user")

        self.usr.set({"role": "admin"})

        usrdb.refresh()
        # The connection's own role is unchanged; only the user's role moved.
        self.assertEqual(usrdb.role, "user")
        self.assertEqual(usrdb.user.role, "admin")

        usrdb.user.nickname = "myuser"
        self.assertEqual(self.db("python_test").nickname, "myuser")

        usrdb.user.description = "Hello World!"
        self.assertEqual(self.db("python_test").description, "Hello World!")

        self.assertRaises(connectordb.AuthenticationError, self.usr.set,
                          {"admin": "Hello"})

    def test_userstreams(self):
        """Listing a user's downlink streams returns the downlink stream."""
        db = self.usrdb

        dev = db.user["mydevice"]
        dev.create()

        dev["mystream"].create(downlink=True)
        dev["mystream2"].create()

        self.assertEqual(db.user.streams(downlink=True)[0].path,
                         dev["mystream"].path)

    def test_device(self):
        """Create a device and check its properties, API key and access."""
        db = self.usrdb
        self.assertEqual(len(db.user.devices()), 2)
        self.assertFalse(db.user["mydevice"].exists())
        db.user["mydevice"].create()
        self.assertTrue(db.user["mydevice"].exists())
        self.assertEqual(3, len(db.user.devices()))

        dev = self._connect(db.user["mydevice"].apikey)
        # Device has no access to other devices
        self.assertRaises(connectordb.AuthenticationError, dev.user.devices)

        dev.nickname = "test nickname"
        self.assertEqual(db("python_test/mydevice").nickname, "test nickname")
        self.assertEqual(dev.enabled, True)
        dev.enabled = False
        self.assertEqual(db("python_test/mydevice").enabled, False)

        apikey = dev.apikey
        newkey = dev.reset_apikey()
        self.assertNotEqual(apikey, newkey)
        self.assertEqual(dev.nickname, "test nickname")

        db.user["mydevice"].role = "reader"
        self.assertEqual(dev.user.name, "python_test")

    def test_stream(self):
        """Create a stream, change its properties and delete it again."""
        self.assertRaises(SchemaError, self.usrdb["mystream"].create,
                          {"type": "blah blah"})
        self.assertEqual(self.usrdb.role, "user")
        initialstreams = len(self.usrdb.streams())
        s = self.usrdb["mystream"]
        self.assertFalse(s.exists())
        s.create(datatype="string.text")
        self.assertTrue(s.exists())
        self.assertEqual(len(self.usrdb.streams()), initialstreams + 1)

        self.assertEqual(s.user.name, "python_test")
        self.assertEqual(s.device.name, "user")

        self.assertEqual(s.ephemeral, False)
        self.assertEqual(s.downlink, False)
        self.assertEqual(s.datatype, "string.text")
        s.ephemeral = True
        self.assertEqual(s.ephemeral, True)
        self.assertEqual(s.downlink, False)
        s.downlink = True
        self.assertEqual(s.ephemeral, True)
        self.assertEqual(s.downlink, True)
        s.datatype = "lol.lol"
        self.assertEqual(s.datatype, "lol.lol")

        self.assertEqual(s.sschema, '{}')
        s.schema = '{"type": "string"}'
        self.assertEqual(s.sschema, '{"type":"string"}')

        s.delete()
        self.assertFalse(s.exists())

    def test_streamio(self):
        """Insert datapoints and read them back by index, call and slice."""
        s = self.usrdb("python_test/user/mystream")

        s.create({"type": "string"})
        self.assertEqual(0, len(s))

        s.insert("Hello World!")
        self.assertEqual(1, len(s))

        self.assertEqual("Hello World!", s[0]["d"])
        self.assertEqual("Hello World!", s(0)[0]["d"])

        s.ephemeral = True

        s.insert("another hello!")
        s.insert("yet another hello!")

        # Inserts made while the stream is ephemeral do not change its length.
        self.assertEqual(1, len(s))

        s.ephemeral = False

        s.insert("1")
        time.sleep(0.1)
        s.insert_array([{"d": "2", "t": time.time() - 0.01}, {"d": "3"}])

        self.assertEqual("3", s[-1]["d"])
        self.assertEqual(3, len(s[1:]))
        self.assertEqual(4, len(s[:]))

        self.assertEqual(s.schema["type"], "string")

    def test_struct(self):
        """Object-typed datapoints are returned intact."""
        # This test is specifically to make sure that structs are correctly
        # sent back (this was a bug in connectordb)
        s = self.usrdb["mystream"]

        s.create({
            "type": "object",
            "properties": {
                "test": {"type": "string"},
                "t2": {"type": "number"},
            },
        })

        s.insert({"test": "hi!", "t2": -1337.1})

        v = s[-1]["d"]
        self.assertEqual(v["test"], "hi!")
        self.assertEqual(v["t2"], -1337.1)

    def test_call(self):
        """Query a number stream through a transform and check the result."""
        s = self.usrdb["teststream"]
        s.create({"type": "number"})

        s.insert_array([{"d": 3}, {"d": 10}, {"d": 4}, {"d": 35}, {"d": 9}])

        dp = s(i1=0, i2=0, transform="if $>5 | $<20")
        self.assertEqual(3, len(dp))
        self.assertEqual(True, dp[0]["d"])
        self.assertEqual(False, dp[1]["d"])
        self.assertEqual(True, dp[2]["d"])

    def test_subscribe(self):
        """Subscribe, unsubscribe and use a transform subscription."""
        s = self.usrdb["teststream"]
        s.create({"type": "number"})

        subs = subscriber()
        s.subscribe(subs.subscribe_callback)

        time.sleep(0.1)  # Give it some time to set up the subscription

        s.insert(1337)
        time.sleep(0.1)

        self.assertEqual(subs.msg[0]["d"], 1337)
        s.unsubscribe()

        # Make sure unsubscribe worked
        subs.reset()
        s.insert(1338)
        time.sleep(0.1)

        self.assertIsNone(subs.msg)

        subs2 = subscriber()

        s.subscribe(subs.subscribe_callback)
        s.subscribe(subs2.subscribe_callback, transform="if $ > 200")

        time.sleep(0.3)

        s.insert(100)
        time.sleep(0.1)

        # Only the plain subscription is expected to see the value 100.
        self.assertEqual(subs.msg[0]["d"], 100)
        self.assertEqual(subs.callnumber, 1)
        self.assertIsNone(subs2.msg)
        self.assertEqual(subs2.callnumber, 0)

        s.insert(3000)
        time.sleep(0.1)

        self.assertEqual(subs.msg[0]["d"], 3000)
        self.assertEqual(subs.callnumber, 2)
        self.assertEqual(subs2.msg[0]["d"], 3000)
        self.assertEqual(subs2.callnumber, 1)
        subs2.reset()
        subs.reset()

        s.unsubscribe(transform="if $ > 200")
        time.sleep(0.1)
        s.insert(900)
        time.sleep(0.1)

        self.assertIsNone(subs2.msg)
        self.assertEqual(subs.msg[0]["d"], 900)

        s.ephemeral = True
        subs.reset()
        s.insert(101)
        time.sleep(0.1)
        self.assertEqual(subs.msg[0]["d"], 101)

    def test_downlink(self):
        """Write to a device's stream through its downlink and subscribe."""
        mydevice = self.usrdb.user["mydevice"]

        mydevice.create()
        s = mydevice["mystream"]
        mdconn = self._connect(mydevice.apikey)

        mds = mdconn["mystream"]
        mds.create({"type": "string"})

        with self.assertRaises(connectordb.AuthenticationError):
            s.insert("devices can not write streams they don't own")

        # Unless it is a downlink
        mds.downlink = True

        s.insert("hi!")
        self.assertEqual(0, len(s))
        self.assertEqual(1, s.length(downlink=True))

        self.assertEqual(s(downlink=True)[0]["d"], "hi!")

        subs = subscriber()
        subs.returnvalue = True
        subs2 = subscriber()

        mds.subscribe(subs.subscribe_callback, downlink=True)
        s.subscribe(subs2.subscribe_callback)

        time.sleep(0.2)
        s.append("hello!")
        time.sleep(0.2)

        self.assertEqual(subs.msg[0]["d"], "hello!")
        self.assertEqual(subs2.msg[0]["d"], "hello!")

        mds.unsubscribe(downlink=True)
        s.unsubscribe()

        # These were assertTrue(1, len(s)) / assertTrue(2, ...), which can
        # never fail; compare the values for real.
        self.assertEqual(1, len(s))
        self.assertEqual(2, s.length(downlink=True))

    def test_multicreate(self):
        """Create devices and users together with nested devices/streams."""
        mydevice = self.usrdb.user["mydevice"]
        mydevice.create(streams={
            "stream1": {
                "nickname": "My Train",
                "schema": "{\"type\":\"string\"}"
            }
        })
        self.assertTrue(mydevice.exists())
        self.assertTrue(mydevice["stream1"].exists())
        self.assertEqual(mydevice["stream1"].nickname, "My Train")

        self.db("myuser").create(
            "my@email", "mypass", description="choo choo",
            devices={
                "device1": {
                    "streams": {
                        "stream1": {
                            "nickname": "My Train",
                            "schema": "{\"type\":\"string\"}"
                        }
                    }
                }
            },
            streams={
                "mstream1": {
                    "nickname": "My Train2",
                    "schema": "{\"type\":\"string\"}"
                }
            })

        usr = self.db("myuser")
        self.assertTrue(usr.exists())
        self.assertTrue(usr["user"]["mstream1"].exists())
        self.assertTrue(usr["device1"].exists())
        self.assertTrue(usr["device1"]["stream1"].exists())

        usr.delete()

    def test_importexport(self):
        """Export a user to the "pyexport" directory and import it back."""
        if os.path.exists("pyexport"):
            shutil.rmtree("pyexport")
        u = self.db("pyexport")
        u.create("pyexport@test", "usr", devices={
            "mydevice": {
                "streams": {
                    "mystream": {
                        "schema": "{\"type\":\"string\"}",
                        "downlink": True
                    }
                }
            }
        })

        mdconn = self._connect(u["mydevice"].apikey)
        mdconn("pyexport/mydevice/mystream").insert("Hello World!")
        self.db("pyexport/mydevice/mystream").insert("in da downlink")

        # Export the user
        u.export("pyexport")

        # Delete the User
        u.delete()

        self.assertFalse(u.exists())

        # Import the user!
        self.db.import_users("pyexport")

        self.assertTrue(u.exists())

        # Make sure the stream data was created correctly
        imported = self.db("pyexport/mydevice/mystream")
        self.assertEqual(imported[0]["d"], "Hello World!")
        self.assertEqual(imported(i1=0, i2=1, downlink=True)[0]["d"],
                         "in da downlink")
474
475
476
if __name__ == "__main__":
    # Run the whole suite when this file is executed as a script.
    unittest.main()
478