GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 700e8b...3b44da )
by Daniel
59s
created

TestLogger.test_create()   A

Complexity

Conditions 2

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 15
rs 9.4285
1
from __future__ import absolute_import
2
3
import unittest
4
import time
5
import os
6
7
import connectordb
8
from connectordb.logger import Logger
9
from connectordb import DATAPOINT_INSERT_LIMIT
10
11
# Server URL passed to every ConnectorDB/Logger instance in these tests;
# taken from the connectordb package's CONNECTORDB_URL constant.
TEST_URL = connectordb.CONNECTORDB_URL
12
13
14 View Code Duplication
class TestLogger(unittest.TestCase):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
15
16
    def setUp(self):
        """Prepare a fresh user, device and API key, and remove any stale cache file.

        Connects to TEST_URL with the "test"/"test" credentials, recreates the
        "logger_test" user from scratch, creates its "mydevice" device, and
        stores the device's API key on the test case. Finally deletes a
        leftover "test.db" file if one exists in the working directory.
        """
        database = connectordb.ConnectorDB("test", "test", url=TEST_URL)
        user = database("logger_test")
        self.db, self.usr = database, user

        # Start from a clean slate: drop the user left behind by an earlier run.
        if user.exists():
            user.delete()
        user.create("loggertest@localhost", "mypass")

        self.device = device = user["mydevice"]
        device.create()
        self.apikey = device.apikey

        # The tests below all open "test.db"; make sure none is lying around.
        cache_path = "test.db"
        if os.path.exists(cache_path):
            os.remove(cache_path)
28
29
    def tearDown(self):
        """Best-effort removal of the test user created in setUp.

        Any ordinary error raised by the delete call is deliberately ignored
        so that a cleanup problem does not mask the result of the test itself.
        """
        try:
            self.usr.delete()
        except Exception:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt and SystemExit still propagate.
            pass
34
35
    def test_inserting(self):
        """Exercise creating, caching, reloading and syncing a Logger.

        Steps covered:
          1. A Logger created on a new "test.db" runs the on_create callback,
             which configures it and registers the "mystream" stream.
          2. Inserted datapoints are counted by ``len(logger)`` until synced.
          3. Reopening "test.db" must not call on_create again and must
             restore the name, API key, server URL, sync period and cached
             datapoints.
          4. ``sync()`` empties the cache and the datapoints become visible
             on the stream.
        """
        stream = self.device["mystream"]

        def test_create(new_logger):
            # on_create callback: configure the freshly created logger.
            new_logger.apikey = self.apikey
            new_logger.serverurl = TEST_URL
            new_logger.data = "Hello World!!!"
            new_logger.syncperiod = 3.3

            new_logger.addStream(
                "mystream", {"type": "string"}, nickname="My nickname")

            # Adding "stream_DNE" with no schema is expected to raise.
            with self.assertRaises(Exception):
                new_logger.addStream("stream_DNE")

        self.assertFalse(stream.exists())
        logger = Logger("test.db", on_create=test_create)
        logger.ping()
        self.assertTrue(stream.exists())
        self.assertEqual(stream.nickname, "My nickname")

        self.assertEqual("logger_test/mydevice", logger.name)
        self.assertEqual(self.apikey, logger.apikey)
        self.assertEqual(TEST_URL, logger.serverurl)

        self.assertEqual(0, len(logger))

        self.assertIn("mystream", logger)
        self.assertNotIn("stream_DNE", logger)

        logger.insert("mystream", "Hello World!")

        self.assertEqual(1, len(logger))
        self.assertEqual("Hello World!!!", logger.data)

        logger.close()

        def nocreate(_logger):
            raise Exception("OnCreate was called on existing database!")

        # Now reload from file and make sure everything was saved
        logger = Logger("test.db", on_create=nocreate)
        self.assertEqual(1, len(logger))
        self.assertEqual(logger.name, "logger_test/mydevice")
        self.assertIn("mystream", logger)
        # assertEqual, not assertTrue: assertTrue(a, b) treats b as the failure
        # message and passes for any truthy a, so it compared nothing.
        self.assertEqual(self.apikey, logger.apikey)
        self.assertEqual(TEST_URL, logger.serverurl)
        self.assertEqual(3.3, logger.syncperiod)

        # Make sure that the schema is checked correctly: an integer must be
        # rejected by the string-typed stream. The stream name is passed
        # explicitly so the failure comes from the datapoint, not from a
        # missing positional argument.
        # NOTE(review): assumes Logger.insert validates against the stored
        # schema -- confirm against the Logger implementation.
        with self.assertRaises(Exception):
            logger.insert("mystream", 5)

        logger.insert("mystream", "hi")

        self.assertEqual(2, len(logger))
        self.assertEqual(0, len(stream))
        logger.sync()
        self.assertEqual(0, len(logger))
        self.assertEqual(2, len(stream))

        self.assertGreater(logger.lastsynctime, time.time() - 1)

        self.assertEqual("Hello World!!!", logger.data)

        self.assertEqual(stream[0]["d"], "Hello World!")
        self.assertEqual(stream[1]["d"], "hi")
        self.assertGreater(stream[1]["t"], time.time() - 1)

        logger.close()
