GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.

Stream   B
last analyzed

Complexity

Total Complexity 40

Size/Duplication

Total Lines 272
Duplicated Lines 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 272
rs 8.2608
wmc 40

19 Methods

Rating   Name   Duplication   Size   Complexity  
A datatype() 0 6 1
B subscribe() 0 34 2
A device() 0 7 1
A user() 0 4 1
A create() 0 12 2
A ephemeral() 0 6 1
A length() 0 2 1
A sschema() 0 6 2
A __call__() 0 21 2
A __getitem__() 0 19 2
B insert_array() 0 38 4
A unsubscribe() 0 12 2
A append() 0 3 1
A schema() 0 6 2
B export() 0 22 4
A downlink() 0 6 1
A __len__() 0 3 1
A __repr__() 0 3 1
A insert() 0 12 1

How to fix   Complexity   

Complex Class

Complex classes like Stream often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import absolute_import
2
import json
3
import os
4
5
from ._connectorobject import ConnectorObject
6
from ._datapointarray import DatapointArray
7
8
from jsonschema import Draft4Validator
9
import json
10
import time
11
12
# https://github.com/oxplot/fysom/issues/1
13
try:
14
    unicode = unicode
15
except NameError:
16
    basestring = (str, bytes)
17
18
DATAPOINT_INSERT_LIMIT = 5000
19
20
21
def query_maker(t1=None, t2=None, limit=None, i1=None, i2=None, transform=None, downlink=False):
22
    """query_maker takes the optional arguments and constructs a json query for a stream's
23
    datapoints using it::
24
        #{"t1": 5, "transform": "if $ > 5"}
25
        print query_maker(t1=5,transform="if $ > 5")
26
    """
27
    params = {}
28
    if t1 is not None:
29
        params["t1"] = t1
30
    if t2 is not None:
31
        params["t2"] = t2
32
    if limit is not None:
33
        params["limit"] = limit
34
    if i1 is not None or i2 is not None:
35
        if len(params) > 0:
36
            raise AssertionError(
37
                "Stream cannot be accessed both by index and by timestamp at the same time.")
38
        if i1 is not None:
39
            params["i1"] = i1
40
        if i2 is not None:
41
            params["i2"] = i2
42
43
    # If no range is given, query whole stream
44
    if len(params) == 0:
45
        params["i1"] = 0
46
        params["i2"] = 0
47
48
    if transform is not None:
49
        params["transform"] = transform
50
    if downlink:
51
        params["downlink"] = True
52
53
    return params
54
55
56
class Stream(ConnectorObject):
57
58
    def create(self, schema="{}", **kwargs):
59
        """Creates a stream given an optional JSON schema encoded as a python dict. You can also add other properties
60
        of the stream, such as the icon, datatype or description. Create accepts both a string schema and
61
        a dict-encoded schema."""
62
        if isinstance(schema, basestring):
63
            strschema = schema
64
            schema = json.loads(schema)
65
        else:
66
            strschema = json.dumps(schema)
67
        Draft4Validator.check_schema(schema)
68
        kwargs["schema"] = strschema
69
        self.metadata = self.db.create(self.path, kwargs).json()
70
71
    def insert_array(self, datapoint_array, restamp=False):
72
        """given an array of datapoints, inserts them to the stream. This is different from insert(),
73
        because it requires an array of valid datapoints, whereas insert only requires the data portion
74
        of the datapoint, and fills out the rest::
75
76
            s = cdb["mystream"]
77
            s.create({"type": "number"})
78
79
            s.insert_array([{"d": 4, "t": time.time()},{"d": 5, "t": time.time()}], restamp=False)
80
81
        The optional `restamp` parameter specifies whether or not the database should rewrite the timestamps
82
        of datapoints which have a timestamp that is less than one that already exists in the database.
83
84
        That is, if restamp is False, and a datapoint has a timestamp less than a datapoint that already
85
        exists in the database, then the insert will fail. If restamp is True, then all datapoints
86
        with timestamps below the datapoints already in the database will have their timestamps overwritten
87
        to the same timestamp as the most recent datapoint hat already exists in the database, and the insert will
88
        succeed.
89
        """
90
91
        # To be safe, we split into chunks
92
        while (len(datapoint_array) > DATAPOINT_INSERT_LIMIT):
