GitHub Access Token became invalid

It seems that the GitHub access token used to retrieve details about this repository has become invalid. This might prevent certain types of inspections from running (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 7ed725...c09028 )
by Daniel
01:06
created

TestConnectorDB.test_device()   B

Complexity

Conditions 1

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 24
rs 8.9713
c 4
b 0
f 0
1
from __future__ import absolute_import
2
3
import unittest
4
import time
5
import json
6
import logging
7
import connectordb
8
9
from jsonschema import SchemaError
10
11
# Turn on the websocket library's trace output so websocket traffic can be debugged
import websocket
websocket.enableTrace(True)

# URL passed to every ConnectorDB connection opened by these tests
TEST_URL = "http://localhost:8000"
16
17
18
class subscriber(object):
    """Recorder used as a subscription callback so tests can inspect deliveries.

    Instance fields:
      msg         -- datapoints given to the most recent callback (None after reset)
      stream      -- stream argument of the most recent callback (None after reset)
      callnumber  -- how many times the callback ran since the last reset
      returnvalue -- value returned from every callback invocation (default None)
    """

    def __init__(self):
        self.returnvalue = None
        self.reset()

    def reset(self):
        """Forget the last delivery and zero the call counter; returnvalue is kept."""
        self.msg = None
        self.stream = None
        self.callnumber = 0

    def subscribe_callback(self, stream, datapoints):
        """Record one delivery and hand back ``self.returnvalue``.

        Args:
            stream: identifier of the stream the datapoints belong to.
            datapoints: JSON-serialisable payload (it is logged via json.dumps).

        Returns:
            Whatever ``self.returnvalue`` currently holds.
        """
        logging.info("Got message: %s:: %s", stream, json.dumps(datapoints))
        self.msg = datapoints
        self.stream = stream
        self.callnumber += 1

        return self.returnvalue
36
37
38
class TestConnectorDB(unittest.TestCase):
39
    def setUp(self):
        """Open the "test" connection and (re)create the "python_test" user.

        Leaves three handles on the test case: ``self.db`` (connection logged
        in as "test"), ``self.usr`` (that connection's handle on python_test)
        and ``self.usrdb`` (a connection logged in as python_test itself).
        """
        admin = connectordb.ConnectorDB("test", "test", url=TEST_URL)
        self.db = admin
        admin.user.public = False

        target = admin("python_test")
        self.usr = target
        # Start from a clean slate even if an earlier run left the user behind.
        if target.exists():
            target.delete()
        target.create("pythontest@localhost", "mypass")

        self.usrdb = connectordb.ConnectorDB(
            "python_test", "mypass", url=TEST_URL)
49
50
    def tearDown(self):
        """Close both connections and remove users the tests may have created.

        The deletes are best-effort: a test may already have removed (or never
        created) a user, so ordinary errors there are ignored.  Only
        ``Exception`` is swallowed, so KeyboardInterrupt/SystemExit still
        propagate.  ``self.db`` is closed in a ``finally`` so that connection
        is released even when an earlier cleanup step raises.
        """
        try:
            self.usrdb.close()
            try:
                self.usr.delete()
            except Exception:
                pass
            try:
                self.db("myuser").delete()
            except Exception:
                pass
        finally:
            self.db.close()
61
62
    def test_authfail(self):
        """Connecting with unknown credentials must raise AuthenticationError.

        ``assertRaises`` makes the test fail when no exception is raised; the
        earlier try/except form passed silently in that case.
        """
        self.assertRaises(connectordb.AuthenticationError,
                          connectordb.ConnectorDB,
                          "notauser", "badpass", url=TEST_URL)
67
68
    def test_basics(self):
        """Check the "test" connection's identity, the server info and user listing."""
        self.assertEqual(self.db.path, "test/user")
        self.assertEqual(self.db.name, "user")
        self.assertEqual(self.db.user.name, "test")

        info = self.db.info()
        # Each entry must have a length greater than one; the key is passed as
        # the failure message so a failing entry is identifiable.
        for key in ("version", "interpolators", "transforms"):
            self.assertGreater(len(info[key]), 1, key)

        self.assertGreater(len(self.db.users()), 1)
        # The python_test connection is expected to be refused the user listing.
        self.assertRaises(connectordb.AuthenticationError, self.usrdb.users)
