from __future__ import absolute_import
import json
import os

from ._connectorobject import ConnectorObject
from ._datapointarray import DatapointArray

from jsonschema import Draft4Validator
import time

# Python 2/3 compatibility: Python 3 has no basestring builtin, so define it
# as (str, bytes) there. See also:
# https://github.com/oxplot/fysom/issues/1
try:
    unicode = unicode
except NameError:
    basestring = (str, bytes)

# ConnectorDB limits the size of a single insert, so larger arrays are split into chunks
DATAPOINT_INSERT_LIMIT = 5000


def query_maker(t1=None, t2=None, limit=None, i1=None, i2=None, transform=None, downlink=False):
    """query_maker takes the optional arguments and constructs a json query for a stream's
    datapoints using it::

        # {"t1": 5, "transform": "if $ > 5"}
        print(query_maker(t1=5, transform="if $ > 5"))
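
    Mixing a time range with an index range is not allowed (a sketch - this raises)::

        query_maker(t1=5, i1=2)  # raises AssertionError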
    """
    params = {}
    if t1 is not None:
        params["t1"] = t1
    if t2 is not None:
        params["t2"] = t2
    if limit is not None:
        params["limit"] = limit
    if i1 is not None or i2 is not None:
        if len(params) > 0:
            raise AssertionError(
                "Stream cannot be accessed both by index and by timestamp at the same time.")
        if i1 is not None:
            params["i1"] = i1
        if i2 is not None:
            params["i2"] = i2

    # If no range is given, query whole stream
    if len(params) == 0:
        params["i1"] = 0
        params["i2"] = 0

    if transform is not None:
        params["transform"] = transform
    if downlink:
        params["downlink"] = True

    return params


class Stream(ConnectorObject):

    def create(self, schema="{}", **kwargs):
        """Creates a stream given an optional JSON schema encoded as a python dict. You can also add other
        properties of the stream, such as the icon, datatype or description. create accepts both a string
        schema and a dict-encoded schema.
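
        A minimal sketch (the stream handle and extra properties here are hypothetical)::

            s = cdb["mystream"]
            s.create({"type": "number"}, description="A numeric stream")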
        """
        if isinstance(schema, basestring):
            strschema = schema
            schema = json.loads(schema)
        else:
            strschema = json.dumps(schema)
        Draft4Validator.check_schema(schema)
        kwargs["schema"] = strschema
        self.metadata = self.db.create(self.path, kwargs).json()

    def insert_array(self, datapoint_array, restamp=False):
        """Given an array of datapoints, inserts them into the stream. This is different from insert(),
        because it requires an array of valid datapoints, whereas insert only requires the data portion
        of the datapoint, and fills out the rest::

            s = cdb["mystream"]
            s.create({"type": "number"})

            s.insert_array([{"d": 4, "t": time.time()}, {"d": 5, "t": time.time()}], restamp=False)

        The optional `restamp` parameter specifies whether or not the database should rewrite the timestamps
        of datapoints which have a timestamp that is less than one that already exists in the database.

        That is, if restamp is False, and a datapoint has a timestamp less than a datapoint that already
        exists in the database, then the insert will fail. If restamp is True, then all datapoints
        with timestamps below the datapoints already in the database will have their timestamps overwritten
        to the same timestamp as the most recent datapoint that already exists in the database, and the
        insert will succeed.
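
        A sketch of the difference (the timestamps are made up; suppose the stream's most
        recent datapoint has t=100.0)::

            s.insert_array([{"d": 1, "t": 50.0}], restamp=False)  # fails - 50.0 is in the past
            s.insert_array([{"d": 1, "t": 50.0}], restamp=True)   # succeeds - restamped to 100.0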
        """

        # To be safe, we split into chunks
        while len(datapoint_array) > DATAPOINT_INSERT_LIMIT:
            # We insert datapoints in chunks of a couple thousand so that they
            # fit in the insert size limit of ConnectorDB
            a = datapoint_array[:DATAPOINT_INSERT_LIMIT]

            if restamp:
                self.db.update(self.path + "/data", a)
            else:
                self.db.create(self.path + "/data", a)

            # Clear the written datapoints
            datapoint_array = datapoint_array[DATAPOINT_INSERT_LIMIT:]

        if restamp:
            self.db.update(self.path + "/data", datapoint_array)
        else:
            self.db.create(self.path + "/data", datapoint_array)

    def insert(self, data):
        """insert inserts one datapoint with the given data, and appends it to
        the end of the stream::

            s = cdb["mystream"]

            s.create({"type": "string"})

            s.insert("Hello World!")

        """
        self.insert_array([{"d": data, "t": time.time()}], restamp=True)

    def append(self, data):
        """Same as insert, using the pythonic array name"""
        self.insert(data)

    def subscribe(self, callback, transform="", downlink=False):
        """Subscribes to the stream, running the callback function each time datapoints are inserted into
        the given stream. There is an optional transform to the datapoints, and a downlink parameter::

            s = cdb["mystream"]

            def subscription_callback(stream, data):
                print(stream, data)

            s.subscribe(subscription_callback)

        The downlink parameter is for downlink streams - it allows you to subscribe to the downlink substream,
        before it is acknowledged. This is especially useful for something like lights - have lights be
        a boolean downlink stream, and the light itself be subscribed to the downlink, so that other
        devices can write to the light, turning it on and off::

            def light_control(stream, data):
                light_boolean = data[0]["d"]
                print("Setting light to", light_boolean)
                set_light(light_boolean)

                # Acknowledge the write
                return True

            # We don't care about intermediate values, we only want the most recent setting
            # of the light, meaning we want the "if last" transform
            s.subscribe(light_control, downlink=True, transform="if last")

        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.subscribe(streampath, callback, transform)

    def unsubscribe(self, transform="", downlink=False):
        """Unsubscribes from a previously subscribed stream. Note that the same values of transform
        and downlink must be passed in order to do the correct unsubscribe::

            s.subscribe(callback, transform="if last")
            s.unsubscribe(transform="if last")
        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.unsubscribe(streampath, transform)

    def __call__(self, t1=None, t2=None, limit=None, i1=None, i2=None, downlink=False, transform=None):
        """By calling the stream as a function, you can query it by either time range or index,
        and further you can perform a custom transform on the stream::

            #Returns all datapoints with their data < 50 from the past minute
            stream(t1=time.time() - 60, transform="if $ < 50")

            #Performs an aggregation on the stream, returning a single datapoint
            #which contains the sum of the datapoints
            stream(transform="sum | if last")

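        Querying by index is also possible (a sketch; as in __getitem__, i2 is exclusive)::

            #Returns the first 10 datapoints of the stream
            stream(i1=0, i2=10)
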
        """
        params = query_maker(t1, t2, limit, i1, i2, transform, downlink)

        # In order to avoid accidental requests for full streams, ConnectorDB does not permit
        # requests without any url parameters - query_maker already sets i1=0, i2=0 when no
        # range is given, so the full stream is always requested explicitly.
        return DatapointArray(self.db.read(self.path + "/data", params).json())

    def __getitem__(self, getrange):
        """Allows accessing the stream just as if it were one big python array.
        An example::

            #Returns the most recent 5 datapoints from the stream
            stream[-5:]

            #Returns all the data the stream holds.
            stream[:]

        In order to perform transforms on the stream and to aggregate data, look at __call__,
        which allows getting index ranges along with a transform.
        """
        if not isinstance(getrange, slice):
            # Return the single datapoint
            return self(i1=getrange, i2=getrange + 1)[0]

        # The query is a slice - return the range
        return self(i1=getrange.start, i2=getrange.stop)

    def length(self, downlink=False):
        """Returns the number of datapoints in the stream (of the downlink substream if downlink=True)"""
        return int(self.db.read(self.path + "/data", {"q": "length", "downlink": downlink}).text)

    def __len__(self):
        """taking len(stream) returns the number of datapoints saved within the database for the stream"""
        return self.length()

    def __repr__(self):
        """Returns a string representation of the stream"""
        return "[Stream:%s]" % (self.path,)

    def export(self, directory):
        """Exports the stream to the given directory. The directory must not already exist.
        You can later import this stream by running import_stream on a device.
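
        A sketch of the resulting layout (the directory name is hypothetical)::

            s.export("./mystream_backup")
            # ./mystream_backup/stream.json   - the stream's properties
            # ./mystream_backup/data.json     - the stream's datapoints
            # ./mystream_backup/downlink.json - only written for downlink streams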
        """
        if os.path.exists(directory):
            raise FileExistsError(
                "The stream export directory already exists")

        os.mkdir(directory)

        # Write the stream's info
        with open(os.path.join(directory, "stream.json"), "w") as f:
            json.dump(self.data, f)

        # Now write the stream's data
        # We sort it first, since older versions of ConnectorDB had a bug
        # where sometimes datapoints would be returned out of order.
        self[:].sort().writeJSON(os.path.join(directory, "data.json"))

        # And if the stream is a downlink, write the downlink data
        if self.downlink:
            self(i1=0, i2=0, downlink=True).sort().writeJSON(
                os.path.join(directory, "downlink.json"))


    # -----------------------------------------------------------------------
    # Following are getters and setters of the stream's properties

    @property
    def datatype(self):
        """returns the stream's registered datatype. The datatype suggests how the stream can be processed."""
        if "datatype" in self.data:
            return self.data["datatype"]
        return ""

    @datatype.setter
    def datatype(self, set_datatype):
        self.set({"datatype": set_datatype})

    @property
    def downlink(self):
        """returns whether the stream is a downlink, meaning that it accepts input (like turning lights on/off)"""
        if "downlink" in self.data:
            return self.data["downlink"]
        return False

    @downlink.setter
    def downlink(self, is_downlink):
        self.set({"downlink": is_downlink})

    @property
    def ephemeral(self):
        """returns whether the stream is ephemeral, meaning that data is not saved, but just passes through the messaging system."""
        if "ephemeral" in self.data:
            return self.data["ephemeral"]
        return False

    @ephemeral.setter
    def ephemeral(self, is_ephemeral):
        """sets whether the stream is ephemeral, i.e. whether its datapoints are saved in the database.
        An ephemeral stream is useful for values which are set very frequently and which you might want
        a subscription to, but which are not important enough to be saved in the database."""
        self.set({"ephemeral": is_ephemeral})

    @property
    def schema(self):
        """Returns the JSON schema of the stream as a python dict."""
        if "schema" in self.data:
            return json.loads(self.data["schema"])
        return None

    @property
    def sschema(self):
        """Returns the JSON schema of the stream as a string"""
        if "schema" in self.data:
            return self.data["schema"]
        return None

    @schema.setter
    def schema(self, schema):
        """sets the stream's schema. An empty schema is "{}". The schemas allow you to set a specific data type.
        Both python dicts and strings are accepted.
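
        Either form works (hypothetical schemas)::

            s.schema = {"type": "number"}
            s.schema = '{"type": "number"}'
        """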
        if isinstance(schema, basestring):
            strschema = schema
            schema = json.loads(schema)
        else:
            strschema = json.dumps(schema)
        Draft4Validator.check_schema(schema)
        self.set({"schema": strschema})

    @property
    def user(self):
        """user returns the user which owns the given stream"""
        return User(self.db, self.path.split("/")[0])

    @property
    def device(self):
        """returns the device which owns the given stream"""
        split_path = self.path.split("/")

        return Device(self.db,
                      split_path[0] + "/" + split_path[1])


# The import has to go on the bottom because py3 imports are annoying
from ._user import User
from ._device import Device