93
            # We insert datapoints in chunks of a couple thousand so that they
94
            # fit in the insert size limit of ConnectorDB
95
            a = datapoint_array[:DATAPOINT_INSERT_LIMIT]
96
97
            if restamp:
98
                self.db.update(self.path + "/data", a)
99
            else:
100
                self.db.create(self.path + "/data", a)
101
102
            # Clear the written datapoints
103
            datapoint_array = datapoint_array[DATAPOINT_INSERT_LIMIT:]
104
105
        if restamp:
106
            self.db.update(self.path + "/data", datapoint_array)
107
        else:
108
            self.db.create(self.path + "/data", datapoint_array)
109
110
    def insert(self, data):
111
        """insert inserts one datapoint with the given data, and appends it to
112
        the end of the stream::
113
114
            s = cdb["mystream"]
115
116
            s.create({"type": "string"})
117
118
            s.insert("Hello World!")
119
120
        """
121
        self.insert_array([{"d": data, "t": time.time()}], restamp=True)
122
123
    def append(self, data):
124
        """ Same as insert, using the pythonic array name """
125
        self.insert(data)
126
127
    def subscribe(self, callback, transform="", downlink=False):
128
        """Subscribes to the stream, running the callback function each time datapoints are inserted into
129
        the given stream. There is an optional transform to the datapoints, and a downlink parameter.::
130
131
            s = cdb["mystream"]
132
133
            def subscription_callback(stream,data):
134
                print stream, data
135
136
            s.subscribe(subscription_callback)
137
138
        The downlink parameter is for downlink streams - it allows to subscribe to the downlink substream,
139
        before it is acknowledged. This is especially useful for something like lights - have lights be
140
        a boolean downlink stream, and the light itself be subscribed to the downlink, so that other
141
        devices can write to the light, turning it on and off::
142
143
            def light_control(stream,data):
144
                light_boolean = data[0]["d"]
145
                print "Setting light to", light_boolean
146
                set_light(light_boolean)
147
148
                #Acknowledge the write
149
                return True
150
151
            # We don't care about intermediate values, we only want the most recent setting
152
            # of the light, meaning we want the "if last" transform
153
            s.subscribe(light_control, downlink=True, transform="if last")
154
155
        """
156
        streampath = self.path
157
        if downlink:
158
            streampath += "/downlink"
159
160
        return self.db.subscribe(streampath, callback, transform)
161
162
    def unsubscribe(self, transform="", downlink=False):
163
        """Unsubscribes from a previously subscribed stream. Note that the same values of transform
164
        and downlink must be passed in order to do the correct unsubscribe::
165
166
            s.subscribe(callback,transform="if last")
167
            s.unsubscribe(transform="if last")
168
        """
169
        streampath = self.path
170
        if downlink:
171
            streampath += "/downlink"
172
173
        return self.db.unsubscribe(streampath, transform)
174
175
    def __call__(self, t1=None, t2=None, limit=None, i1=None, i2=None, downlink=False, transform=None):
176
        """By calling the stream as a function, you can query it by either time range or index,
177
        and further you can perform a custom transform on the stream::
178
179
            #Returns all datapoints with their data < 50 from the past minute
180
            stream(t1=time.time()-60, transform="if $ < 50")
181
182
            #Performs an aggregation on the stream, returning a single datapoint
183
            #which contains the sum of the datapoints
184
            stream(transform="sum | if last")
185
186
        """
187
        params = query_maker(t1, t2, limit, i1, i2, transform, downlink)
188
189
        # In order to avoid accidental requests for full streams, ConnectorDB does not permit requests
190
        # without any url parameters, so we set i1=0 if we are requesting the
191
        # full stream
192
        if len(params) == 0:
193
            params["i1"] = 0
194
195
        return DatapointArray(self.db.read(self.path + "/data", params).json())
196
197
    def __getitem__(self, getrange):
198
        """Allows accessing the stream just as if it were just one big python array.
199
        An example::
200
201
            #Returns the most recent 5 datapoints from the stream
202
            stream[-5:]
203
204
            #Returns all the data the stream holds.
205
            stream[:]
206
207
        In order to perform transforms on the stream and to aggreagate data, look at __call__,
208
        which allows getting index ranges along with a transform.
209
        """
210
        if not isinstance(getrange, slice):
211
            # Return the single datapoint