79
80
    def test_counting(self):
        """Check that the user/device/stream counters follow creations.

        A throwaway ``python_counting_test`` user is created to exercise the
        user counter.  It is deleted in a ``finally`` so a failing assertion
        does not leave it behind for later runs.
        """
        db = self.db
        self.assertGreaterEqual(db.count_users(), 1)
        self.assertGreaterEqual(db.count_devices(), 1)
        self.assertGreaterEqual(db.count_streams(), 1)

        usr = db("python_counting_test")
        if usr.exists():
            usr.delete()
        self.assertFalse(usr.exists())
        curusers = db.count_users()
        usr.create("countingtest@localhost", "mypass")
        try:
            curusers += 1
            self.assertEqual(curusers, db.count_users())

            # The python_test connection is expected to be refused the counters.
            # NOTE(review): count_streams is checked twice; the first check was
            # probably meant to be count_users -- confirm the server refuses
            # user counts to this role before changing it.
            self.assertRaises(connectordb.AuthenticationError,
                              self.usrdb.count_streams)
            self.assertRaises(connectordb.AuthenticationError,
                              self.usrdb.count_devices)
            self.assertRaises(connectordb.AuthenticationError,
                              self.usrdb.count_streams)

            # Baselines are taken after the user creation above.
            curdevices = db.count_devices()
            curstreams = db.count_streams()

            # A new stream bumps only the stream counter.
            self.usrdb["counting_stream"].create({"type": "string"})
            curstreams += 1

            self.assertEqual(curusers, db.count_users())
            self.assertEqual(curdevices, db.count_devices())
            self.assertEqual(curstreams, db.count_streams())

            # A new device bumps only the device counter.
            self.usrdb.user["countdevice"].create()
            curdevices += 1

            self.assertEqual(curusers, db.count_users())
            self.assertEqual(curdevices, db.count_devices())
            self.assertEqual(curstreams, db.count_streams())
        finally:
            usr.delete()
120
121
    def test_user(self):
        """Exercise user existence, role, e-mail, password and profile fields.

        The extra connection opened after the password change is closed in a
        ``finally`` so it is not leaked when an assertion fails.
        """
        self.assertTrue(self.db.user.exists())
        self.assertTrue(self.db(self.usrdb.user.name).exists())
        self.assertTrue(self.usrdb.exists())
        self.assertFalse(self.db("baduser").exists())
        # Seen through the python_test connection, "test" reports as nonexistent.
        self.assertFalse(self.usrdb("test").exists())

        self.assertEqual(self.db.user.role, "admin")
        self.assertEqual(self.usrdb.user.role, "user")

        self.assertEqual(self.usrdb.user.name, "python_test")
        self.usrdb.user.email = "testemail@change"
        self.assertEqual(self.usrdb.user.email, "testemail@change")

        # Logging in again proves the new password took effect.
        self.usrdb.user.set_password("newpass")
        usrdb = connectordb.ConnectorDB("python_test", "newpass", url=TEST_URL)
        try:
            self.assertEqual(usrdb.path, "python_test/user")

            self.usr.set({"role": "admin"})

            # After the refresh the user's role changed but the device's did not.
            usrdb.refresh()
            self.assertEqual(usrdb.role, "user")
            self.assertEqual(usrdb.user.role, "admin")

            usrdb.user.nickname = "myuser"
            self.assertEqual(self.db("python_test").nickname, "myuser")

            usrdb.user.description = "Hello World!"
            self.assertEqual(self.db("python_test").description,
                             "Hello World!")

            # Setting the "admin" key is expected to be refused.
            self.assertRaises(connectordb.AuthenticationError, self.usr.set,
                              {"admin": "Hello"})
        finally:
            usrdb.close()
153
154
    def test_device(self):
        """Create a device, log in with its API key and modify its properties.

        The connection opened with the device's API key is closed in a
        ``finally`` so it is not leaked when an assertion fails.
        """
        db = self.usrdb
        self.assertEqual(len(db.user.devices()), 2)
        self.assertFalse(db.user["mydevice"].exists())
        db.user["mydevice"].create()
        self.assertTrue(db.user["mydevice"].exists())
        self.assertEqual(3, len(db.user.devices()))

        dev = connectordb.ConnectorDB(db.user["mydevice"].apikey, url=TEST_URL)
        try:
            # Device has no access to other devices
            self.assertRaises(connectordb.AuthenticationError,
                              dev.user.devices)

            # Changes made through the device connection are visible to the user.
            dev.nickname = "test nickname"
            self.assertEqual(db("python_test/mydevice").nickname,
                             "test nickname")
            self.assertEqual(dev.enabled, False)
            dev.enabled = True
            self.assertEqual(db("python_test/mydevice").enabled, True)

            # Resetting the key must yield a different key and keep the
            # connection usable (the nickname is still readable afterwards).
            apikey = dev.apikey
            newkey = dev.reset_apikey()
            self.assertNotEqual(apikey, newkey)
            self.assertEqual(dev.nickname, "test nickname")

            db.user["mydevice"].role = "reader"
            self.assertEqual(dev.user.name, "python_test")
        finally:
            dev.close()
