1
|
|
|
from __future__ import absolute_import |
2
|
|
|
|
3
|
|
|
from ._connectorobject import ConnectorObject |
4
|
|
|
|
5
|
|
|
from jsonschema import Draft4Validator |
6
|
|
|
import json |
7
|
|
|
|
8
|
|
|
# Python 3 has no ``basestring`` builtin. Define a stand-in there so that the
# ``isinstance(..., basestring)`` checks work on both Python 2 and Python 3.
# Background on this compatibility idiom:
# https://github.com/oxplot/fysom/issues/1
try:
    # Probe the name that is actually needed; on Python 2 this is a no-op.
    basestring
except NameError:
    basestring = (str, bytes)
13
|
|
|
|
14
|
|
|
|
15
|
|
|
def query_maker(t1=None, t2=None, limit=None, i1=None, i2=None, transform=None, downlink=False):
    """Build the dict of query parameters used to read a stream's datapoints.

    Only the arguments that were actually supplied end up in the result::

        # {"t1": 5, "transform": "if $ > 5"}
        print(query_maker(t1=5, transform="if $ > 5"))

    ``t1``, ``t2`` and ``limit`` cannot be combined with the index bounds
    ``i1``/``i2``; doing so raises ``AssertionError``. When none of these five
    arguments is given, the result selects the whole stream (``i1=0``,
    ``i2=0``). ``transform`` is included whenever it is not None, and
    ``downlink`` is included (as True) only when it is truthy.
    """
    query = {}

    # Selectors that are mutually exclusive with index access.
    for key, value in (("t1", t1), ("t2", t2), ("limit", limit)):
        if value is not None:
            query[key] = value

    # Index bounds, collected separately so the conflict check is explicit.
    index_bounds = {}
    for key, value in (("i1", i1), ("i2", i2)):
        if value is not None:
            index_bounds[key] = value

    if index_bounds:
        if query:
            raise AssertionError(
                "Stream cannot be accessed both by index and by timestamp at the same time.")
        query.update(index_bounds)

    # If no range is given, query whole stream
    if not query:
        query["i1"] = 0
        query["i2"] = 0

    if transform is not None:
        query["transform"] = transform
    if downlink:
        query["downlink"] = True

    return query
