1
|
|
|
from .._stream import Stream, query_maker
|
2
|
|
|
|
3
|
|
|
|
4
|
|
|
def get_stream(cdb, stream):
|
5
|
|
|
if isinstance(stream, Stream):
|
6
|
|
|
return stream.path
|
7
|
|
|
elif stream.count("/") == 0:
|
8
|
|
|
return cdb.path + "/" + stream
|
9
|
|
|
elif stream.count("/") == 2:
|
10
|
|
|
return stream
|
11
|
|
|
else:
|
12
|
|
|
raise Exception("Stream '%s' invalid" % (stream, ))
|
13
|
|
|
|
14
|
|
|
|
15
|
|
|
class Merge(object):
|
16
|
|
|
"""Merge represents a query which allows to merge multiple streams into one
|
17
|
|
|
when reading, with all the streams merged together by increasing timestamp.
|
18
|
|
|
The merge query is used as a constructor-type object::
|
19
|
|
|
|
20
|
|
|
m = Merge(cdb)
|
21
|
|
|
m.addStream("mystream1",t1=time.time()-10)
|
22
|
|
|
m.addStream("mystream2",t1=time.time()-10)
|
23
|
|
|
result = m.run()
|
24
|
|
|
"""
|
25
|
|
|
|
26
|
|
|
def __init__(self, cdb):
|
27
|
|
|
"""Given a ConnectorDB object, begins the construction of a Merge query"""
|
28
|
|
|
self.cdb = cdb
|
29
|
|
|
|
30
|
|
|
self.query = []
|
31
|
|
|
|
32
|
|
|
def addStream(self, stream, t1=None, t2=None, limit=None, i1=None, i2=None, transform=None):
|
33
|
|
|
"""Adds the given stream to the query construction. The function supports both stream
|
34
|
|
|
names and Stream objects."""
|
35
|
|
|
params = query_maker(t1, t2, limit, i1, i2, transform)
|
36
|
|
|
|
37
|
|
|
params["stream"] = get_stream(self.cdb, stream)
|
38
|
|
|
|
39
|
|
|
# Now add the stream to the query parameters
|
40
|
|
|
self.query.append(params)
|
41
|
|
|
|
42
|
|
|
def run(self):
|
43
|
|
|
"""Runs the merge query, and returns the result"""
|
44
|
|
|
return self.cdb.db.query("merge", self.query)
|
45
|
|
|
|