212
            return self(i1=getrange, i2=getrange + 1)[0]
213
214
        # The query is a slice - return the range
215
        return self(i1=getrange.start, i2=getrange.stop)
216
217
    def length(self, downlink=False):
218
        return int(self.db.read(self.path + "/data", {"q": "length", "downlink": downlink}).text)
219
220
    def __len__(self):
221
        """taking len(stream) returns the number of datapoints saved within the database for the stream"""
222
        return self.length()
223
224
    def __repr__(self):
225
        """Returns a string representation of the stream"""
226
        return "[Stream:%s]" % (self.path, )
227
228
    def export(self, directory):
229
        """Exports the stream to the given directory. The directory can't exist. 
230
        You can later import this device by running import_stream on a device.
231
        """
232
        if os.path.exists(directory):
233
            raise FileExistsError(
234
                "The stream export directory already exists")
235
236
        os.mkdir(directory)
237
238
        # Write the stream's info
239
        with open(os.path.join(directory, "stream.json"), "w") as f:
240
            json.dump(self.data, f)
241
242
        # Now write the stream's data
243
        # We sort it first, since older versions of ConnectorDB had a bug
244
        # where sometimes datapoints would be returned out of order.
245
        self[:].sort().writeJSON(os.path.join(directory, "data.json"))
246
247
        # And if the stream is a downlink, write the downlink data
248
        if self.downlink:
249
            self(i1=0, i2=0, downlink=True).sort().writeJSON(os.path.join(directory, "downlink.json"))
250
251
    # -----------------------------------------------------------------------
252
    # Following are getters and setters of the stream's properties
253
254
    @property
255
    def datatype(self):
256
        """returns the stream's registered datatype. The datatype suggests how the stream can be processed."""
257
        if "datatype" in self.data:
258
            return self.data["datatype"]
259
        return ""
260
261
    @datatype.setter
262
    def datatype(self, set_datatype):
263
        self.set({"datatype": set_datatype})
264
265
    @property
266
    def downlink(self):
267
        """returns whether the stream is a downlink, meaning that it accepts input (like turning lights on/off)"""
268
        if "downlink" in self.data:
269
            return self.data["downlink"]
270
        return False
271
272
    @downlink.setter
273
    def downlink(self, is_downlink):
274
        self.set({"downlink": is_downlink})
275
276
    @property
277
    def ephemeral(self):
278
        """returns whether the stream is ephemeral, meaning that data is not saved, but just passes through the messaging system."""
279
        if "ephemeral" in self.data:
280
            return self.data["ephemeral"]
281
        return False
282
283
    @ephemeral.setter
284
    def ephemeral(self, is_ephemeral):
285
        """sets whether the stream is ephemeral, meaning that it sets whether the datapoints are saved in the database.
286
        an ephemeral stream is useful for things which are set very frequently, and which could want a subscription, but
287
        which are not important enough to be saved in the database"""
288
        self.set({"ephemeral": is_ephemeral})
289
290
    @property
291
    def schema(self):
292
        """Returns the JSON schema of the stream as a python dict."""
293
        if "schema" in self.data:
294
            return json.loads(self.data["schema"])
295
        return None
296
297
    @property
298
    def sschema(self):
299
        """Returns the JSON schema of the stream as a string"""
300
        if "schema" in self.data:
301
            return self.data["schema"]
302
        return None
303
304
    @schema.setter
305
    def schema(self, schema):
306
        """sets the stream's schema. An empty schema is "{}". The schemas allow you to set a specific data type. 
307
        Both python dicts and strings are accepted."""
308
        if isinstance(schema, basestring):
309
            strschema = schema
310
            schema = json.loads(schema)
311
        else:
312
            strschema = json.dumps(schema)
313
        Draft4Validator.check_schema(schema)
314
        self.set({"schema": strschema})
315
316
    @property
317
    def user(self):
318
        """user returns the user which owns the given stream"""
319
        return User(self.db, self.path.split("/")[0])
320
321
    @property
322
    def device(self):
323
        """returns the device which owns the given stream"""
324
        splitted_path = self.path.split("/")
325
326
        return Device(self.db,
327
                      splitted_path[0] + "/" + splitted_path[1])
328
329
330
# The import has to go on the bottom because py3 imports are annoying
331
from ._user import User
332
from ._device import Device
333