178
179
    def test_stream(self):
        """Create a stream, flip its ephemeral/downlink/datatype fields, delete it."""
        # A schema with an unknown type is expected to raise SchemaError.
        rejected = self.usrdb["mystream"]
        with self.assertRaises(SchemaError):
            rejected.create({"type": "blah blah"})
        self.assertEqual(self.usrdb.role, "user")

        before = len(self.usrdb.streams())
        stream = self.usrdb["mystream"]
        self.assertFalse(stream.exists())
        stream.create({"type": "string"}, datatype="string.text")
        self.assertTrue(stream.exists())
        self.assertEqual(len(self.usrdb.streams()), before + 1)

        self.assertEqual(stream.user.name, "python_test")
        self.assertEqual(stream.device.name, "user")

        def flags():
            # Read both switches in the order (ephemeral, downlink).
            return (stream.ephemeral, stream.downlink)

        self.assertEqual(flags(), (False, False))
        self.assertEqual(stream.datatype, "string.text")

        stream.ephemeral = True
        self.assertEqual(flags(), (True, False))

        stream.downlink = True
        self.assertEqual(flags(), (True, True))

        stream.datatype = "lol.lol"
        self.assertEqual(stream.datatype, "lol.lol")

        stream.delete()
        self.assertFalse(stream.exists())
207
208
    def test_streamio(self):
        """Insert datapoints and read them back by index, by call and by slice."""
        stream = self.usrdb("python_test/user/mystream")

        stream.create({"type": "string"})
        self.assertEqual(0, len(stream))

        stream.insert("Hello World!")
        self.assertEqual(1, len(stream))

        # The indexing form and the call form both return the datapoint.
        by_index = stream[0]
        self.assertEqual("Hello World!", by_index["d"])
        by_call = stream(0)[0]
        self.assertEqual("Hello World!", by_call["d"])

        # While the stream is ephemeral, inserts leave its length unchanged.
        stream.ephemeral = True
        for text in ("another hello!", "yet another hello!"):
            stream.insert(text)
        self.assertEqual(1, len(stream))

        stream.ephemeral = False

        stream.insert("1")
        time.sleep(0.1)
        batch = [{"d": "2", "t": time.time() - 0.01}, {"d": "3"}]
        stream.insert_array(batch)

        self.assertEqual("3", stream[-1]["d"])
        self.assertEqual(3, len(stream[1:]))
        self.assertEqual(4, len(stream[:]))

        self.assertEqual(stream.schema["type"], "string")
238
239
    def test_struct(self):
        """Round-trip an object-typed datapoint through a stream.

        Guards against a regression where structs were not sent back
        correctly (this was a bug in connectordb).
        """
        object_schema = {
            "type": "object",
            "properties": {
                "test": {"type": "string"},
                "t2": {"type": "number"}
            }
        }
        stream = self.usrdb["mystream"]
        stream.create(object_schema)

        stream.insert({"test": "hi!", "t2": -1337.1})

        latest = stream[-1]["d"]
        for field, expected in (("test", "hi!"), ("t2", -1337.1)):
            self.assertEqual(latest[field], expected)
256
257
    def test_call(self):
        """Query a stream through its call form with a transform pipeline."""
        stream = self.usrdb["teststream"]
        stream.create({"type": "number"})

        stream.insert_array([{"d": value} for value in (3, 10, 4, 35, 9)])

        result = stream(i1=0, i2=0, transform="if $>5 | $<20")
        self.assertEqual(3, len(result))
        # Expected output, one boolean per returned datapoint, in order.
        for position, expected in enumerate((True, False, True)):
            self.assertEqual(expected, result[position]["d"])
