Stream.length() - rated A

Complexity: 1 condition
Size: 2 total lines
Duplication: 0 lines (ratio 0 %)
Importance: 0 changes

Metric  Value
cc      1
dl      0
loc     2
rs      10
c       0
b       0
f       0

from __future__ import absolute_import

from ._connectorobject import ConnectorObject

from jsonschema import Draft4Validator
import json

# https://github.com/oxplot/fysom/issues/1
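# On Python 2 the `unicode` lookup succeeds and the builtin `basestring` is
# left untouched; on Python 3 it raises NameError, so `basestring` is defined
# here as (str, bytes) for the isinstance check in Stream.create below.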
try:
    unicode = unicode
except NameError:
    basestring = (str, bytes)


def query_maker(t1=None, t2=None, limit=None, i1=None, i2=None, transform=None, downlink=False):
    """query_maker takes the optional arguments and constructs a json query for a stream's
    datapoints using them::

        # {"t1": 5, "transform": "if $ > 5"}
        print(query_maker(t1=5, transform="if $ > 5"))
    """
    params = {}
    if t1 is not None:
        params["t1"] = t1
    if t2 is not None:
        params["t2"] = t2
    if limit is not None:
        params["limit"] = limit
    if i1 is not None or i2 is not None:
        if len(params) > 0:
            raise AssertionError(
                "Stream cannot be accessed both by index and by timestamp at the same time.")
        if i1 is not None:
            params["i1"] = i1
        if i2 is not None:
            params["i2"] = i2

    # If no range is given, query the whole stream
    if len(params) == 0:
        params["i1"] = 0
        params["i2"] = 0

    if transform is not None:
        params["transform"] = transform
    if downlink:
        params["downlink"] = True

    return params

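# Note: called with no arguments, query_maker() falls through to the
# "query the whole stream" branch above and returns {"i1": 0, "i2": 0}.
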
class Stream(ConnectorObject):
    def create(self, schema, **kwargs):
        """Creates a stream given a JSON schema encoded as a python dict. You can also add other properties
        of the stream, such as the icon, datatype or description. Create accepts both a string schema and
        a dict-encoded schema."""
        if isinstance(schema, basestring):
            strschema = schema
            schema = json.loads(schema)
        else:
            strschema = json.dumps(schema)
        Draft4Validator.check_schema(schema)
        kwargs["schema"] = strschema
        self.metadata = self.db.create(self.path, kwargs).json()

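    # Illustrative usage (stream name and extra properties are examples only;
    # `cdb` is a ConnectorDB connection, as in the docstring examples below):
    #   s = cdb["mystream"]
    #   s.create({"type": "number"}, description="temperature readings")
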
    def insert_array(self, datapoint_array, restamp=False):
        """given an array of datapoints, inserts them into the stream. This is different from insert(),
        because it requires an array of valid datapoints, whereas insert only requires the data portion
        of the datapoint, and fills out the rest::

            s = cdb["mystream"]
            s.create({"type": "number"})

            s.insert_array([{"d": 4, "t": time.time()}, {"d": 5, "t": time.time()}], restamp=False)

        The optional `restamp` parameter specifies whether or not the database should rewrite the timestamps
        of datapoints which have a timestamp that is less than one that already exists in the database.

        That is, if restamp is False, and a datapoint has a timestamp less than a datapoint that already
        exists in the database, then the insert will fail. If restamp is True, then all datapoints
        with timestamps below the datapoints already in the database will have their timestamps overwritten
        to the same timestamp as the most recent datapoint that already exists in the database, and the insert will
        succeed.
        """
        if restamp:
            self.db.update(self.path + "/data", datapoint_array)
        else:
            self.db.create(self.path + "/data", datapoint_array)

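    # For example, if the newest datapoint in the stream has t=100, inserting
    # [{"d": 1, "t": 50}] fails with restamp=False, while with restamp=True the
    # datapoint is restamped to t=100 and the insert succeeds.
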
    def insert(self, data):
        """insert inserts one datapoint with the given data, and appends it to
        the end of the stream::

            s = cdb["mystream"]

            s.create({"type": "string"})

            s.insert("Hello World!")

        """
        self.insert_array([{"d": data}], restamp=True)

    def append(self, data):
        """ Same as insert, using the pythonic array name """
        self.insert(data)

    def subscribe(self, callback, transform="", downlink=False):
        """Subscribes to the stream, running the callback function each time datapoints are inserted into
        the given stream. There is an optional transform of the datapoints, and a downlink parameter::

            s = cdb["mystream"]

            def subscription_callback(stream, data):
                print(stream, data)

            s.subscribe(subscription_callback)

        The downlink parameter is for downlink streams - it allows subscribing to the downlink substream
        before it is acknowledged. This is especially useful for something like lights - have the light be
        a boolean downlink stream, and the light itself subscribe to the downlink, so that other
        devices can write to the light, turning it on and off::

            def light_control(stream, data):
                light_boolean = data[0]["d"]
                print("Setting light to", light_boolean)
                set_light(light_boolean)

                # Acknowledge the write
                return True

            # We don't care about intermediate values, we only want the most recent setting
            # of the light, meaning we want the "if last" transform
            s.subscribe(light_control, downlink=True, transform="if last")

        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.subscribe(streampath, callback, transform)

    def unsubscribe(self, transform="", downlink=False):
        """Unsubscribes from a previously subscribed stream. Note that the same values of transform
        and downlink must be passed in order to do the correct unsubscribe::

            s.subscribe(callback, transform="if last")
            s.unsubscribe(transform="if last")
        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.unsubscribe(streampath, transform)

    def __call__(self, t1=None, t2=None, limit=None, i1=None, i2=None, downlink=False, transform=None):
        """By calling the stream as a function, you can query it by either time range or index,
        and further you can perform a custom transform on the stream::

            # Returns all datapoints with their data < 50 from the past minute
            stream(t1=time.time() - 60, transform="if $ < 50")

            # Performs an aggregation on the stream, returning a single datapoint
            # which contains the sum of the datapoints
            stream(transform="sum | if last")

        """
        params = query_maker(t1, t2, limit, i1, i2, transform, downlink)

        # In order to avoid accidental requests for full streams, ConnectorDB does not permit requests
        # without any url parameters, so we set i1=0 if we are requesting the full stream
        if len(params) == 0:
            params["i1"] = 0

        return self.db.read(self.path + "/data", params).json()

    def __getitem__(self, getrange):
        """Allows accessing the stream just as if it were one big python array.
        An example::

            # Returns the most recent 5 datapoints from the stream
            stream[-5:]

            # Returns all the data the stream holds.
            stream[:]

        In order to perform transforms on the stream and to aggregate data, look at __call__,
        which allows getting index ranges along with a transform.
        """
        if not isinstance(getrange, slice):
            # Return the single datapoint
            return self(i1=getrange, i2=getrange + 1)[0]

        # The query is a slice - return the range
        return self(i1=getrange.start, i2=getrange.stop)

    def length(self, downlink=False):
        """Returns the number of datapoints in the stream; with downlink=True, the length
        of the downlink substream is queried instead."""
        return int(self.db.read(self.path + "/data", {"q": "length", "downlink": downlink}).text)

    def __len__(self):
        """taking len(stream) returns the number of datapoints saved within the database for the stream"""
        return self.length()

    def __repr__(self):
        """Returns a string representation of the stream"""
        return "[Stream:%s]" % (self.path, )

    # -----------------------------------------------------------------------
    # Following are getters and setters of the stream's properties

    @property
    def datatype(self):
        """returns the stream's registered datatype. The datatype suggests how the stream can be processed."""
        if "datatype" in self.data:
            return self.data["datatype"]
        return ""

    @datatype.setter
    def datatype(self, set_datatype):
        self.set({"datatype": set_datatype})

    @property
    def downlink(self):
        """returns whether the stream is a downlink, meaning that it accepts input (like turning lights on/off)"""
        if "downlink" in self.data:
            return self.data["downlink"]
        return False

    @downlink.setter
    def downlink(self, is_downlink):
        self.set({"downlink": is_downlink})

    @property
    def ephemeral(self):
        """returns whether the stream is ephemeral, meaning that data is not saved, but just passes through the messaging system."""
        if "ephemeral" in self.data:
            return self.data["ephemeral"]
        return False

    @ephemeral.setter
    def ephemeral(self, is_ephemeral):
        """sets whether the stream is ephemeral, i.e. whether its datapoints are saved in the database.
        An ephemeral stream is useful for values which are set very frequently and for which you might
        want a subscription, but which are not important enough to be saved in the database."""
        self.set({"ephemeral": is_ephemeral})

    @property
    def schema(self):
        """Returns the JSON schema of the stream as a python dict"""
        if "schema" in self.data:
            return json.loads(self.data["schema"])
        return None

    @property
    def user(self):
        """user returns the user which owns the given stream"""
        return User(self.db, self.path.split("/")[0])

    @property
    def device(self):
        """returns the device which owns the given stream"""
        split_path = self.path.split("/")

        return Device(self.db, split_path[0] + "/" + split_path[1])


# These imports have to go at the bottom of the file to avoid circular-import
# problems under Python 3.
from ._user import User
from ._device import Device
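
# Illustrative end-to-end usage, assuming `cdb` is a ConnectorDB connection as
# in the docstring examples above (the stream name is an example):
#
#   s = cdb["mystream"]
#   s.create({"type": "number"})
#   s.insert(3.14)
#   print(len(s))                        # number of datapoints in the stream
#   print(s[-1])                         # most recent datapoint
#   print(s(transform="sum | if last"))  # single datapoint holding the sum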