GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 71a987...4ed013 )
by Daniel
01:03
created

TestConnectorDB.test_counting()   B

Complexity

Conditions 2

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 40
rs 8.8571
1
from __future__ import absolute_import
2
3
import unittest
4
import time
5
import json
6
import logging
7
import connectordb
8
9
from jsonschema import SchemaError
10
11
# Allows debugging the websocket
12
import websocket
13
# Runs at import time, so tracing is switched on for every test in this module.
websocket.enableTrace(True)
14
15
# URL passed as ``url=`` to every connectordb.ConnectorDB(...) call in the tests.
TEST_URL = "http://localhost:8000"
16
17
18
class subscriber(object):
    """Helper for subscription tests: remembers what the callback last received.

    Attributes:
        msg: ``datapoints`` argument of the most recent callback, or ``None``
            after ``reset()``.
        stream: ``stream`` argument of the most recent callback, or ``None``
            after ``reset()``.
        returnvalue: value returned from ``subscribe_callback``; ``None``
            unless a test assigns something else. Not touched by ``reset()``.
    """

    def __init__(self):
        self.returnvalue = None
        self.reset()

    def reset(self):
        """Forget the last received message and stream."""
        self.msg = None
        self.stream = None

    def subscribe_callback(self, stream, datapoints):
        """Record one callback invocation and return ``self.returnvalue``.

        Args:
            stream: stream identifier handed to the callback.
            datapoints: JSON-serializable payload handed to the callback.

        Returns:
            The current ``self.returnvalue``.
        """
        logging.info("Got message: %s:: %s", stream, json.dumps(datapoints))
        self.msg = datapoints
        self.stream = stream

        return self.returnvalue
