from __future__ import absolute_import

import json
import os

from ._connectorobject import ConnectorObject

from jsonschema import Draft4Validator

# py2/py3 compatibility shim - on python3, referencing `unicode` raises
# NameError, so `basestring` (used for string checks below) is defined as
# (str, bytes). See https://github.com/oxplot/fysom/issues/1
try:
    unicode = unicode
except NameError:
    basestring = (str, bytes)

DATAPOINT_INSERT_LIMIT = 5000


def query_maker(t1=None, t2=None, limit=None, i1=None, i2=None, transform=None, downlink=False):
    """query_maker takes the optional arguments and constructs a json query for a stream's
    datapoints using it::

        # {"t1": 5, "transform": "if $ > 5"}
        print(query_maker(t1=5, transform="if $ > 5"))
    """
    params = {}
    if t1 is not None:
        params["t1"] = t1
    if t2 is not None:
        params["t2"] = t2
    if limit is not None:
        params["limit"] = limit
    if i1 is not None or i2 is not None:
        if len(params) > 0:
            raise AssertionError(
                "Stream cannot be accessed both by index and by timestamp at the same time.")
        if i1 is not None:
            params["i1"] = i1
        if i2 is not None:
            params["i2"] = i2

    # If no range is given, query the whole stream
    if len(params) == 0:
        params["i1"] = 0
        params["i2"] = 0

    if transform is not None:
        params["transform"] = transform
    if downlink:
        params["downlink"] = True

    return params
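
# A quick sketch of query_maker's behavior (values are illustrative):
#   query_maker(t1=5, transform="if $ > 5")  -> {"t1": 5, "transform": "if $ > 5"}
#   query_maker()                            -> {"i1": 0, "i2": 0}  (whole stream)
#   query_maker(t1=5, i1=2)                  -> raises AssertionError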


class Stream(ConnectorObject):

    def create(self, schema="{}", **kwargs):
        """Creates a stream given an optional JSON schema encoded as a python dict. You can also add other
        properties of the stream, such as the icon, datatype or description. Create accepts both a string
        schema and a dict-encoded schema."""
        if isinstance(schema, basestring):
            strschema = schema
            schema = json.loads(schema)
        else:
            strschema = json.dumps(schema)
        Draft4Validator.check_schema(schema)
        kwargs["schema"] = strschema
        self.metadata = self.db.create(self.path, kwargs).json()
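
    # Example of create (following the cdb["mystream"] convention used in the
    # docstrings below; the "description" property is illustrative):
    #   s = cdb["mystream"]
    #   s.create({"type": "number"}, description="Room temperature")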

    def insert_array(self, datapoint_array, restamp=False):
        """Given an array of datapoints, inserts them into the stream. This is different from insert(),
        because it requires an array of valid datapoints, whereas insert only requires the data portion
        of the datapoint, and fills out the rest::

            s = cdb["mystream"]
            s.create({"type": "number"})

            s.insert_array([{"d": 4, "t": time.time()}, {"d": 5, "t": time.time()}], restamp=False)

        The optional `restamp` parameter specifies whether or not the database should rewrite the timestamps
        of datapoints whose timestamps are less than one that already exists in the database.

        That is, if restamp is False, and a datapoint has a timestamp less than a datapoint that already
        exists in the database, then the insert will fail. If restamp is True, then all datapoints
        with timestamps below those already in the database will have their timestamps overwritten
        to the timestamp of the most recent datapoint that already exists in the database, and the
        insert will succeed.
        """

        # To be safe, we split large inserts into chunks
        while len(datapoint_array) > DATAPOINT_INSERT_LIMIT:
            # We insert datapoints in chunks of a couple thousand so that they
            # fit in the insert size limit of ConnectorDB
            a = datapoint_array[:DATAPOINT_INSERT_LIMIT]

            if restamp:
                self.db.update(self.path + "/data", a)
            else:
                self.db.create(self.path + "/data", a)

            # Clear the written datapoints
            datapoint_array = datapoint_array[DATAPOINT_INSERT_LIMIT:]

        if restamp:
            self.db.update(self.path + "/data", datapoint_array)
        else:
            self.db.create(self.path + "/data", datapoint_array)
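
    # Note on chunking (illustrative): a call such as
    #   s.insert_array([{"d": i, "t": time.time() + i} for i in range(20000)])
    # is split by the loop above into 20000 / DATAPOINT_INSERT_LIMIT = 4
    # server requests.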

    def insert(self, data):
        """insert inserts one datapoint with the given data, and appends it to
        the end of the stream::

            s = cdb["mystream"]

            s.create({"type": "string"})

            s.insert("Hello World!")
        """
        self.insert_array([{"d": data}], restamp=True)

    def append(self, data):
        """Same as insert, using the pythonic array name."""
        self.insert(data)

    def subscribe(self, callback, transform="", downlink=False):
        """Subscribes to the stream, running the callback function each time datapoints are inserted into
        the given stream. There is an optional transform to the datapoints, and a downlink parameter::

            s = cdb["mystream"]

            def subscription_callback(stream, data):
                print(stream, data)

            s.subscribe(subscription_callback)

        The downlink parameter is for downlink streams - it allows you to subscribe to the downlink substream,
        before it is acknowledged. This is especially useful for something like lights - make the light a
        boolean downlink stream, and have the light itself subscribe to the downlink, so that other
        devices can write to the light, turning it on and off::

            def light_control(stream, data):
                light_boolean = data[0]["d"]
                print("Setting light to", light_boolean)
                set_light(light_boolean)

                # Acknowledge the write
                return True

            # We don't care about intermediate values, we only want the most recent setting
            # of the light, meaning we want the "if last" transform
            s.subscribe(light_control, downlink=True, transform="if last")

        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.subscribe(streampath, callback, transform)
160
|
|
|
def unsubscribe(self, transform="", downlink=False): |
161
|
|
|
"""Unsubscribes from a previously subscribed stream. Note that the same values of transform |
162
|
|
|
and downlink must be passed in order to do the correct unsubscribe:: |
163
|
|
|
|
164
|
|
|
s.subscribe(callback,transform="if last") |
165
|
|
|
s.unsubscribe(transform="if last") |
166
|
|
|
""" |
167
|
|
|
streampath = self.path |
168
|
|
|
if downlink: |
169
|
|
|
streampath += "/downlink" |
170
|
|
|
|
171
|
|
|
return self.db.unsubscribe(streampath, transform) |
172
|
|
|
|

    def __call__(self, t1=None, t2=None, limit=None, i1=None, i2=None, downlink=False, transform=None):
        """By calling the stream as a function, you can query it by either time range or index,
        and you can further perform a custom transform on the stream::

            # Returns all datapoints with data < 50 from the past minute
            stream(t1=time.time() - 60, transform="if $ < 50")

            # Performs an aggregation on the stream, returning a single datapoint
            # which contains the sum of the datapoints
            stream(transform="sum | if last")

        """
        params = query_maker(t1, t2, limit, i1, i2, transform, downlink)

        # In order to avoid accidental requests for full streams, ConnectorDB does not
        # permit requests without any url parameters, so we set i1=0 if we are
        # requesting the full stream
        if len(params) == 0:
            params["i1"] = 0

        return self.db.read(self.path + "/data", params).json()

    def __getitem__(self, getrange):
        """Allows accessing the stream just as if it were one big python array.
        An example::

            # Returns the most recent 5 datapoints from the stream
            stream[-5:]

            # Returns all the data the stream holds.
            stream[:]

        In order to perform transforms on the stream and to aggregate data, look at __call__,
        which allows getting index ranges along with a transform.
        """
        if not isinstance(getrange, slice):
            # Return the single datapoint
            return self(i1=getrange, i2=getrange + 1)[0]

        # The query is a slice - return the range
        return self(i1=getrange.start, i2=getrange.stop)
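
    # Indexing note (an assumption based on the whole-stream convention above,
    # where i1=0, i2=0 means "everything"): stream[-1] becomes the query
    # i1=-1, i2=0, with i2=0 read as "end of stream", so it returns the most
    # recent datapoint.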

    def length(self, downlink=False):
        """Returns the number of datapoints stored for this stream. If downlink is True,
        the downlink data is queried instead."""
        return int(self.db.read(self.path + "/data", {"q": "length", "downlink": downlink}).text)

    def __len__(self):
        """len(stream) returns the number of datapoints saved within the database for the stream"""
        return self.length()

    def __repr__(self):
        """Returns a string representation of the stream"""
        return "[Stream:%s]" % (self.path,)

    def export(self, directory):
        """Exports the stream to the given directory. The directory must not already exist.
        You can later import this stream by running import_stream on a device.
        """
        if os.path.exists(directory):
            raise FileExistsError(
                "The stream export directory already exists")

        os.mkdir(directory)

        # Write the stream's info
        with open(os.path.join(directory, "stream.json"), "w") as f:
            json.dump(self.data, f)

        # Now write the stream's data
        with open(os.path.join(directory, "data.json"), "w") as f:
            json.dump(self[:], f)

        # And if the stream is a downlink, write the downlink data
        if self.downlink:
            with open(os.path.join(directory, "downlink.json"), "w") as f:
                json.dump(self(i1=0, i2=0, downlink=True), f)
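
    # Example (hypothetical path): s.export("./backup/mystream") creates the
    # directory and writes stream.json, data.json and, for downlink streams,
    # downlink.json into it.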

    # -----------------------------------------------------------------------
    # Following are getters and setters of the stream's properties

    @property
    def datatype(self):
        """Returns the stream's registered datatype. The datatype suggests how the stream can be processed."""
        if "datatype" in self.data:
            return self.data["datatype"]
        return ""

    @datatype.setter
    def datatype(self, set_datatype):
        self.set({"datatype": set_datatype})

    @property
    def downlink(self):
        """Returns whether the stream is a downlink, meaning that it accepts input (like turning lights on/off)"""
        if "downlink" in self.data:
            return self.data["downlink"]
        return False

    @downlink.setter
    def downlink(self, is_downlink):
        self.set({"downlink": is_downlink})

    @property
    def ephemeral(self):
        """Returns whether the stream is ephemeral, meaning that data is not saved, but just passes through the messaging system."""
        if "ephemeral" in self.data:
            return self.data["ephemeral"]
        return False

    @ephemeral.setter
    def ephemeral(self, is_ephemeral):
        """Sets whether the stream is ephemeral, that is, whether its datapoints are saved in the database.
        An ephemeral stream is useful for values which are set very frequently, and which one might want
        to subscribe to, but which are not important enough to be saved in the database."""
        self.set({"ephemeral": is_ephemeral})

    @property
    def schema(self):
        """Returns the JSON schema of the stream as a python dict."""
        if "schema" in self.data:
            return json.loads(self.data["schema"])
        return None

    @property
    def sschema(self):
        """Returns the JSON schema of the stream as a string"""
        if "schema" in self.data:
            return self.data["schema"]
        return None

    @schema.setter
    def schema(self, schema):
        """Sets the stream's schema. An empty schema is "{}". The schemas allow you to set a specific data type.
        Both python dicts and strings are accepted."""
        if isinstance(schema, basestring):
            strschema = schema
            schema = json.loads(schema)
        else:
            strschema = json.dumps(schema)
        Draft4Validator.check_schema(schema)
        self.set({"schema": strschema})

    @property
    def user(self):
        """user returns the user which owns the given stream"""
        return User(self.db, self.path.split("/")[0])

    @property
    def device(self):
        """Returns the device which owns the given stream"""
        splitted_path = self.path.split("/")

        return Device(self.db,
                      splitted_path[0] + "/" + splitted_path[1])


# These imports go at the bottom of the file to avoid circular-import
# problems under py3
from ._user import User
from ._device import Device