from __future__ import absolute_import

from .._stream import Stream, query_maker
from .merge import Merge, get_stream

import six


# param_stream adds the given stream to the query in the correct form,
# depending on what type of stream parameter was given (Merge query or plain stream)
def param_stream(cdb, params, stream):
    if isinstance(stream, Merge):
        params["merge"] = stream.query
    else:
        params["stream"] = get_stream(cdb, stream)
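
# A minimal usage sketch of param_stream (the stream name here is illustrative):
#
#     params = {}
#     param_stream(cdb, params, "temperature")
#     # params now contains a "stream" entry; passing a Merge object instead
#     # would populate a "merge" entry with that Merge's query.

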
class Dataset(object):
    """ConnectorDB is capable of taking several separate, unrelated streams and, based upon
    the chosen interpolation method, combining them to generate tabular data centered either
    on another stream's datapoints or on fixed time intervals.

    The underlying issue that Datasets solve is that in ConnectorDB, streams are inherently unrelated.
    In most data stores, such as standard relational (SQL) databases and even Excel spreadsheets, data is
    in tabular form. That is, if we have measurements of the temperature in our house and our mood, we
    have a table:

    +--------------+----------------------+
    | Mood Rating  | Room Temperature (F) |
    +==============+======================+
    | 7            | 73                   |
    +--------------+----------------------+
    | 3            | 84                   |
    +--------------+----------------------+
    | 5            | 79                   |
    +--------------+----------------------+

    The benefit of having such a table is that it is easy to perform data analysis: you know which
    temperature value corresponds to which mood rating. The downside of such tables is that Mood Rating
    and Room Temperature must be directly related - a temperature measurement must be made each time a
    mood rating is given. ConnectorDB has no such restrictions. Mood Rating and Room Temperature can be
    entirely separate sensors, each updating data at its own rate. In ConnectorDB, data can be inserted
    into each stream with any timestamp, without regard for any other streams.

    This separation of streams means that the data requires some preprocessing and interpolation before
    it can be used for analysis. This is the purpose of the Dataset query. ConnectorDB can put several
    streams together based upon chosen transforms and interpolators, returning a tabular structure which
    can readily be used for ML and statistical applications.

    There are two types of dataset queries:

    :T-Dataset:
        A dataset query generated from a time range: you choose a time range and a time difference (dt)
        between elements of the dataset, and these are used to generate your dataset.

        +--------------+----------------------+
        | Timestamp    | Room Temperature (F) |
        +==============+======================+
        | 1pm          | 73                   |
        +--------------+----------------------+
        | 4pm          | 84                   |
        +--------------+----------------------+
        | 8pm          | 79                   |
        +--------------+----------------------+

        If I were to generate a T-dataset from 12pm to 8pm with dt=2 hours, using the interpolator
        "closest", I would get the following result:

        +--------------+----------------------+
        | Timestamp    | Room Temperature (F) |
        +==============+======================+
        | 12pm         | 73                   |
        +--------------+----------------------+
        | 2pm          | 73                   |
        +--------------+----------------------+
        | 4pm          | 84                   |
        +--------------+----------------------+
        | 6pm          | 84                   |
        +--------------+----------------------+
        | 8pm          | 79                   |
        +--------------+----------------------+

        The "closest" interpolator returns the datapoint closest to the given timestamp. There are many
        interpolators to choose from (described later).

        Hint: T-Datasets can be useful for plotting data (such as daily or weekly averages).

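        A T-dataset like the one above could be built with something along these lines (the stream name
        "temperature" and the timestamp variables are illustrative)::

            d = Dataset(cdb, t1=noon, t2=eightpm, dt=2 * 60 * 60)  # dt assumed to be in seconds
            d.addStream("temperature", "closest")
            result = d.run()
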
    :X-Dataset:
        X-datasets allow you to generate datasets based not on evenly spaced timestamps, but upon the
        datapoints of a chosen reference stream.

        Suppose you have the following data:

        +-----------+--------------+---+-----------+----------------------+
        | Timestamp | Mood Rating  |   | Timestamp | Room Temperature (F) |
        +===========+==============+===+===========+======================+
        | 1pm       | 7            |   | 2pm       | 73                   |
        +-----------+--------------+---+-----------+----------------------+
        | 4pm       | 3            |   | 5pm       | 84                   |
        +-----------+--------------+---+-----------+----------------------+
        | 11pm      | 5            |   | 8pm       | 81                   |
        +-----------+--------------+---+-----------+----------------------+
        |           |              |   | 11pm      | 79                   |
        +-----------+--------------+---+-----------+----------------------+

        An X-dataset with X=Mood Rating, and the interpolator "closest" on Room Temperature would generate:

        +--------------+----------------------+
        | Mood Rating  | Room Temperature (F) |
        +==============+======================+
        | 7            | 73                   |
        +--------------+----------------------+
        | 3            | 84                   |
        +--------------+----------------------+
        | 5            | 79                   |
        +--------------+----------------------+

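        A dataset like this could be constructed with something like the following (the stream names are
        illustrative; time or index ranges can be passed as well, as in the constructor examples below)::

            d = Dataset(cdb, "mood_rating")
            d.addStream("room_temperature", "closest")
            result = d.run()
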
    :Interpolators:
        Interpolators are special functions which specify exactly how the data is to be combined into a
        dataset. There are several interpolators, such as "before", "after", and "closest", which work on
        any type of datapoint, and there are more advanced interpolators, such as "sum" and "average",
        which require a numerical datatype.

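        The interpolator is chosen per column when adding streams. As a sketch (the stream names and the
        time range are illustrative)::

            d = Dataset(cdb, t1=start, t2=end, dt=60.)
            d.addStream("temperature", "average")  # numeric interpolator
            d.addStream("activity", "closest")     # works on any datatype
            result = d.run()
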
        In order to get detailed documentation on the exact interpolators supported by the version of
        ConnectorDB you are connected to, you can do the following::

            cdb = connectordb.ConnectorDB(apikey)
            info = cdb.info()
            # Prints out all the supported interpolators and their associated documentation
            print(info["interpolators"])

    """

    def __init__(self, cdb, x=None, t1=None, t2=None, dt=None, limit=None, i1=None, i2=None, transform=None, posttransform=None):
        """In order to begin dataset generation, you need to specify the reference time range or stream.

        To generate a T-dataset::

            d = Dataset(cdb, t1=start, t2=end, dt=tchange)

        To generate an X-dataset::

            d = Dataset(cdb, "mystream", i1=start, i2=end)

        Note that everywhere you insert a stream name, you are also free to insert Stream objects
        or even Merge queries. The Dataset query in ConnectorDB supports merges natively for each field.

        The only "special" field in this query is the "posttransform". This is a special transform to run
        on the entire row of data after all of the interpolations complete.
        """
        self.cdb = cdb
        self.query = query_maker(t1, t2, limit, i1, i2, transform)

        if x is not None:
            if dt is not None:
                raise Exception(
                    "Can't do both T-dataset and X-dataset at the same time")
            # Add the stream to the query as the X-dataset
            param_stream(self.cdb, self.query, x)
        elif dt is not None:
            self.query["dt"] = dt
        else:
            raise Exception("Dataset must have either x or dt parameter")

        if posttransform is not None:
            self.query["posttransform"] = posttransform

        self.query["dataset"] = {}

    def addStream(self, stream, interpolator="closest", t1=None, t2=None, dt=None, limit=None, i1=None, i2=None, transform=None, colname=None):
        """Adds the given stream to the query construction. Additionally, you can choose the interpolator
        to use for this stream, as well as a special name for the column in the returned dataset. If no
        column name is given, the full stream path will be used.

        addStream also supports Merge queries. You can insert a merge query instead of a stream, but be
        sure to name the column::

            d = Dataset(cdb, t1=time.time() - 1000, t2=time.time(), dt=10.)
            d.addStream("temperature", "average")
            d.addStream("steps", "sum")

            m = Merge(cdb)
            m.addStream("mystream")
            m.addStream("mystream2")
            d.addStream(m, colname="mycolumn")

            result = d.run()
        """

        streamquery = query_maker(t1, t2, limit, i1, i2, transform)
        param_stream(self.cdb, streamquery, stream)

        streamquery["interpolator"] = interpolator

        if colname is None:
            # What do we call this column?
            if isinstance(stream, six.string_types):
                colname = stream
            elif isinstance(stream, Stream):
                colname = stream.path
            else:
                raise Exception(
                    "Could not find a name for the column! Use the 'colname' parameter.")

        if colname in self.query["dataset"] or colname == "x":
            raise Exception(
                "The column name either already exists, or is labeled 'x'. Use the colname parameter to change the column name.")

        self.query["dataset"][colname] = streamquery

    def run(self):
        """Runs the dataset query, and returns the result"""
        return self.cdb.db.query("dataset", self.query)