from __future__ import absolute_import

from ._connectorobject import ConnectorObject

from jsonschema import Draft4Validator
import json

# Python 2/3 compatibility: on python 3 `unicode` does not exist, so `basestring`
# is defined as (str, bytes) to keep isinstance checks working on both versions.
# https://github.com/oxplot/fysom/issues/1
try:
    unicode = unicode
except NameError:
    basestring = (str, bytes)


def query_maker(t1=None, t2=None, limit=None, i1=None, i2=None, transform=None):
    """query_maker takes the optional arguments and constructs a JSON query for a stream's
    datapoints::

        #{"t1": 5, "transform": "if $ > 5"}
        print(query_maker(t1=5, transform="if $ > 5"))
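
    An index-based query is built the same way; note that a stream cannot be queried
    by index and by timestamp at the same time::

        #{"i1": 10, "i2": 20}
        print(query_maker(i1=10, i2=20))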
    """
    params = {}
    if t1 is not None:
        params["t1"] = t1
    if t2 is not None:
        params["t2"] = t2
    if limit is not None:
        params["limit"] = limit
    if i1 is not None or i2 is not None:
        if len(params) > 0:
            raise AssertionError(
                "Stream cannot be accessed both by index and by timestamp at the same time.")
        if i1 is not None:
            params["i1"] = i1
        if i2 is not None:
            params["i2"] = i2

    if transform is not None:
        params["transform"] = transform
    return params


class Stream(ConnectorObject):
    def create(self, schema, **kwargs):
        """Creates a stream given a JSON schema encoded as a python dict. You can also add other properties
        of the stream, such as the icon, datatype or description. Create accepts both a string schema and
        a dict-encoded schema.
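
        For example, extra stream properties can be passed directly as keyword
        arguments (description is one of the properties mentioned above)::

            s = cdb["mystream"]
            s.create({"type": "number"}, description="A stream of numbers")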
        """
        if isinstance(schema, basestring):
            strschema = schema
            schema = json.loads(schema)
        else:
            strschema = json.dumps(schema)
        Draft4Validator.check_schema(schema)
        kwargs["schema"] = strschema
        self.metadata = self.db.create(self.path, kwargs).json()

    def insert_array(self, datapoint_array, restamp=False):
        """Given an array of datapoints, inserts them into the stream. This is different from insert(),
        because it requires an array of valid datapoints, whereas insert only requires the data portion
        of the datapoint, and fills out the rest::

            s = cdb["mystream"]
            s.create({"type": "number"})

            s.insert_array([{"d": 4, "t": time.time()}, {"d": 5, "t": time.time()}], restamp=False)

        The optional `restamp` parameter specifies whether or not the database should rewrite the timestamps
        of datapoints which have a timestamp that is less than one that already exists in the database.

        That is, if restamp is False, and a datapoint has a timestamp less than a datapoint that already
        exists in the database, then the insert will fail. If restamp is True, then all datapoints
        with timestamps below the datapoints already in the database will have their timestamps overwritten
        to the same timestamp as the most recent datapoint that already exists in the database, and the insert will
        succeed.
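
        For example, a datapoint whose timestamp is older than the stream's newest
        datapoint can still be inserted when restamping is enabled::

            s.insert_array([{"d": 6, "t": time.time() - 100}], restamp=True)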
        """
        if restamp:
            self.db.update(self.path + "/data", datapoint_array)
        else:
            self.db.create(self.path + "/data", datapoint_array)

    def insert(self, data):
        """insert inserts one datapoint with the given data, and appends it to
        the end of the stream::

            s = cdb["mystream"]

            s.create({"type": "string"})

            s.insert("Hello World!")

        """
        self.insert_array([{"d": data}], restamp=True)

    def subscribe(self, callback, transform="", downlink=False):
        """Subscribes to the stream, running the callback function each time datapoints are inserted into
        the given stream. There is an optional transform to the datapoints, and a downlink parameter::

            s = cdb["mystream"]

            def subscription_callback(stream, data):
                print(stream, data)

            s.subscribe(subscription_callback)

        The downlink parameter is for downlink streams - it allows you to subscribe to the downlink substream,
        before it is acknowledged. This is especially useful for something like lights - have lights be
        a boolean downlink stream, and the light itself be subscribed to the downlink, so that other
        devices can write to the light, turning it on and off::

            def light_control(stream, data):
                light_boolean = data[0]["d"]
                print("Setting light to", light_boolean)
                set_light(light_boolean)

                #Acknowledge the write
                return True

            # We don't care about intermediate values, we only want the most recent setting
            # of the light, meaning we want the "if last" transform
            s.subscribe(light_control, downlink=True, transform="if last")

        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.subscribe(streampath, callback, transform)

    def unsubscribe(self, transform="", downlink=False):
        """Unsubscribes from a previously subscribed stream. Note that the same values of transform
        and downlink must be passed in order to do the correct unsubscribe::

            s.subscribe(callback, transform="if last")
            s.unsubscribe(transform="if last")
        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.unsubscribe(streampath, transform)

    def __call__(self, t1=None, t2=None, limit=None, i1=None, i2=None, transform=None):
        """By calling the stream as a function, you can query it by either time range or index,
        and further you can perform a custom transform on the stream::

            #Returns all datapoints with their data < 50 from the past minute
            stream(t1=time.time()-60, transform="if $ < 50")

            #Performs an aggregation on the stream, returning a single datapoint
            #which contains the sum of the datapoints
            stream(transform="sum | if last")
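
            #The same queries can also be made by index rather than by time,
            #with the same transform language available
            stream(i1=10, i2=20, transform="if $ < 50")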

        """
        params = query_maker(t1, t2, limit, i1, i2, transform)

        # In order to avoid accidental requests for full streams, ConnectorDB does not permit requests
        # without any URL parameters, so we set i1=0 if we are requesting the full stream
        if len(params) == 0:
            params["i1"] = 0

        return self.db.read(self.path + "/data", params).json()

    def __getitem__(self, getrange):
        """Allows accessing the stream just as if it were one big python array.
        An example::

            #Returns the most recent 5 datapoints from the stream
            stream[-5:]

            #Returns all the data the stream holds.
            stream[:]
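
            #Returns the single datapoint at index 5
            stream[5]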

        In order to perform transforms on the stream and to aggregate data, look at __call__,
        which allows getting index ranges along with a transform.
        """
        if not isinstance(getrange, slice):
            # Return the single datapoint
            return self(i1=getrange, i2=getrange + 1)[0]

        # The query is a slice - return the range
        return self(i1=getrange.start, i2=getrange.stop)

    def __len__(self):
        """len(stream) returns the number of datapoints stored in the database for this stream"""
        return int(self.db.read(self.path + "/data", {"q": "length"}).text)

    def __repr__(self):
        """Returns a string representation of the stream"""
        return "[Stream:%s]" % (self.path,)

    # -----------------------------------------------------------------------
    # Following are getters and setters of the stream's properties

    @property
    def datatype(self):
        """returns the stream's registered datatype. The datatype suggests how the stream can be processed."""
        if "datatype" in self.data:
            return self.data["datatype"]
        return ""

    @datatype.setter
    def datatype(self, set_datatype):
        self.set({"datatype": set_datatype})

    @property
    def downlink(self):
        """returns whether the stream is a downlink, meaning that it accepts input (like turning lights on/off)"""
        if "downlink" in self.data:
            return self.data["downlink"]
        return False

    @downlink.setter
    def downlink(self, is_downlink):
        self.set({"downlink": is_downlink})

    @property
    def ephemeral(self):
        """returns whether the stream is ephemeral, meaning that data is not saved, but just passes through the messaging system."""
        if "ephemeral" in self.data:
            return self.data["ephemeral"]
        return False

    @ephemeral.setter
    def ephemeral(self, is_ephemeral):
        """sets whether the stream is ephemeral, i.e. whether its datapoints pass through the messaging
        system without being saved in the database. An ephemeral stream is useful for values which are
        updated very frequently, and which you may want to subscribe to, but which are not important
        enough to be saved in the database."""
        self.set({"ephemeral": is_ephemeral})

    @property
    def schema(self):
        """Returns the JSON schema of the stream as a python dict"""
        if "schema" in self.data:
            return json.loads(self.data["schema"])
        return None

    @property
    def user(self):
        """user returns the user which owns the given stream"""
        return User(self.db, self.path.split("/")[0])

    @property
    def device(self):
        """returns the device which owns the given stream"""
        split_path = self.path.split("/")

        return Device(self.db, split_path[0] + "/" + split_path[1])


# These imports go at the bottom because py3 imports are annoying
from ._user import User
from ._device import Device