112
113
    def test_bgsync(self):
        """Check that a started Logger syncs on its own and stops when told to.

        With ``syncperiod`` set to 1, datapoints inserted after ``start()``
        are expected on the stream after a 1.1 second wait. After ``stop()``,
        a newly inserted datapoint must stay in the local cache even after
        waiting longer than one period.
        """
        stream = self.device["mystream"]

        # This time we test existing stream
        stream.create({"type": "string"})

        logger = Logger("test.db")
        logger.serverurl = TEST_URL
        logger.apikey = self.apikey
        logger.addStream("mystream")
        logger.syncperiod = 1

        def expect_counts(on_stream, in_cache):
            # Stream length is checked first, then the logger's cache length.
            self.assertEqual(on_stream, len(stream))
            self.assertEqual(in_cache, len(logger))

        expect_counts(0, 0)

        logger.start()
        for value in ("hi", "hello"):
            logger.insert("mystream", value)
        expect_counts(0, 2)

        # Wait slightly longer than one sync period.
        time.sleep(1.1)
        expect_counts(2, 0)

        logger.insert("mystream", "har")
        expect_counts(2, 1)

        time.sleep(1.1)
        expect_counts(3, 0)

        logger.stop()

        # With background syncing stopped, the datapoint must remain cached.
        logger.insert("mystream", "stopped")
        time.sleep(1.3)
        expect_counts(3, 1)

        logger.close()
152
153
    def test_overflow(self):
        """Sync three cached datapoints while DATAPOINT_INSERT_LIMIT is set to 2.

        After ``sync()`` all three datapoints must be on the stream and the
        logger's cache must be empty. The previous limit is restored in a
        ``finally`` clause so that a failing assertion cannot leak the lowered
        value into other tests.

        NOTE(review): this rebinds the name imported into *this* module via
        ``from connectordb import DATAPOINT_INSERT_LIMIT``. Whether the
        library code reads the same binding is not visible here; if it does
        not, the limit used during ``sync()`` is unchanged -- confirm against
        the connectordb sources.
        """
        global DATAPOINT_INSERT_LIMIT
        saved_limit = DATAPOINT_INSERT_LIMIT
        DATAPOINT_INSERT_LIMIT = 2
        try:
            stream = self.device["mystream"]

            # This time we test existing stream
            stream.create({"type": "string"})

            logger = Logger("test.db")
            logger.serverurl = TEST_URL
            logger.apikey = self.apikey

            logger.addStream("mystream")

            # One more datapoint than the lowered limit.
            logger.insert("mystream", "test1")
            logger.insert("mystream", "test2")
            logger.insert("mystream", "test3")

            logger.sync()

            self.assertEqual(3, len(stream))
            self.assertEqual(0, len(logger))
        finally:
            DATAPOINT_INSERT_LIMIT = saved_limit
178
179
    def test_clear(self):
        """Check that ``cleardata()`` leaves the logger with no cached datapoints."""
        stream = self.device["mystream"]

        # This time we test existing stream
        stream.create({"type": "string"})

        logger = Logger("test.db")
        logger.serverurl = TEST_URL
        logger.apikey = self.apikey
        logger.addStream("mystream")

        # Cache two datapoints, then discard them without syncing.
        for value in ("test1", "test2"):
            logger.insert("mystream", value)

        logger.cleardata()
        self.assertEqual(len(logger), 0)
196
197
    def test_overwrite(self):
        """Sync cached datapoints after the stream received a direct insert.

        Two datapoints are cached, then one is inserted straight into the
        stream, then (first round only) one more is cached. The test expects
        the stream to hold 2 datapoints after the first sync and 3 after the
        second, i.e. the datapoints cached before the direct insert do not
        end up on the stream.
        """
        logger = Logger("test.db")
        logger.serverurl = TEST_URL
        logger.apikey = self.apikey

        logger.addStream("mystream", {"type": "string"})

        stream = logger.connectordb["mystream"]
        stale_values = ("will_be_overwritten", "will_be_overwritten2")

        # Round 1: stale cached points, a direct insert, then a newer cached point.
        for value in stale_values:
            logger.insert("mystream", value)
        stream.insert("ONO - it exists!")
        logger.insert("mystream", "hi!")

        logger.sync()

        self.assertEqual(2, len(stream))

        # Round 2: stale cached points followed only by a direct insert.
        for value in stale_values:
            logger.insert("mystream", value)
        stream.insert("ONO - it exists!")

        logger.sync()

        self.assertEqual(3, len(stream))
222
223
224
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
226