GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 3d6413...7ed725 )
by Daniel
01:08
created

Stream.insert()   A

Complexity

Conditions 1

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 1
c 3
b 0
f 0
dl 0
loc 12
rs 9.4285
1
from __future__ import absolute_import
2
3
from ._connectorobject import ConnectorObject
4
5
from jsonschema import Draft4Validator
6
import json
7
8
# https://github.com/oxplot/fysom/issues/1
9
# Python 2/3 compatibility shim for the string type check used by this module.
try:
    # Probe for the Python 2 ``unicode`` builtin. When it exists, ``basestring``
    # exists as a builtin too and nothing else has to be defined.
    unicode = unicode
except NameError:
    # No ``unicode`` builtin (Python 3): provide a ``basestring`` stand-in so
    # that ``isinstance(x, basestring)`` accepts both text and bytes.
    basestring = (str, bytes)
13
14
15
def query_maker(t1=None, t2=None, limit=None, i1=None, i2=None, transform=None):
    """Build the query-parameter dict for a request of a stream's datapoints.

    Only the arguments that are not None end up in the result::

        #{"t1": 5, "transform": "if $ > 5"}
        print(query_maker(t1=5, transform="if $ > 5"))

    ``t1``, ``t2`` and ``limit`` form a time-based query, ``i1`` and ``i2``
    an index-based one. The two kinds cannot be combined: an AssertionError
    is raised if an index argument is given together with any of the
    time-based ones. ``transform`` may accompany either kind.
    """
    # Tuples (rather than dicts) keep the insertion order of the keys fixed.
    time_based = (("t1", t1), ("t2", t2), ("limit", limit))
    index_based = (("i1", i1), ("i2", i2))

    query = {key: value for key, value in time_based if value is not None}
    by_index = {key: value for key, value in index_based if value is not None}

    if by_index:
        if query:
            raise AssertionError(
                "Stream cannot be accessed both by index and by timestamp at the same time.")
        query.update(by_index)

    if transform is not None:
        query["transform"] = transform
    return query