268
269
    def test_subscribe(self):
        """Exercise subscribe/unsubscribe, with and without a transform.

        The fixed sleeps give the subscription machinery time to register
        subscriptions and deliver messages before each assertion.
        """
        s = self.usrdb["teststream"]
        s.create({"type": "number"})

        def received(sub):
            # Fail with a readable message when nothing was delivered, rather
            # than erroring with a TypeError on ``None[0]``.
            self.assertIsNotNone(sub.msg,
                                 "subscription callback was never invoked")
            return sub.msg[0]["d"]

        subs = subscriber()
        s.subscribe(subs.subscribe_callback)

        time.sleep(0.1)  # Give it some time to set up the subscription

        s.insert(1337)
        time.sleep(0.1)

        self.assertEqual(received(subs), 1337)
        s.unsubscribe()

        # Make sure unsubscribe worked
        subs.reset()
        s.insert(1338)
        time.sleep(0.1)

        self.assertIsNone(subs.msg)

        subs2 = subscriber()

        # Two subscriptions on the same stream: one plain, one with a transform.
        s.subscribe(subs.subscribe_callback)
        s.subscribe(subs2.subscribe_callback, transform="if $ > 200")

        time.sleep(0.3)

        # 100 reaches only the plain subscription.
        s.insert(100)
        time.sleep(0.1)

        self.assertEqual(received(subs), 100)
        self.assertEqual(subs.callnumber, 1)
        self.assertIsNone(subs2.msg)
        self.assertEqual(subs2.callnumber, 0)

        # 3000 reaches both subscriptions.
        s.insert(3000)
        time.sleep(0.1)

        self.assertEqual(received(subs), 3000)
        self.assertEqual(subs.callnumber, 2)
        self.assertEqual(received(subs2), 3000)
        self.assertEqual(subs2.callnumber, 1)
        subs2.reset()
        subs.reset()

        # Dropping only the transform subscription leaves the plain one active.
        s.unsubscribe(transform="if $ > 200")
        time.sleep(0.1)
        s.insert(900)
        time.sleep(0.1)

        self.assertIsNone(subs2.msg)
        self.assertEqual(received(subs), 900)

        # Datapoints inserted while the stream is ephemeral are still delivered.
        s.ephemeral = True
        subs.reset()
        s.insert(101)
        time.sleep(0.1)
        self.assertEqual(received(subs), 101)
329
330
    def test_downlink(self):
        """Check downlink writes, downlink subscriptions and the final lengths.

        ``s`` is the stream seen through the python_test connection; ``mds``
        is the same stream seen through a connection opened with the owning
        device's API key.  That second connection is closed in a ``finally``.

        The two closing checks use ``assertEqual``.  The earlier
        ``assertTrue(1, len(s))`` form treated the second argument as the
        failure message and therefore could never fail.
        """
        mydevice = self.usrdb.user["mydevice"]

        mydevice.create()
        s = mydevice["mystream"]
        mdconn = connectordb.ConnectorDB(mydevice.apikey, url=TEST_URL)
        try:
            mds = mdconn["mystream"]
            mds.create({"type": "string"})

            with self.assertRaises(connectordb.AuthenticationError):
                s.insert("devices can not write streams they don't own")

            # Unless it is a downlink
            mds.downlink = True

            # The write is counted in the downlink, not in the stream itself.
            s.insert("hi!")
            self.assertEqual(0, len(s))
            self.assertEqual(1, s.length(downlink=True))

            self.assertEqual("hi!", s(downlink=True)[0]["d"])

            subs = subscriber()
            # NOTE(review): presumably a True return from the downlink callback
            # acknowledges the datapoint by writing it to the real stream; the
            # final counts below (1 in the stream, 2 in the downlink) rely on
            # that -- confirm against the client documentation.
            subs.returnvalue = True
            subs2 = subscriber()

            mds.subscribe(subs.subscribe_callback, downlink=True)
            s.subscribe(subs2.subscribe_callback)

            time.sleep(0.2)
            s.append("hello!")
            time.sleep(0.2)

            # Fail cleanly (not with a TypeError on None[0]) if nothing arrived.
            self.assertIsNotNone(subs.msg, "downlink callback never invoked")
            self.assertIsNotNone(subs2.msg, "stream callback never invoked")
            self.assertEqual(subs.msg[0]["d"], "hello!")
            self.assertEqual(subs2.msg[0]["d"], "hello!")

            mds.unsubscribe(downlink=True)
            s.unsubscribe()

            self.assertEqual(1, len(s))
            self.assertEqual(2, s.length(True))
        finally:
            mdconn.close()
371
372
    def test_multicreate(self):
        """Create a device, and then a user, together with nested children."""
        string_schema = "{\"type\":\"string\"}"

        def train(nickname):
            # Fresh dict per use so no creation call shares state with another.
            return {"nickname": nickname, "schema": string_schema}

        # Device created together with one stream.
        device = self.usrdb.user["mydevice"]
        device.create(streams={"stream1": train("My Train")})
        self.assertTrue(device.exists())
        self.assertTrue(device["stream1"].exists())
        self.assertEqual(device["stream1"].nickname, "My Train")

        # User created together with a device (holding a stream) and a stream.
        nested_devices = {"device1": {"streams": {"stream1": train("My Train")}}}
        own_streams = {"mstream1": train("My Train2")}
        self.db("myuser").create("my@email", "mypass",
                                 description="choo choo",
                                 devices=nested_devices,
                                 streams=own_streams)

        created = self.db("myuser")
        self.assertTrue(created.exists())
        self.assertTrue(created["user"]["mstream1"].exists())
        self.assertTrue(created["device1"].exists())
        self.assertTrue(created["device1"]["stream1"].exists())

        created.delete()
407
408
409
if __name__ == "__main__":
    # Run as a script: hand control to unittest's command-line runner.
    unittest.main()
411