34
35
36
class TestConnectorDB(unittest.TestCase):
    """Integration tests for the connectordb client against a server at TEST_URL.

    ``setUp`` provides three handles used throughout:

    * ``self.db``    -- connection logged in as ``test`` / ``test``
    * ``self.usr``   -- the ``python_test`` user as seen through ``self.db``
    * ``self.usrdb`` -- connection logged in as ``python_test`` / ``mypass``
    """

    def setUp(self):
        """Log in as ``test`` and recreate the ``python_test`` user from scratch."""
        self.db = connectordb.ConnectorDB("test", "test", url=TEST_URL)
        self.db.user.public = False
        self.usr = self.db("python_test")
        # Remove leftovers from an earlier (possibly aborted) run.
        if self.usr.exists():
            self.usr.delete()
        self.usr.create("pythontest@localhost", "mypass")
        self.usrdb = connectordb.ConnectorDB("python_test",
                                             "mypass",
                                             url=TEST_URL)

    def tearDown(self):
        """Close both connections and delete the users the tests may have made."""
        self.usrdb.close()
        # Deletion is best-effort (a test may already have removed the user),
        # but only ordinary errors are swallowed -- KeyboardInterrupt and
        # SystemExit still propagate, unlike with a bare ``except:``.
        try:
            self.usr.delete()
        except Exception:
            logging.debug("tearDown: could not delete python_test",
                          exc_info=True)
        try:
            self.db("myuser").delete()
        except Exception:
            logging.debug("tearDown: could not delete myuser", exc_info=True)
        self.db.close()

    def test_authfail(self):
        """Connecting with unknown credentials raises AuthenticationError."""
        # assertRaises fails the test when nothing is raised; a plain
        # try/except that returns from the handler would pass either way.
        self.assertRaises(connectordb.AuthenticationError,
                          connectordb.ConnectorDB,
                          "notauser", "badpass", url=TEST_URL)

    def test_basics(self):
        """Check path/name of the ``test`` connection and the server info."""
        self.assertEqual(self.db.path, "test/user")
        self.assertEqual(self.db.name, "user")
        self.assertEqual(self.db.user.name, "test")

        info = self.db.info()
        self.assertTrue(len(info["version"]) > 1)
        self.assertTrue(len(info["interpolators"]) > 1)
        self.assertTrue(len(info["transforms"]) > 1)

        self.assertTrue(len(self.db.users()) > 1)
        # The python_test connection is expected to be refused the user list.
        self.assertRaises(connectordb.AuthenticationError, self.usrdb.users)

    def test_counting(self):
        """Counts of users/devices/streams track creations one by one."""
        db = self.db
        self.assertGreaterEqual(db.count_users(), 1)
        self.assertGreaterEqual(db.count_devices(), 1)
        self.assertGreaterEqual(db.count_streams(), 1)

        usr = db("python_counting_test")
        if usr.exists():
            usr.delete()
        self.assertFalse(usr.exists())

        curusers = db.count_users()
        # NOTE(review): this address looks like e-mail obfuscation residue
        # from a web scrape -- confirm the intended value.
        usr.create("[email protected]", "mypass")
        curusers += 1
        self.assertEqual(curusers, db.count_users())

        # NOTE(review): count_streams used to be asserted twice here; one of
        # them was presumably meant to be count_users -- confirm the server's
        # permissions before adding that assertion.
        self.assertRaises(connectordb.AuthenticationError,
                          self.usrdb.count_streams)
        self.assertRaises(connectordb.AuthenticationError,
                          self.usrdb.count_devices)

        curdevices = db.count_devices()
        curstreams = db.count_streams()

        # A new stream changes only the stream count.
        self.usrdb["counting_stream"].create({"type": "string"})
        curstreams += 1

        self.assertEqual(curusers, db.count_users())
        self.assertEqual(curdevices, db.count_devices())
        self.assertEqual(curstreams, db.count_streams())

        # A new device changes only the device count.
        self.usrdb.user["countdevice"].create()
        curdevices += 1

        self.assertEqual(curusers, db.count_users())
        self.assertEqual(curdevices, db.count_devices())
        self.assertEqual(curstreams, db.count_streams())

        usr.delete()

    def test_user(self):
        """Exercise user existence, role, email, password and nickname."""
        self.assertEqual(self.db.user.exists(), True)
        self.assertEqual(self.db(self.usrdb.user.name).exists(), True)
        self.assertEqual(self.usrdb.exists(), True)
        self.assertEqual(self.db("baduser").exists(), False)
        self.assertEqual(self.usrdb("test").exists(), False)

        self.assertEqual(self.db.user.role, "admin")
        self.assertEqual(self.usrdb.user.role, "user")

        self.assertEqual(self.usrdb.user.name, "python_test")
        self.usrdb.user.email = "testemail@change"
        self.assertEqual(self.usrdb.user.email, "testemail@change")

        # After a password change a fresh login must use the new password.
        self.usrdb.user.set_password("newpass")
        usrdb = connectordb.ConnectorDB("python_test", "newpass", url=TEST_URL)
        self.assertEqual(usrdb.path, "python_test/user")

        self.usr.set({"role": "admin"})

        usrdb.refresh()
        # The device keeps role "user" while its owning user is now "admin".
        self.assertEqual(usrdb.role, "user")
        self.assertEqual(usrdb.user.role, "admin")

        usrdb.user.nickname = "myuser"
        self.assertEqual(self.db("python_test").nickname, "myuser")

        self.assertRaises(connectordb.AuthenticationError, self.usr.set,
                          {"admin": "Hello"})

    def test_device(self):
        """Create a device, log in with its API key and modify it."""
        db = self.usrdb
        self.assertEqual(len(db.user.devices()), 2)
        self.assertFalse(db.user["mydevice"].exists())
        db.user["mydevice"].create()
        self.assertTrue(db.user["mydevice"].exists())
        self.assertEqual(3, len(db.user.devices()))

        dev = connectordb.ConnectorDB(db.user["mydevice"].apikey, url=TEST_URL)
        # Device has no access to other devices
        self.assertRaises(connectordb.AuthenticationError, dev.user.devices)

        dev.nickname = "test nickname"
        self.assertEqual(db("python_test/mydevice").nickname, "test nickname")

        # Resetting the key yields a different key; the connection stays usable.
        apikey = dev.apikey
        newkey = dev.reset_apikey()
        self.assertFalse(apikey == newkey)
        self.assertEqual(dev.nickname, "test nickname")

        db.user["mydevice"].role = "reader"
        self.assertEqual(dev.user.name, "python_test")

    def test_stream(self):
        """Create a stream, toggle its properties and delete it again."""
        self.assertRaises(SchemaError, self.usrdb["mystream"].create,
                          {"type": "blah blah"})
        self.assertEqual(self.usrdb.role, "user")

        initialstreams = len(self.usrdb.streams())
        s = self.usrdb["mystream"]
        self.assertFalse(s.exists())
        s.create({"type": "string"}, datatype="string.text")
        self.assertTrue(s.exists())
        self.assertEqual(len(self.usrdb.streams()), initialstreams + 1)

        self.assertEqual(s.user.name, "python_test")
        self.assertEqual(s.device.name, "user")

        self.assertEqual(s.ephemeral, False)
        self.assertEqual(s.downlink, False)
        self.assertEqual(s.datatype, "string.text")
        s.ephemeral = True
        self.assertEqual(s.ephemeral, True)
        self.assertEqual(s.downlink, False)
        s.downlink = True
        self.assertEqual(s.ephemeral, True)
        self.assertEqual(s.downlink, True)
        s.datatype = "lol.lol"
        self.assertEqual(s.datatype, "lol.lol")

        s.delete()
        self.assertFalse(s.exists())

    def test_streamio(self):
        """Insert datapoints and read them back by index, call and slice."""
        s = self.usrdb("python_test/user/mystream")

        s.create({"type": "string"})
        self.assertEqual(0, len(s))

        s.insert("Hello World!")
        self.assertEqual(1, len(s))

        self.assertEqual("Hello World!", s[0]["d"])
        self.assertEqual("Hello World!", s(0)[0]["d"])

        # Inserts made while the stream is ephemeral must not change its length.
        s.ephemeral = True

        s.insert("another hello!")
        s.insert("yet another hello!")

        self.assertEqual(1, len(s))

        s.ephemeral = False

        s.insert("1")
        time.sleep(0.1)
        s.insert_array([{"d": "2", "t": time.time() - 0.01}, {"d": "3"}])

        self.assertEqual("3", s[-1]["d"])
        self.assertEqual(3, len(s[1:]))
        self.assertEqual(4, len(s[:]))

        self.assertEqual(s.schema["type"], "string")

    def test_struct(self):
        """Object-typed datapoints come back with their fields intact."""
        # This test is specifically to make sure that structs are correctly
        # sent back (this was a bug in connectordb)
        s = self.usrdb["mystream"]

        s.create({
            "type": "object",
            "properties": {
                "test": {"type": "string"},
                "t2": {"type": "number"},
            },
        })

        s.insert({"test": "hi!", "t2": -1337.1})

        v = s[-1]["d"]
        self.assertEqual(v["test"], "hi!")
        self.assertEqual(v["t2"], -1337.1)

    def test_call(self):
        """Calling a stream with a transform returns the transformed datapoints."""
        s = self.usrdb["teststream"]
        s.create({"type": "number"})

        s.insert_array([{"d": 3}, {"d": 10}, {"d": 4}, {"d": 35}, {"d": 9}])

        dp = s(i1=0, i2=0, transform="if $>5 | $<20")
        self.assertEqual(3, len(dp))
        self.assertEqual(True, dp[0]["d"])
        self.assertEqual(False, dp[1]["d"])
        self.assertEqual(True, dp[2]["d"])

    def test_subscribe(self):
        """Subscribe, unsubscribe and transform-filtered subscriptions."""
        s = self.usrdb["teststream"]
        s.create({"type": "number"})

        subs = subscriber()
        s.subscribe(subs.subscribe_callback)

        time.sleep(0.1)  # Give it some time to set up the subscription

        s.insert(1337)
        time.sleep(0.1)

        self.assertEqual(subs.msg[0]["d"], 1337)
        s.unsubscribe()

        # Make sure unsubscribe worked
        subs.reset()
        s.insert(1338)
        time.sleep(0.1)

        self.assertIsNone(subs.msg)

        subs2 = subscriber()

        s.subscribe(subs.subscribe_callback)
        s.subscribe(subs2.subscribe_callback, transform="if $ > 200")

        time.sleep(0.3)

        # 100 fails the "> 200" filter, so only the plain subscriber sees it.
        s.insert(100)
        time.sleep(0.1)

        self.assertEqual(subs.msg[0]["d"], 100)
        self.assertIsNone(subs2.msg)

        s.insert(3000)
        time.sleep(0.1)

        self.assertEqual(subs.msg[0]["d"], 3000)
        self.assertEqual(subs2.msg[0]["d"], 3000)
        subs2.reset()
        subs.reset()

        # Dropping only the transform subscription leaves the plain one active.
        s.unsubscribe(transform="if $ > 200")
        time.sleep(0.1)
        s.insert(900)
        time.sleep(0.1)

        self.assertIsNone(subs2.msg)
        self.assertEqual(subs.msg[0]["d"], 900)

        # Subscribers still receive inserts while the stream is ephemeral.
        s.ephemeral = True
        subs.reset()
        s.insert(101)
        time.sleep(0.1)
        self.assertEqual(subs.msg[0]["d"], 101)

    def test_downlink(self):
        """A downlink insert reaches both the device and the owner subscriber."""
        mydevice = self.usrdb.user["mydevice"]

        mydevice.create()
        s = mydevice["mystream"]
        mdconn = connectordb.ConnectorDB(mydevice.apikey, url=TEST_URL)

        mds = mdconn["mystream"]
        mds.create({"type": "string"})
        mds.downlink = True

        subs = subscriber()
        subs.returnvalue = True
        subs2 = subscriber()

        mds.subscribe(subs.subscribe_callback, downlink=True)
        s.subscribe(subs2.subscribe_callback)

        time.sleep(0.2)
        s.insert("hello!")
        time.sleep(0.2)

        self.assertEqual(subs.msg[0]["d"], "hello!")
        self.assertEqual(subs2.msg[0]["d"], "hello!")

        mds.unsubscribe(downlink=True)
        s.unsubscribe()

        # assertTrue(1, len(s)) could never fail: its second argument is the
        # failure message. The length itself is what has to be compared.
        self.assertEqual(1, len(s))

    def test_multicreate(self):
        """Create devices/users together with nested devices and streams."""
        mydevice = self.usrdb.user["mydevice"]
        mydevice.create(streams={
            "stream1": {
                "nickname": "My Train",
                "schema": "{\"type\":\"string\"}",
            },
        })
        self.assertTrue(mydevice.exists())
        self.assertTrue(mydevice["stream1"].exists())
        self.assertEqual(mydevice["stream1"].nickname, "My Train")

        self.db("myuser").create(
            "my@email",
            "mypass",
            description="choo choo",
            devices={
                "device1": {
                    "streams": {
                        "stream1": {
                            "nickname": "My Train",
                            "schema": "{\"type\":\"string\"}",
                        },
                    },
                },
            },
            streams={
                "mstream1": {
                    "nickname": "My Train2",
                    "schema": "{\"type\":\"string\"}",
                },
            })

        usr = self.db("myuser")
        self.assertTrue(usr.exists())
        # Streams passed at user level are checked under the "user" device.
        self.assertTrue(usr["user"]["mstream1"].exists())
        self.assertTrue(usr["device1"].exists())
        self.assertTrue(usr["device1"]["stream1"].exists())

        usr.delete()
383
384
385
if __name__ == "__main__":
    # Run every test in this module when the file is executed directly.
    unittest.main()
387