from __future__ import absolute_import

import datetime
import json
import os.path

class DatapointArray(list):
    """Sometimes you might want to generate a stream by combining multiple disparate
    data sources. Since ConnectorDB streams currently only permit appending,
    a stream's data must be ordered correctly.

    The DatapointArray allows you to load all the data you'd need into a single object,
    and it merges the data together and formats it to be compatible with ConnectorDB.

    Datapoints are dicts of the form ``{"t": unix timestamp, "d": data}``.
    """

    def __init__(self, data=()):
        """Initialize the array from an optional iterable of datapoints.

        The default is an empty (immutable) tuple rather than a mutable
        ``[]`` so the default can never be accidentally shared/mutated.
        """
        list.__init__(self, data)

    def __add__(self, other):
        # Adding returns a NEW merged (and therefore time-sorted) array;
        # neither operand is modified.
        return DatapointArray(self).merge(other)

    def __radd__(self, other):
        return DatapointArray(self).merge(other)

    def __getitem__(self, key):
        """Index the array.

        The string keys ``"t"`` and ``"d"`` return the timestamp/data
        columns (see :meth:`t` and :meth:`d`); slices return a
        DatapointArray; integer indices return the raw datapoint dict.
        """
        if key == "t":
            return self.t()
        if key == "d":
            return self.d()
        d = list.__getitem__(self, key)
        if isinstance(key, slice):
            d = DatapointArray(d)
            # Propagate the cached columns only when the cache is known
            # to be valid. NOTE(review): nothing in this class ever sets
            # _dataChanged/_d/_t (presumably a caller or subclass does —
            # TODO confirm), so getattr with a True default is required;
            # the bare attribute access previously raised AttributeError
            # on every slice.
            if not getattr(self, "_dataChanged", True):
                d._dataChanged = False
                d._d = self._d[key]
                d._t = self._t[key]
        return d

    def sort(self, f=lambda d: d["t"]):
        """Sort the array in-place, by timestamp by default.

        Returns self to allow chaining (fluent API, unlike list.sort()).
        """
        list.sort(self, key=f)
        return self

    def d(self):
        """Return just the data portion of the datapoints as a list."""
        return [dp["d"] for dp in self.raw()]

    def t(self):
        """Return just the timestamp portion of the datapoints as a list.

        The timestamps are converted to python ``datetime.datetime``
        objects (local time, naive).
        """
        return [datetime.datetime.fromtimestamp(dp["t"]) for dp in self.raw()]

    def merge(self, array):
        """Add the given iterable of datapoints to this array, in-place.

        It assumes that the datapoints are formatted correctly for ConnectorDB,
        meaning that they are in the format::

            [{"t": unix timestamp, "d": data}]

        The data does NOT need to be sorted by timestamp - this function
        sorts it for you. Returns self for chaining.
        """
        self.extend(array)
        self.sort()
        return self

    def raw(self):
        """Return the contents as a plain python list (a shallow copy).

        For cases where for some reason the DatapointArray wrapper does
        not work for you. The datapoint dicts themselves are shared, not
        copied.
        """
        return list.__getitem__(self, slice(None, None))

    def writeJSON(self, filename):
        """Write the data to the given file as JSON::

            DatapointArray([{"t": unix timestamp, "d": data}]).writeJSON("myfile.json")

        The data can later be loaded using loadJSON.
        """
        with open(filename, "w") as f:
            json.dump(self, f)

    def loadJSON(self, filename):
        """Merge in the data from a JSON file.

        The file is expected to be in datapoint format::

            d = DatapointArray().loadJSON("myfile.json")

        Returns self for chaining. Raises the usual ``OSError`` /
        ``json.JSONDecodeError`` on a missing or malformed file.
        """
        with open(filename, "r") as f:
            self.merge(json.load(f))
        return self

    def loadExport(self, folder):
        """Merge in the data from a ConnectorDB export (a ``data.json``
        inside *folder*).

        If it is a stream export, the folder is the location of the
        export. If it is a device export, the folder is the export folder
        with the stream name as a subdirectory. If it is a user export,
        append user/device/stream to the export folder's path::

            myuser.export("./exportdir")
            DatapointArray().loadExport("./exportdir/username/devicename/streamname")

        Returns self for chaining.
        """
        self.loadJSON(os.path.join(folder, "data.json"))
        return self

    def tshift(self, t):
        """Shift all timestamps in the datapoint array by the given number
        of seconds. It is the same as the 'tshift' pipescript transform.

        Warning: The shift is performed in-place! This means that it
        modifies the underlying datapoint dicts::

            d = DatapointArray([{"t":56,"d":1}])
            d.tshift(20)
            print(d) # [{"t":76,"d":1}]

        Returns self for chaining.
        """
        for dp in self.raw():
            dp["t"] += t
        return self

    def sum(self):
        """Return the sum of the data portions of all datapoints within."""
        return sum(dp["d"] for dp in self.raw())

    def mean(self):
        """Return the mean of the data portions of all datapoints within.

        Raises ZeroDivisionError if the array is empty.
        """
        return self.sum() / float(len(self))