Completed
Pull Request — master (#544)
by
unknown
05:43
created

CubeSensorsMeasurementsSensor   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 143
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 143
rs 10
c 0
b 0
f 0
wmc 25
1
import time
2
3
from rauth import OAuth1Session
4
5
from st2common.util import isotime
6
from st2reactor.sensor.base import PollingSensor
7
8
# Public API of this module.
__all__ = [
    'CubeSensorsMeasurementsSensor'
]
11
12
# Root URL which all the API request paths are appended to.
BASE_URL = 'https://api.cubesensors.com/v1'

# Maps a measurement field name to a function which converts the raw value
# returned by the API into the value which is exposed to the user. Converters
# are called with a single ``value`` keyword argument. Fields without an entry
# are passed through unchanged.
# NOTE(review): "temp" is divided by 100, presumably because the API reports
# it in hundredths of a degree - confirm against the API documentation.
FIELD_CONVERT_FUNCS = {
    'temp': lambda value: (float(value) / 100)
}
16
17
18
class CubeSensorsMeasurementsSensor(PollingSensor):
    """
    Polling sensor which retrieves the current measurements for every
    configured device and dispatches a ``cubesensors.measurements`` trigger
    when a measurement is newer than the last one seen for that device.

    The timestamp of the last dispatched measurement is tracked per device,
    both in memory and in the sensor service datastore (``get_value`` /
    ``set_value``).
    """

    # Prefix of the datastore key; the device uid is appended to it.
    DATASTORE_KEY_NAME = 'last_measurements_timestamp'

    def __init__(self, sensor_service, config=None, poll_interval=None):
        super(CubeSensorsMeasurementsSensor, self).__init__(sensor_service=sensor_service,
                                                            config=config,
                                                            poll_interval=poll_interval)
        self._device_uids = self._config['sensor'].get('device_uids', [])

        self._logger = self._sensor_service.get_logger(__name__)
        self._session = None  # created in setup()
        self._device_info_cache = {}  # maps device_uid -> device info dict
        self._last_measurement_timestamps = {}  # maps device_uid -> last mes. timestamp

    def setup(self):
        """
        Create the API session and populate the device info cache.

        :raises ValueError: If no "device_uids" are configured.
        """
        if not self._device_uids:
            raise ValueError('No "device_uids" configured!')

        self._session = self._get_session()

        # Populate device info cache so the device name can be included in the
        # trigger payload without an extra API request on every poll
        for device_uid in self._device_uids:
            data = self._get_device_info(device_uid=device_uid)
            self._device_info_cache[device_uid] = data

    def poll(self):
        """
        Retrieve and handle the current measurements for every configured
        device. Devices for which no measurements are returned are skipped.
        """
        for device_uid in self._device_uids:
            result = self._get_measurements(device_uid=device_uid)

            if not result:
                continue

            self._handle_result(device_uid=device_uid, result=result)

    def cleanup(self):
        pass

    def add_trigger(self, trigger):
        pass

    def update_trigger(self, trigger):
        pass

    def remove_trigger(self, trigger):
        pass

    def _handle_result(self, device_uid, result):
        """
        Dispatch a trigger for the provided measurements unless a measurement
        with the same or a newer timestamp has already been seen for this
        device.

        :param device_uid: Uid of the device the measurements belong to.
        :param result: Measurements dictionary which includes a "time" item.
        """
        existing_last_measurement_timestamp = self._get_last_measurement_timestamp(
            device_uid=device_uid)
        new_last_measurement_timestamp = isotime.parse(result['time'])
        # NOTE(review): time.mktime interprets the time tuple as local time.
        # The value is only compared against timestamps produced the same way,
        # but confirm this is acceptable around DST changes.
        new_last_measurement_timestamp = int(time.mktime(
            new_last_measurement_timestamp.timetuple()))  # pylint: disable=no-member

        if (existing_last_measurement_timestamp and
                new_last_measurement_timestamp <= existing_last_measurement_timestamp):
            # We have already seen this measurement, skip it
            self._logger.debug('No new measurements, skipping results we have already seen '
                               'for device %s', device_uid)
            return

        # Dispatch trigger
        self._dispatch_trigger(device_uid=device_uid, result=result)

        # Store last measurement timestamp
        self._set_last_measurement_timestamp(
            device_uid=device_uid, last_measurement_timestamp=new_last_measurement_timestamp)

    def _get_last_measurement_timestamp(self, device_uid):
        """
        Retrieve last measurement timestamp for a particular device.

        The in-memory value is used when available, otherwise the value is
        read from the datastore. 0 is returned if no value is stored.

        :rtype: ``int``
        """
        last_measurement_timestamp = self._last_measurement_timestamps.get(device_uid)
        if not last_measurement_timestamp:
            name = self._get_datastore_key_name(device_uid=device_uid)
            value = self._sensor_service.get_value(name=name)
            self._last_measurement_timestamps[device_uid] = int(value) if value else 0

        return self._last_measurement_timestamps[device_uid]

    def _set_last_measurement_timestamp(self, device_uid, last_measurement_timestamp):
        """
        Store a last measurement timestamp for a particular device.

        The value is stored in memory and written to the datastore as a
        string.

        :return: The timestamp which has been stored.
        """
        self._last_measurement_timestamps[device_uid] = last_measurement_timestamp

        name = self._get_datastore_key_name(device_uid=device_uid)
        value = str(last_measurement_timestamp)
        self._sensor_service.set_value(name=name, value=value)

        return last_measurement_timestamp

    def _get_datastore_key_name(self, device_uid):
        """
        Return the name of the datastore key for a particular device.

        :rtype: ``str``
        """
        name = self.DATASTORE_KEY_NAME + '.' + device_uid
        return name

    def _dispatch_trigger(self, device_uid, result):
        """
        Dispatch a "cubesensors.measurements" trigger for the provided
        measurements.

        The device name is taken from the device info cache and falls back to
        "unknown" when it is not available.
        """
        trigger = 'cubesensors.measurements'

        device_info = self._device_info_cache.get(device_uid, {})
        device_name = device_info.get('extra', {}).get('name', 'unknown')
        payload = {
            'device_uid': device_uid,
            'device_name': device_name,
            'measurements': result
        }
        self._sensor_service.dispatch(trigger=trigger, payload=payload)

    def _get_device_info(self, device_uid):
        """
        Retrieve information for a particular device.

        :return: Value of the "device" item of the API response.
        """
        response = self._session.get('%s/devices/%s' % (BASE_URL, device_uid))
        data = response.json()
        return data['device']

    def _get_measurements(self, device_uid):
        """
        Retrieve measurements for a particular device.

        :return: Dictionary which maps a field name to the (converted) value
                 from the first row of results. An empty dictionary is
                 returned if the response contains no results.
        :rtype: ``dict``
        """
        response = self._session.get('%s/devices/%s/current' % (BASE_URL, device_uid))
        data = response.json()

        results = data.get('results')
        if not results:
            # Nothing reported for this device; poll() skips empty results
            return {}

        values = results[0]
        field_list = data['field_list']

        result = {}
        for field_name, value in zip(field_list, values):
            convert_func = FIELD_CONVERT_FUNCS.get(field_name)
            if convert_func:
                value = convert_func(value=value)

            result[field_name] = value

        return result

    def _get_session(self):
        """
        Create an OAuth1 session using the credentials from the config.
        """
        session = OAuth1Session(consumer_key=self._config['consumer_key'],
                                consumer_secret=self._config['consumer_secret'],
                                access_token=self._config['access_token'],
                                access_token_secret=self._config['access_token_secret'])
        return session
161