48
|
|
|
|
49
|
|
|
|
50
|
|
|
class Stream(ConnectorObject):
    """A stream, addressed by ``self.path`` and accessed through ``self.db``.

    The ``user`` and ``device`` properties treat the first, respectively the
    first two, ``/``-separated components of ``self.path`` as the path of the
    owning user and device.
    """

    @staticmethod
    def _normalize_schema(schema):
        """Validate ``schema`` and return it encoded as a JSON string.

        ``schema`` may be a JSON string or an already decoded object such as a
        dict. A string is returned unchanged (after validation); anything else
        is encoded with ``json.dumps``. ``json.loads`` raises if a string is
        not valid JSON, and ``Draft4Validator.check_schema`` raises if the
        decoded value is not a valid schema.
        """
        if isinstance(schema, basestring):
            strschema = schema
            schema = json.loads(schema)
        else:
            strschema = json.dumps(schema)
        Draft4Validator.check_schema(schema)
        return strschema

    def create(self, schema="{}", **kwargs):
        """Creates a stream given an optional JSON schema encoded as a python dict. You can also add other properties
        of the stream, such as the icon, datatype or description. Create accepts both a string schema and
        a dict-encoded schema.

        The schema is validated before anything is sent; the response of the
        create request is stored in ``self.metadata``."""
        kwargs["schema"] = self._normalize_schema(schema)
        self.metadata = self.db.create(self.path, kwargs).json()

    def insert_array(self, datapoint_array, restamp=False):
        """given an array of datapoints, inserts them to the stream. This is different from insert(),
        because it requires an array of valid datapoints, whereas insert only requires the data portion
        of the datapoint, and fills out the rest::

            s = cdb["mystream"]
            s.create({"type": "number"})

            s.insert_array([{"d": 4, "t": time.time()},{"d": 5, "t": time.time()}], restamp=False)

        The optional `restamp` parameter specifies whether or not the database should rewrite the timestamps
        of datapoints which have a timestamp that is less than one that already exists in the database.

        That is, if restamp is False, and a datapoint has a timestamp less than a datapoint that already
        exists in the database, then the insert will fail. If restamp is True, then all datapoints
        with timestamps below the datapoints already in the database will have their timestamps overwritten
        to the same timestamp as the most recent datapoint that already exists in the database, and the insert will
        succeed.
        """
        # Restamping inserts go through db.update, plain inserts through db.create.
        if restamp:
            self.db.update(self.path + "/data", datapoint_array)
        else:
            self.db.create(self.path + "/data", datapoint_array)

    def insert(self, data):
        """insert inserts one datapoint with the given data, and appends it to
        the end of the stream::

            s = cdb["mystream"]

            s.create({"type": "string"})

            s.insert("Hello World!")

        """
        # Only the data portion is supplied, so the insert is sent with restamp=True.
        self.insert_array([{"d": data}], restamp=True)

    def append(self, data):
        """ Same as insert, using the pythonic array name """
        self.insert(data)

    def subscribe(self, callback, transform="", downlink=False):
        """Subscribes to the stream, running the callback function each time datapoints are inserted into
        the given stream. There is an optional transform to the datapoints, and a downlink parameter.::

            s = cdb["mystream"]

            def subscription_callback(stream,data):
                print("%s %s" % (stream, data))

            s.subscribe(subscription_callback)

        The downlink parameter is for downlink streams - it allows to subscribe to the downlink substream,
        before it is acknowledged. This is especially useful for something like lights - have lights be
        a boolean downlink stream, and the light itself be subscribed to the downlink, so that other
        devices can write to the light, turning it on and off::

            def light_control(stream,data):
                light_boolean = data[0]["d"]
                print("Setting light to %s" % (light_boolean, ))
                set_light(light_boolean)

                #Acknowledge the write
                return True

            # We don't care about intermediate values, we only want the most recent setting
            # of the light, meaning we want the "if last" transform
            s.subscribe(light_control, downlink=True, transform="if last")

        Returns whatever ``self.db.subscribe`` returns.
        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.subscribe(streampath, callback, transform)

    def unsubscribe(self, transform="", downlink=False):
        """Unsubscribes from a previously subscribed stream. Note that the same values of transform
        and downlink must be passed in order to do the correct unsubscribe::

            s.subscribe(callback,transform="if last")
            s.unsubscribe(transform="if last")

        Returns whatever ``self.db.unsubscribe`` returns.
        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.unsubscribe(streampath, transform)

    def __call__(self, t1=None, t2=None, limit=None, i1=None, i2=None, downlink=False, transform=None):
        """By calling the stream as a function, you can query it by either time range or index,
        and further you can perform a custom transform on the stream::

            #Returns all datapoints with their data < 50 from the past minute
            stream(t1=time.time()-60, transform="if $ < 50")

            #Performs an aggregation on the stream, returning a single datapoint
            #which contains the sum of the datapoints
            stream(transform="sum | if last")

        Raises AssertionError (from query_maker) when index bounds are combined
        with t1, t2 or limit. Returns the decoded JSON body of the read request.
        """
        # In order to avoid accidental requests for full streams, ConnectorDB does not permit requests
        # without any url parameters. query_maker already falls back to i1=0, i2=0 when no range is
        # given, so the parameters are never empty here and no extra default is needed.
        params = query_maker(t1, t2, limit, i1, i2, transform, downlink)

        return self.db.read(self.path + "/data", params).json()

    def __getitem__(self, getrange):
        """Allows accessing the stream just as if it were just one big python array.
        An example::

            #Returns the most recent 5 datapoints from the stream
            stream[-5:]

            #Returns all the data the stream holds.
            stream[:]

        Only the start and stop of a slice are used; a slice step is ignored.

        In order to perform transforms on the stream and to aggregate data, look at __call__,
        which allows getting index ranges along with a transform.
        """
        if isinstance(getrange, slice):
            # The query is a slice - return the range
            return self(i1=getrange.start, i2=getrange.stop)

        # Return the single datapoint: query the one-element range and unwrap it
        return self(i1=getrange, i2=getrange + 1)[0]

    def length(self, downlink=False):
        """Returns the number of datapoints as an int, parsed from the text of a
        ``q=length`` read on the stream's data; ``downlink`` is passed along as a
        query parameter."""
        return int(self.db.read(self.path + "/data", {"q": "length", "downlink": downlink}).text)

    def __len__(self):
        """taking len(stream) returns the number of datapoints saved within the database for the stream"""
        return self.length()

    def __repr__(self):
        """Returns a string representation of the stream"""
        return "[Stream:%s]" % (self.path, )

    # -----------------------------------------------------------------------
    # Following are getters and setters of the stream's properties

    @property
    def datatype(self):
        """returns the stream's registered datatype. The datatype suggests how the stream can be processed.
        An empty string is returned when no datatype is present in the stream's data."""
        if "datatype" in self.data:
            return self.data["datatype"]
        return ""

    @datatype.setter
    def datatype(self, set_datatype):
        """sets the stream's datatype"""
        self.set({"datatype": set_datatype})

    @property
    def downlink(self):
        """returns whether the stream is a downlink, meaning that it accepts input (like turning lights on/off).
        False is returned when the stream's data has no downlink entry."""
        if "downlink" in self.data:
            return self.data["downlink"]
        return False

    @downlink.setter
    def downlink(self, is_downlink):
        """sets whether the stream is a downlink"""
        self.set({"downlink": is_downlink})

    @property
    def ephemeral(self):
        """returns whether the stream is ephemeral, meaning that data is not saved, but just passes through the messaging system.
        False is returned when the stream's data has no ephemeral entry."""
        if "ephemeral" in self.data:
            return self.data["ephemeral"]
        return False

    @ephemeral.setter
    def ephemeral(self, is_ephemeral):
        """sets whether the stream is ephemeral, meaning that it sets whether the datapoints are saved in the database.
        an ephemeral stream is useful for things which are set very frequently, and which could want a subscription, but
        which are not important enough to be saved in the database"""
        self.set({"ephemeral": is_ephemeral})

    @property
    def schema(self):
        """Returns the JSON schema of the stream as a python dict, or None if the stream's data has no schema."""
        if "schema" in self.data:
            return json.loads(self.data["schema"])
        return None

    @schema.setter
    def schema(self, schema):
        """sets the stream's schema. An empty schema is "{}". The schemas allow you to set a specific data type.
        Both python dicts and strings are accepted. The schema is validated before it is set."""
        self.set({"schema": self._normalize_schema(schema)})

    @property
    def sschema(self):
        """Returns the JSON schema of the stream as a string, or None if the stream's data has no schema."""
        if "schema" in self.data:
            return self.data["schema"]
        return None

    @property
    def user(self):
        """user returns the user which owns the given stream"""
        return User(self.db, self.path.split("/")[0])

    @property
    def device(self):
        """returns the device which owns the given stream"""
        splitted_path = self.path.split("/")

        # The device path is the first two components of the stream path.
        return Device(self.db,
                      splitted_path[0] + "/" + splitted_path[1])
281
|
|
|
|
282
|
|
|
|
283
|
|
|
# The import has to go on the bottom because py3 imports are annoying |
284
|
|
|
from ._user import User |
285
|
|
|
from ._device import Device |
286
|
|
|
|