GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 40fbfa...d5d9b4 )
by Daniel
51s
created

Stream.create()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 2
c 3
b 0
f 0
dl 0
loc 12
rs 9.4285
1
from __future__ import absolute_import
2
import json
3
import os
4
5
from ._connectorobject import ConnectorObject
6
7
from jsonschema import Draft4Validator
8
import json
9
10
# Python 2/3 string-type compatibility shim.
# See https://github.com/oxplot/fysom/issues/1
#
# Referencing `unicode` acts as a probe: it only exists as a builtin on
# Python 2, where `basestring` is also a builtin and needs no definition.
# On Python 3 the probe raises NameError and `basestring` is defined as a
# tuple, so that `isinstance(x, basestring)` accepts both text and bytes.
try:
    unicode = unicode
except NameError:
    basestring = (str, bytes)

# Maximum number of datapoints sent in a single request; Stream.insert_array
# splits larger arrays into chunks of this size.
DATAPOINT_INSERT_LIMIT = 5000
17
18
19
def query_maker(t1=None, t2=None, limit=None, i1=None, i2=None, transform=None, downlink=False):
    """Build the query dict used to read a stream's datapoints.

    Only arguments that were actually supplied end up in the result::

        # {"t1": 5, "transform": "if $ > 5"}
        print(query_maker(t1=5, transform="if $ > 5"))

    Time-based selectors (``t1``, ``t2``, ``limit``) and index-based selectors
    (``i1``, ``i2``) are mutually exclusive; mixing them raises AssertionError.
    When no selector is given at all, the query defaults to ``i1=0, i2=0``.
    ``transform`` is added when it is not None, and ``downlink`` is added
    (as ``True``) only when it is truthy.
    """
    # Selectors that address the stream by timestamp (plus the result limit)
    query = {
        key: value
        for key, value in (("t1", t1), ("t2", t2), ("limit", limit))
        if value is not None
    }

    # Selectors that address the stream by datapoint index
    by_index = {
        key: value
        for key, value in (("i1", i1), ("i2", i2))
        if value is not None
    }

    if by_index:
        if query:
            raise AssertionError(
                "Stream cannot be accessed both by index and by timestamp at the same time.")
        query.update(by_index)

    # If no range is given, query whole stream
    if not query:
        query["i1"] = 0
        query["i2"] = 0

    if transform is not None:
        query["transform"] = transform
    if downlink:
        query["downlink"] = True

    return query