40
41
42
class Stream(ConnectorObject):
    """A single ConnectorDB stream, addressed by ``self.path`` through ``self.db``.

    Besides creating the stream and inserting datapoints, a Stream can be
    called like a function or indexed like a list to query its data, and it
    exposes the stream's metadata fields as properties.
    """

    def create(self, schema, **kwargs):
        """Create the stream from a JSON schema.

        ``schema`` may be either a JSON string or its dict-encoded form. It is
        checked with ``Draft4Validator.check_schema`` before the request is
        sent. Any further keyword arguments (for example the icon, datatype or
        description) are sent along as additional properties of the stream.
        The JSON body of the response is stored as ``self.metadata``.
        """
        if isinstance(schema, basestring):
            serialized, parsed = schema, json.loads(schema)
        else:
            serialized, parsed = json.dumps(schema), schema

        Draft4Validator.check_schema(parsed)

        # The schema is always transmitted in its string form.
        kwargs["schema"] = serialized
        response = self.db.create(self.path, kwargs)
        self.metadata = response.json()

    def insert_array(self, datapoint_array, restamp=False):
        """Insert an array of complete datapoints into the stream.

        Unlike insert(), which only needs the data portion of a datapoint and
        fills out the rest, this method requires valid datapoints::

            s = cdb["mystream"]
            s.create({"type": "number"})

            s.insert_array([{"d": 4, "t": time.time()},{"d": 5, "t": time.time()}], restamp=False)

        The optional `restamp` parameter specifies whether or not the database
        should rewrite the timestamps of datapoints which have a timestamp that
        is less than one that already exists in the database.

        If restamp is False and a datapoint is older than a datapoint already
        in the database, the insert fails. If restamp is True, every such
        datapoint has its timestamp overwritten with that of the most recent
        datapoint that already exists in the database, and the insert succeeds.
        """
        # Restamping inserts go through update, plain inserts through create.
        send = self.db.update if restamp else self.db.create
        send(self.path + "/data", datapoint_array)

    def insert(self, data):
        """Append one datapoint holding ``data`` to the end of the stream::

            s = cdb["mystream"]

            s.create({"type": "string"})

            s.insert("Hello World!")

        The datapoint is sent without a timestamp, through insert_array() with
        restamp=True.
        """
        datapoint = {"d": data}
        self.insert_array([datapoint], restamp=True)

    def subscribe(self, callback, transform="", downlink=False):
        """Run ``callback`` each time datapoints are inserted into the stream.

        A transform can optionally be applied to the datapoints::

            s = cdb["mystream"]

            def subscription_callback(stream,data):
                print(stream, data)

            s.subscribe(subscription_callback)

        The downlink parameter is for downlink streams - it subscribes to the
        downlink substream, before the write is acknowledged. This is
        especially useful for something like lights - have lights be a boolean
        downlink stream, and the light itself be subscribed to the downlink, so
        that other devices can write to the light, turning it on and off::

            def light_control(stream,data):
                light_boolean = data[0]["d"]
                print("Setting light to", light_boolean)
                set_light(light_boolean)

                #Acknowledge the write
                return True

            # We don't care about intermediate values, we only want the most recent setting
            # of the light, meaning we want the "if last" transform
            s.subscribe(light_control, downlink=True, transform="if last")

        """
        target = self.path + "/downlink" if downlink else self.path
        return self.db.subscribe(target, callback, transform)

    def unsubscribe(self, transform="", downlink=False):
        """Cancel a subscription made earlier with subscribe().

        The same values of transform and downlink that were used to subscribe
        must be passed in order to do the correct unsubscribe::

            s.subscribe(callback,transform="if last")
            s.unsubscribe(transform="if last")
        """
        target = self.path + "/downlink" if downlink else self.path
        return self.db.unsubscribe(target, transform)

    def __call__(self, t1=None, t2=None, limit=None, i1=None, i2=None, transform=None):
        """Query the stream by time range or by index, optionally with a transform::

            #Returns all datapoints with their data < 50 from the past minute
            stream(t1=time.time()-60, transform="if $ < 50")

            #Performs an aggregation on the stream, returning a single datapoint
            #which contains the sum of the datapoints
            stream(transform="sum | if last")

        Returns the decoded JSON body of the response.
        """
        query = query_maker(t1, t2, limit, i1, i2, transform)

        # In order to avoid accidental requests for full streams, ConnectorDB
        # does not permit requests without any url parameters, so i1=0 is sent
        # when the full stream is requested.
        if not query:
            query["i1"] = 0

        response = self.db.read(self.path + "/data", query)
        return response.json()

    def __getitem__(self, getrange):
        """Access the stream as if it were one big python array.

        An example::

            #Returns the most recent 5 datapoints from the stream
            stream[-5:]

            #Returns all the data the stream holds.
            stream[:]

        A slice returns the matching range of datapoints (only its start and
        stop are used); any other index returns the single datapoint at that
        position. In order to perform transforms on the stream and to aggregate
        data, look at __call__, which allows getting index ranges along with a
        transform.
        """
        if isinstance(getrange, slice):
            return self(i1=getrange.start, i2=getrange.stop)

        # A single index: request the one-element range and unwrap it.
        return self(i1=getrange, i2=getrange + 1)[0]

    def __len__(self):
        """Return the number of datapoints saved within the database for the stream."""
        response = self.db.read(self.path + "/data", {"q": "length"})
        return int(response.text)

    def __repr__(self):
        """Return a string representation of the stream."""
        location = self.path
        return "[Stream:%s]" % (location, )

    # -----------------------------------------------------------------------
    # Following are getters and setters of the stream's properties

    @property
    def datatype(self):
        """The stream's registered datatype, or "" when none is set.

        The datatype suggests how the stream can be processed.
        """
        return self.data["datatype"] if "datatype" in self.data else ""

    @datatype.setter
    def datatype(self, set_datatype):
        self.set({"datatype": set_datatype})

    @property
    def downlink(self):
        """Whether the stream is a downlink, meaning that it accepts input
        (like turning lights on/off). False when the field is not set."""
        return self.data["downlink"] if "downlink" in self.data else False

    @downlink.setter
    def downlink(self, is_downlink):
        self.set({"downlink": is_downlink})

    @property
    def ephemeral(self):
        """Whether the stream is ephemeral, meaning that data is not saved but
        just passes through the messaging system. False when the field is not set."""
        return self.data["ephemeral"] if "ephemeral" in self.data else False

    @ephemeral.setter
    def ephemeral(self, is_ephemeral):
        """Set whether the stream is ephemeral, i.e. whether its datapoints are
        saved in the database.

        An ephemeral stream is useful for things which are set very frequently,
        and which could want a subscription, but which are not important enough
        to be saved in the database.
        """
        self.set({"ephemeral": is_ephemeral})

    @property
    def schema(self):
        """The JSON schema of the stream as a python dict, or None when the
        stream's data holds no schema."""
        if "schema" not in self.data:
            return None
        return json.loads(self.data["schema"])

    @property
    def user(self):
        """The user which owns the stream (first component of the stream's path)."""
        owner_name = self.path.split("/")[0]
        return User(self.db, owner_name)

    @property
    def device(self):
        """The device which owns the stream (first two components of the stream's path)."""
        parts = self.path.split("/")
        device_path = parts[0] + "/" + parts[1]
        return Device(self.db, device_path)
247
248
249
# The import has to go on the bottom because py3 imports are annoying
250
from ._user import User
251
from ._device import Device
252