|
1
|
|
|
from __future__ import absolute_import |
|
2
|
|
|
|
|
3
|
|
|
from .._stream import Stream, query_maker |
|
4
|
|
|
from .merge import Merge, get_stream |
|
5
|
|
|
import six |
|
6
|
|
|
|
|
7
|
|
|
|
|
8
|
|
|
def param_stream(cdb, params, stream):
    """Place ``stream`` into the query dict ``params`` under the right key.

    The key depends on what kind of stream argument was given:

    * a ``Merge`` query is embedded via its ``query`` attribute under ``"merge"``;
    * anything else (a stream name or a Stream object) is passed through
      ``get_stream(cdb, stream)`` and the result is stored under ``"stream"``.

    :param cdb: the ConnectorDB connection, forwarded to ``get_stream``
    :param params: the query dict, modified in place
    :param stream: a stream name, Stream object, or Merge query
    """
    if not isinstance(stream, Merge):
        params["stream"] = get_stream(cdb, stream)
        return
    params["merge"] = stream.query
|
14
|
|
|
|
|
15
|
|
|
|
|
16
|
|
|
class Dataset(object):

    """ConnectorDB is capable of taking several separate unrelated streams, and based upon
    the chosen interpolation method, putting them all together to generate tabular data centered about
    either another stream's datapoints, or based upon time intervals.

    The underlying issue that Datasets solve is that in ConnectorDB, streams are inherently unrelated.
    In most data stores, such as standard relational (SQL) databases, and even Excel spreadsheets, data is in tabular
    form. That is, if we have measurements of temperature in our house and our mood, we have a table:

    +--------------+----------------------+
    | Mood Rating  | Room Temperature (F) |
    +==============+======================+
    | 7            | 73                   |
    +--------------+----------------------+
    | 3            | 84                   |
    +--------------+----------------------+
    | 5            | 79                   |
    +--------------+----------------------+

    The benefit of having such a table is that it is easy to perform data analysis. You know which temperature
    value corresponds to which mood rating. The downside of having such tables
    is that Mood Rating and Room Temperature must be directly related - a temperature measurement must be made
    each time a mood rating is given. ConnectorDB has no such restrictions. Mood Rating and Room Temperature
    can be entirely separate sensors, which update data at their own rate. In ConnectorDB, each stream
    can be inserted with any timestamp, and without regard for any other streams.

    This separation of Streams makes data require some preprocessing and interpolation before it can be used
    for analysis. This is the purpose of the Dataset query. ConnectorDB can put several streams together based
    upon chosen transforms and interpolators, returning a tabular structure which can readily be used for ML
    and statistical applications.

    There are two types of dataset queries:

    :T-Dataset:

    T-Dataset: A dataset query which is generated based upon a time range. That is, you choose a time range and a
    time difference between elements of the dataset, and that is used to generate your dataset.

    +--------------+----------------------+
    | Timestamp    | Room Temperature (F) |
    +==============+======================+
    | 1pm          | 73                   |
    +--------------+----------------------+
    | 4pm          | 84                   |
    +--------------+----------------------+
    | 8pm          | 79                   |
    +--------------+----------------------+

    If I were to generate a T-dataset from 12pm to 8pm with dt=2 hours, using the interpolator "closest",
    I would get the following result:

    +--------------+----------------------+
    | Timestamp    | Room Temperature (F) |
    +==============+======================+
    | 12pm         | 73                   |
    +--------------+----------------------+
    | 2pm          | 73                   |
    +--------------+----------------------+
    | 4pm          | 84                   |
    +--------------+----------------------+
    | 6pm          | 84                   |
    +--------------+----------------------+
    | 8pm          | 79                   |
    +--------------+----------------------+

    The "closest" interpolator happens to return the datapoint closest to the given timestamp. There are many
    interpolators to choose from (described later).

    Hint: T-Datasets can be useful for plotting data (such as daily or weekly averages).

    :X-Dataset:

    X-datasets allow you to generate datasets based not on evenly spaced timestamps, but upon a stream's datapoints.

    Suppose you have the following data:

    +-----------+--------------+---+-----------+----------------------+
    | Timestamp | Mood Rating  |   | Timestamp | Room Temperature (F) |
    +===========+==============+===+===========+======================+
    | 1pm       | 7            |   | 2pm       | 73                   |
    +-----------+--------------+---+-----------+----------------------+
    | 4pm       | 3            |   | 5pm       | 84                   |
    +-----------+--------------+---+-----------+----------------------+
    | 11pm      | 5            |   | 8pm       | 81                   |
    +-----------+--------------+---+-----------+----------------------+
    |           |              |   | 11pm      | 79                   |
    +-----------+--------------+---+-----------+----------------------+

    An X-dataset with X=Mood Rating, and the interpolator "closest" on Room Temperature would generate:

    +--------------+----------------------+
    | Mood Rating  | Room Temperature (F) |
    +==============+======================+
    | 7            | 73                   |
    +--------------+----------------------+
    | 3            | 84                   |
    +--------------+----------------------+
    | 5            | 79                   |
    +--------------+----------------------+

    :Interpolators:

    Interpolators are special functions which specify how exactly the data is supposed to be combined
    into a dataset. There are several interpolators, such as "before", "after", "closest" which work
    on any type of datapoint, and there are more advanced interpolators which require a certain datatype
    such as the "sum" or "average" interpolator (which require numerical type).

    In order to get detailed documentation on the exact interpolators that the version of ConnectorDB you are
    connected to supports, you can do the following::

        cdb = connectordb.ConnectorDB(apikey)
        info = cdb.info()
        # Prints out all the supported interpolators and their associated documentation
        print(info["interpolators"])

    """

    def __init__(self, cdb, x=None, t1=None, t2=None, dt=None, limit=None, i1=None, i2=None, transform=None, posttransform=None):
        """In order to begin dataset generation, you need to specify the reference time range or stream.

        To generate a T-dataset::

            d = Dataset(cdb, t1=start, t2=end, dt=tchange)

        To generate an X-dataset::

            d = Dataset(cdb,"mystream", i1=start, i2=end)

        Note that everywhere you insert a stream name, you are also free to insert Stream objects
        or even Merge queries. The Dataset query in ConnectorDB supports merges natively for each field.

        The only "special" field in this query is the "posttransform". This is a special transform to run on the
        entire row of data after all of the interpolations complete.

        :param cdb: the ConnectorDB connection; ``run()`` sends the query through ``cdb.db``
        :param x: reference stream name, Stream object, or Merge query (X-dataset)
        :param dt: time step between rows (T-dataset); mutually exclusive with ``x``
        :param t1, t2, limit, i1, i2, transform: range/transform options passed to ``query_maker``
        :param posttransform: optional transform stored under ``"posttransform"`` in the query
        :raises Exception: if both ``x`` and ``dt`` are given, or if neither is given
        """
        self.cdb = cdb
        self.query = query_maker(t1, t2, limit, i1, i2, transform)

        if x is not None:
            if dt is not None:
                raise Exception(
                    "Can't do both T-dataset and X-dataset at the same time")
            # Add the stream to the query as the X-dataset
            param_stream(self.cdb, self.query, x)
        elif dt is not None:
            self.query["dt"] = dt
        else:
            raise Exception("Dataset must have either x or dt parameter")

        if posttransform is not None:
            self.query["posttransform"] = posttransform

        # Column sub-queries are added later by addStream, keyed by column name.
        self.query["dataset"] = {}

    def addStream(self, stream, interpolator="closest", t1=None, t2=None, dt=None, limit=None, i1=None, i2=None, transform=None, colname=None):
        """Adds the given stream to the query construction. Additionally, you can choose the interpolator to use for this stream, as well as a special name
        for the column in the returned dataset. If no column name is given, the stream name is used as passed
        (for a string argument), or the Stream object's ``path`` (for a Stream argument).

        addStream also supports Merge queries. You can insert a merge query instead of a stream, but be sure to name the column::

            d = Dataset(cdb, t1=time.time()-1000,t2=time.time(),dt=10.)
            d.addStream("temperature","average")
            d.addStream("steps","sum")

            m = Merge(cdb)
            m.addStream("mystream")
            m.addStream("mystream2")
            d.addStream(m,colname="mycolumn")

            result = d.run()

        :param stream: a stream name, Stream object, or Merge query
        :param interpolator: name of the interpolator used for this column
        :param t1, t2, limit, i1, i2, transform: range/transform options passed to ``query_maker``
        :param dt: accepted for signature compatibility but NOT used by this method
        :param colname: column name in the resulting dataset
        :raises Exception: if no column name can be derived (e.g. a Merge without ``colname``),
            or if the column name is already taken or equals ``"x"``
        """
        streamquery = query_maker(t1, t2, limit, i1, i2, transform)
        param_stream(self.cdb, streamquery, stream)

        streamquery["interpolator"] = interpolator

        if colname is None:
            # What do we call this column?
            if isinstance(stream, six.string_types):
                colname = stream
            elif isinstance(stream, Stream):
                colname = stream.path
            else:
                raise Exception(
                    "Could not find a name for the column! use the 'colname' parameter.")

        # Compare by value: `is` tests object identity, so a runtime-built "x"
        # (e.g. a Stream path) would not be caught reliably.
        if colname in self.query["dataset"] or colname == "x":
            raise Exception(
                "The column name either exists, or is labeled 'x'. Use the colname parameter to change the column name.")

        self.query["dataset"][colname] = streamquery

    def run(self):
        """Runs the dataset query, and returns the result.

        The accumulated query dict is sent through ``self.cdb.db.query`` with query type ``"dataset"``.
        """
        return self.cdb.db.query("dataset", self.query)
|
207
|
|
|
|