52
53
54
class Stream(ConnectorObject):
    """Client-side handle for a single ConnectorDB stream.

    The class relies on ``self.db``, ``self.path``, ``self.data``, ``self.metadata``
    and ``self.set`` -- presumably provided by ConnectorObject (not visible here;
    confirm against ``_connectorobject``).
    """

    @staticmethod
    def _normalize_schema(schema):
        """Validate a JSON schema and return it encoded as a string.

        Accepts either a JSON string or an already-decoded python object.
        Draft4Validator.check_schema raises if the schema is not a valid
        Draft 4 JSON schema; json.loads raises if a string is not valid JSON.
        """
        if isinstance(schema, basestring):
            strschema = schema
            schema = json.loads(schema)
        else:
            strschema = json.dumps(schema)
        Draft4Validator.check_schema(schema)
        return strschema

    def create(self, schema="{}", **kwargs):
        """Creates a stream given an optional JSON schema encoded as a python dict. You can also add other properties
        of the stream, such as the icon, datatype or description. Create accepts both a string schema and
        a dict-encoded schema.

        The schema is validated before the request is sent. The decoded JSON
        response of the create request is stored in ``self.metadata``."""
        kwargs["schema"] = self._normalize_schema(schema)
        self.metadata = self.db.create(self.path, kwargs).json()

    def insert_array(self, datapoint_array, restamp=False):
        """given an array of datapoints, inserts them to the stream. This is different from insert(),
        because it requires an array of valid datapoints, whereas insert only requires the data portion
        of the datapoint, and fills out the rest::

            s = cdb["mystream"]
            s.create({"type": "number"})

            s.insert_array([{"d": 4, "t": time.time()},{"d": 5, "t": time.time()}], restamp=False)

        The optional `restamp` parameter specifies whether or not the database should rewrite the timestamps
        of datapoints which have a timestamp that is less than one that already exists in the database.

        That is, if restamp is False, and a datapoint has a timestamp less than a datapoint that already
        exists in the database, then the insert will fail. If restamp is True, then all datapoints
        with timestamps below the datapoints already in the database will have their timestamps overwritten
        to the same timestamp as the most recent datapoint that already exists in the database, and the insert will
        succeed.
        """
        # restamp selects the db call: update() when restamping, create() otherwise
        send = self.db.update if restamp else self.db.create
        datapath = self.path + "/data"

        # To be safe, we insert datapoints in chunks of DATAPOINT_INSERT_LIMIT so that
        # they fit in the insert size limit of ConnectorDB. Walking by offset avoids
        # re-copying the remaining tail of the array after every chunk.
        # max(..., 1) guarantees exactly one request is still issued for an empty array.
        for start in range(0, max(len(datapoint_array), 1), DATAPOINT_INSERT_LIMIT):
            send(datapath, datapoint_array[start:start + DATAPOINT_INSERT_LIMIT])

    def insert(self, data):
        """insert inserts one datapoint with the given data, and appends it to
        the end of the stream::

            s = cdb["mystream"]

            s.create({"type": "string"})

            s.insert("Hello World!")

        The datapoint is sent without a timestamp and with restamp enabled.
        """
        self.insert_array([{"d": data}], restamp=True)

    def append(self, data):
        """ Same as insert, using the pythonic array name """
        self.insert(data)

    def subscribe(self, callback, transform="", downlink=False):
        """Subscribes to the stream, running the callback function each time datapoints are inserted into
        the given stream. There is an optional transform to the datapoints, and a downlink parameter.::

            s = cdb["mystream"]

            def subscription_callback(stream,data):
                print(stream, data)

            s.subscribe(subscription_callback)

        The downlink parameter is for downlink streams - it allows to subscribe to the downlink substream,
        before it is acknowledged. This is especially useful for something like lights - have lights be
        a boolean downlink stream, and the light itself be subscribed to the downlink, so that other
        devices can write to the light, turning it on and off::

            def light_control(stream,data):
                light_boolean = data[0]["d"]
                print("Setting light to", light_boolean)
                set_light(light_boolean)

                #Acknowledge the write
                return True

            # We don't care about intermediate values, we only want the most recent setting
            # of the light, meaning we want the "if last" transform
            s.subscribe(light_control, downlink=True, transform="if last")

        Returns whatever ``self.db.subscribe`` returns.
        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.subscribe(streampath, callback, transform)

    def unsubscribe(self, transform="", downlink=False):
        """Unsubscribes from a previously subscribed stream. Note that the same values of transform
        and downlink must be passed in order to do the correct unsubscribe::

            s.subscribe(callback,transform="if last")
            s.unsubscribe(transform="if last")

        Returns whatever ``self.db.unsubscribe`` returns.
        """
        streampath = self.path
        if downlink:
            streampath += "/downlink"

        return self.db.unsubscribe(streampath, transform)

    def __call__(self, t1=None, t2=None, limit=None, i1=None, i2=None, downlink=False, transform=None):
        """By calling the stream as a function, you can query it by either time range or index,
        and further you can perform a custom transform on the stream::

            #Returns all datapoints with their data < 50 from the past minute
            stream(t1=time.time()-60, transform="if $ < 50")

            #Performs an aggregation on the stream, returning a single datapoint
            #which contains the sum of the datapoints
            stream(transform="sum | if last")

        Time-based (t1, t2, limit) and index-based (i1, i2) arguments cannot be mixed;
        query_maker raises AssertionError in that case. Returns the decoded JSON response.
        """
        # query_maker never returns an empty dict: when no range is given it
        # fills in i1=0, i2=0 (the whole stream), so no extra default is needed here.
        params = query_maker(t1, t2, limit, i1, i2, transform, downlink)

        return self.db.read(self.path + "/data", params).json()

    def __getitem__(self, getrange):
        """Allows accessing the stream just as if it were just one big python array.
        An example::

            #Returns the most recent 5 datapoints from the stream
            stream[-5:]

            #Returns all the data the stream holds.
            stream[:]

        Only the start and stop of a slice are used; a slice step is ignored.

        In order to perform transforms on the stream and to aggregate data, look at __call__,
        which allows getting index ranges along with a transform.
        """
        if not isinstance(getrange, slice):
            # Return the single datapoint
            return self(i1=getrange, i2=getrange + 1)[0]

        # The query is a slice - return the range
        return self(i1=getrange.start, i2=getrange.stop)

    def length(self, downlink=False):
        """Returns the stream's datapoint count as an int, as reported by a "length" query.

        The downlink flag is forwarded as a query parameter -- presumably selecting
        the downlink substream's length (confirm against the server API)."""
        return int(self.db.read(self.path + "/data", {"q": "length", "downlink": downlink}).text)

    def __len__(self):
        """taking len(stream) returns the number of datapoints saved within the database for the stream"""
        return self.length()

    def __repr__(self):
        """Returns a string representation of the stream"""
        return "[Stream:%s]" % (self.path, )

    def export(self, directory):
        """Exports the stream to the given directory. The directory can't exist.
        You can later import this device by running import_stream on a device.

        Writes ``stream.json`` (the stream's info), ``data.json`` (all datapoints) and,
        for downlink streams, ``downlink.json`` (the downlink datapoints).

        Raises FileExistsError if the directory already exists (OSError on Python 2,
        where FileExistsError is not defined).
        """
        if os.path.exists(directory):
            try:
                exists_error = FileExistsError
            except NameError:
                # Python 2 has no FileExistsError; OSError is its base class on Python 3,
                # so callers catching OSError work on both versions.
                exists_error = OSError
            raise exists_error(
                "The stream export directory already exists")

        os.mkdir(directory)

        # Write the stream's info
        with open(os.path.join(directory, "stream.json"), "w") as f:
            json.dump(self.data, f)

        # Now write the stream's data
        with open(os.path.join(directory, "data.json"), "w") as f:
            json.dump(self[:], f)

        # And if the stream is a downlink, write the downlink data
        if self.downlink:
            with open(os.path.join(directory, "downlink.json"), "w") as f:
                json.dump(self(i1=0, i2=0, downlink=True), f)

    # -----------------------------------------------------------------------
    # Following are getters and setters of the stream's properties

    @property
    def datatype(self):
        """returns the stream's registered datatype. The datatype suggests how the stream can be processed.
        Returns an empty string when no datatype is present in the stream's data."""
        if "datatype" in self.data:
            return self.data["datatype"]
        return ""

    @datatype.setter
    def datatype(self, set_datatype):
        """sets the stream's datatype"""
        self.set({"datatype": set_datatype})

    @property
    def downlink(self):
        """returns whether the stream is a downlink, meaning that it accepts input (like turning lights on/off).
        Defaults to False when the property is not present in the stream's data."""
        if "downlink" in self.data:
            return self.data["downlink"]
        return False

    @downlink.setter
    def downlink(self, is_downlink):
        """sets whether the stream is a downlink"""
        self.set({"downlink": is_downlink})

    @property
    def ephemeral(self):
        """returns whether the stream is ephemeral, meaning that data is not saved, but just passes through the messaging system.
        Defaults to False when the property is not present in the stream's data."""
        if "ephemeral" in self.data:
            return self.data["ephemeral"]
        return False

    @ephemeral.setter
    def ephemeral(self, is_ephemeral):
        """sets whether the stream is ephemeral, meaning that it sets whether the datapoints are saved in the database.
        an ephemeral stream is useful for things which are set very frequently, and which could want a subscription, but
        which are not important enough to be saved in the database"""
        self.set({"ephemeral": is_ephemeral})

    @property
    def schema(self):
        """Returns the JSON schema of the stream as a python dict, or None if the stream's data has no schema."""
        if "schema" in self.data:
            return json.loads(self.data["schema"])
        return None

    @property
    def sschema(self):
        """Returns the JSON schema of the stream as a string, or None if the stream's data has no schema."""
        if "schema" in self.data:
            return self.data["schema"]
        return None

    @schema.setter
    def schema(self, schema):
        """sets the stream's schema. An empty schema is "{}". The schemas allow you to set a specific data type.
        Both python dicts and strings are accepted. The schema is validated before it is set."""
        self.set({"schema": self._normalize_schema(schema)})

    @property
    def user(self):
        """user returns the user which owns the given stream (the first component of the stream's path)"""
        return User(self.db, self.path.split("/")[0])

    @property
    def device(self):
        """returns the device which owns the given stream (the first two components of the stream's path)"""
        splitted_path = self.path.split("/")

        return Device(self.db,
                      splitted_path[0] + "/" + splitted_path[1])
326
327
328
# The import has to go on the bottom because py3 imports are annoying
329
from ._user import User
330
from ._device import Device
331