FeatureRoutineTemplate   A
last analyzed

Complexity

Total Complexity 3

Size/Duplication

Total Lines 44
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 44
rs 10
c 0
b 0
f 0
wmc 3

3 Methods

Rating   Name   Duplication   Size   Complexity  
A clear() 0 5 1
A __init__() 0 11 1
A update() 0 16 1
1
import abc
2
import logging
3
import numpy as np
4
from ..logging import logging_name
5
6
# Module-level logger, named after this module, used by the feature classes
# and routines in this file to report warnings and errors.
logger = logging.getLogger(__name__)
7
8
9
# region Abstract FeatureRoutineTemplate Class
10
class FeatureRoutineTemplate(metaclass=abc.ABCMeta):
    """Abstract base class for feature routines.

    A routine computes statistical data every time the window slides, so
    that features sharing the same intermediate data do not each have to
    walk the window again.

    Args:
        name (:obj:`str`): Feature routine name.
        description (:obj:`str`): Feature routine description.
        enabled (:obj:`bool`): Feature routine enable flag.

    Attributes:
        name (:obj:`str`): Feature routine name.
        description (:obj:`str`): Feature routine description.
        enabled (:obj:`bool`): Feature routine enable flag.
    """
    def __init__(self, name, description, enabled=True):
        self.name = name
        self.description = description
        self.enabled = enabled

    @abc.abstractmethod
    def update(self, data_list, cur_index, window_size, sensor_info):
        """Abstract update method.

        For some features, we update statistical data every time we move
        forward one data record, instead of going back through the whole
        window to find the answer. This method is called every time we
        advance to the next data record.

        Args:
            data_list (:obj:`list`): List of sensor data.
            cur_index (:obj:`int`): Index of current data record.
            window_size (:obj:`int`): Sliding window size.
            sensor_info (:obj:`dict`): Dictionary containing sensor index information.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        # Raise (rather than return) the exception so that an accidental
        # ``super().update(...)`` call fails loudly instead of handing back
        # an exception object as if it were a result.
        raise NotImplementedError()

    @abc.abstractmethod
    def clear(self):
        """Clear internal data structures if recalculation is needed.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError()
54
# endregion
55
56
57
# region Abstract FeatureTemplate Class
58
class FeatureTemplate(metaclass=abc.ABCMeta):
    """Statistical Feature Template

    Args:
        name (:obj:`str`): Feature name.
        description (:obj:`str`): Feature description.
        enabled (:obj:`bool`): If the feature is enabled.
        normalized (:obj:`bool`): If the value of feature needs to be normalized.
        per_sensor (:obj:`bool`): If the feature is calculated for each sensor.
        routine (:obj:`.FeatureRoutineTemplate`): Routine structure.

    Attributes:
        name (:obj:`str`): Feature name.
        description (:obj:`str`): Feature description.
        index (:obj:`int`): Feature index; initialised to ``-1``.
        normalized (:obj:`bool`): If the value of feature needs to be normalized.
        per_sensor (:obj:`bool`): If the feature is calculated for each sensor.
        enabled (:obj:`bool`): If the feature is enabled.
        routine (:obj:`.FeatureRoutineTemplate`): Routine structure.
        _is_value_valid (:obj:`bool`): If the value calculated is valid.
    """
    def __init__(self, name, description, enabled=True, normalized=True, per_sensor=False, routine=None):
        self.name = name
        self.description = description
        self.index = -1
        self.normalized = normalized
        self.per_sensor = per_sensor
        self.enabled = enabled
        self._is_value_valid = False
        # Update routine: for some features, statistical data is updated
        # every time we move forward one data record. Instead of going back
        # through the previous window, the ``update`` method of this routine
        # is called each time we advance to the next data record.
        self.routine = routine

    @abc.abstractmethod
    def get_feature_value(self, data_list, cur_index, window_size, sensor_info, sensor_name=None):
        """Abstract method to get feature value

        Args:
            data_list (:obj:`list`): List of sensor data.
            cur_index (:obj:`int`): Index of current data record.
            window_size (:obj:`int`): Sliding window size.
            sensor_info (:obj:`dict`): Dictionary containing sensor index information.
            sensor_name (:obj:`str`): Sensor Name.

        Returns:
            :obj:`double`: feature value

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        # Raise (rather than return) so that a stray ``super()`` call cannot
        # leak an exception instance into a feature vector.
        raise NotImplementedError()

    @property
    def is_value_valid(self):
        """Statistical feature value valid check

        Due to errors and failures of sensors, the statistical feature
        calculated may go out of bound. This property reports whether the
        most recently calculated value was flagged as valid.

        Returns:
            :obj:`bool`: True if the result is valid.
        """
        return self._is_value_valid
121
# endregion
122
123
124
class EventHour(FeatureTemplate):
    """Hour of last event.

    It returns the hour of the last sensor event in the sliding window. If
    ``normalized`` is set to ``True``, the hour is divided by 24, so that the
    value is bounded between 0 and 1.

    Args:
        normalized (:obj:`bool`): If true, the hour is normalized between 0 to 1.
    """
    def __init__(self, normalized=False):
        super().__init__(name='lastEventHour',
                         description='Time of the last sensor event.rst in window (hour)',
                         normalized=normalized,
                         per_sensor=False,
                         enabled=True,
                         routine=None)

    def get_feature_value(self, data_list, cur_index, window_size, sensor_info, sensor_name=None):
        """Get the hour when the last sensor event in the window occurred

        Reads the ``'datetime'`` entry of ``data_list[cur_index]`` and always
        flags the value as valid.

        Note:
            Please refer to :meth:`~.FeatureTemplate.get_feature_value` for information about
            parameters.

        Returns:
            :obj:`float`: The hour, divided by 24 when ``normalized`` is set.
        """
        self._is_value_valid = True
        # ``np.float`` was only an alias of the builtin ``float`` and has been
        # removed from NumPy; the builtin gives the same value.
        hour = float(data_list[cur_index]['datetime'].hour)
        if self.normalized:
            return hour / 24
        return hour
153
154
155
class EventSeconds(FeatureTemplate):
    """Seconds of last event.

    The time within the hour (in seconds) of the last sensor event in the
    window. If ``normalized`` is ``True``, the seconds are divided by 3600.

    Args:
        normalized (:obj:`bool`): If true, the seconds are divided by 3600.
    """
    def __init__(self, normalized=False):
        super().__init__(
            name='lastEventSeconds',
            description='Time of the last sensor event.rst in window in seconds',
            normalized=normalized,
            per_sensor=False,
            enabled=True,
            routine=None)

    def get_feature_value(self, data_list, cur_index, window_size, sensor_info, sensor_name=None):
        """Get the time within an hour when the last sensor event in the window occurred (in seconds)

        Computed as ``minute * 60 + second`` of the ``'datetime'`` entry of
        ``data_list[cur_index]``. The value is always flagged as valid.

        Note:
            Please refer to :meth:`~.FeatureTemplate.get_feature_value` for information about
            parameters.

        Returns:
            :obj:`float`: Seconds into the hour, divided by 3600 when
            ``normalized`` is set.
        """
        self._is_value_valid = True
        time = data_list[cur_index]['datetime']
        # ``np.float`` was only an alias of the builtin ``float`` and has been
        # removed from NumPy; the builtin gives the same value.
        seconds = float((time.minute * 60) + time.second)
        if self.normalized:
            return seconds / 3600
        return seconds
186
187
188
class WindowDuration(FeatureTemplate):
    """Length of the window.

    Any sliding window should have a duration of less than half a day. If it
    does not, it is probable that there are some missing sensor events, so
    the statistical features calculated for such a window are invalid.

    Args:
        normalized (:obj:`bool`): If true, the duration is divided by 12 hours.
    """
    def __init__(self, normalized=False):
        super().__init__(name='windowDuration',
                         description='Duration of current window in seconds',
                         normalized=normalized,
                         per_sensor=False,
                         enabled=True,
                         routine=None)

    def get_feature_value(self, data_list, cur_index, window_size, sensor_info, sensor_name=None):
        """Get the duration of the window in seconds. Invalid if the duration is greater than half a day.

        The duration is the difference between the ``'datetime'`` of the
        record at ``cur_index`` and that of the record at
        ``cur_index - window_size + 1``. When it exceeds 12 hours the value
        is flagged invalid and folded back into the 12-hour range; if the
        last two records fall on different calendar days a warning is logged.

        Note:
            Please refer to :meth:`~.FeatureTemplate.get_feature_value` for information about
            parameters.

        Returns:
            :obj:`float`: Window duration in seconds, divided by 12 hours
            when ``normalized`` is set.
        """
        half_day = 3600 * 12
        self._is_value_valid = True
        current_time = data_list[cur_index]['datetime']
        window_start = data_list[cur_index - window_size + 1]['datetime']
        window_duration = (current_time - window_start).total_seconds()
        if window_duration > half_day:
            # Window duration is greater than half a day - not plausible.
            self._is_value_valid = False
            # Drop whole 12-hour multiples. This must be floor division:
            # with true division the expression collapses to
            # ``int(window_duration)`` and only the sub-second remainder of
            # the duration would survive.
            window_duration -= half_day * (int(window_duration) // half_day)
            previous_time = data_list[cur_index - 1]['datetime']
            if current_time.month != previous_time.month or \
                    current_time.day != previous_time.day:
                date_advanced = (current_time - previous_time).days
                hour_advanced = current_time.hour - previous_time.hour
                logger.warning('%s: line %d - %d: %s',
                               logging_name(self), cur_index, cur_index + 1,
                               previous_time.isoformat())
                logger.warning('%s: Date Advanced: %d; hour gap: %d',
                               logging_name(self), date_advanced, hour_advanced)
        # ``np.float`` was only an alias of the builtin ``float`` and has been
        # removed from NumPy; the builtin gives the same value.
        if self.normalized:
            # Normalized to 12 hours
            return float(window_duration) / half_day
        return float(window_duration)
235
236
237
class LastSensor(FeatureTemplate):
    """Sensor ID of the last sensor event of the window.

    For algorithms like decision trees and hidden Markov models, the sensor
    ID can be used directly as a feature. Other algorithms, such as the
    multi-layer perceptron or support vector machine, need the sensor ID to
    be binary coded.

    Args:
        per_sensor (:obj:`bool`): True if the sensor ID needs to be binary coded.
    """
    def __init__(self, per_sensor=False):
        super().__init__(
            name='lastSensorInWindow',
            description='Sensor ID in the current window',
            enabled=True,
            per_sensor=per_sensor,
            routine=None,
        )

    def get_feature_value(self, data_list, cur_index, window_size, sensor_info, sensor_name=None):
        """Get the sensor which fired the last event in the sliding window.

        As a per-sensor feature, it returns 1 if ``sensor_name`` triggered
        the last event in the window and 0 otherwise; when ``sensor_name`` is
        ``None`` nothing is computed and ``None`` is returned.
        As a non-per-sensor feature, it returns the ``'index'`` recorded in
        ``sensor_info`` for the sensor that triggered the last event. If that
        sensor is not in ``sensor_info``, the value is flagged invalid, a
        warning is logged and 0 is returned.

        Note:
            Please refer to :meth:`~.FeatureTemplate.get_feature_value` for information about
            parameters.
        """
        self._is_value_valid = True
        last_fired = data_list[cur_index]['sensor_id']

        if self.per_sensor:
            if sensor_name is None:
                # No sensor to compare against: nothing to report.
                return None
            return 1 if sensor_name == last_fired else 0

        if sensor_info.get(last_fired, None) is not None:
            return sensor_info[last_fired]['index']

        # Unknown sensor: flag the value so it is kept out of feature vectors.
        self._is_value_valid = False
        logger.warning(logging_name(self) + ': Cannot find sensor %s in sensor_info' % last_fired)
        logger.debug(logging_name(self) + ': Available sensors are: ' + str(sensor_info.keys()))
        return 0
282
283
284
class SensorCountRoutine(FeatureRoutineTemplate):
    """Routine to count occurrences of each sensor

    Attributes:
        sensor_count (:obj:`dict`): Dictionary mapping each enabled sensor to
            the number of times it occurs in the current window.
    """
    def __init__(self):
        super().__init__(
            name='SensorCountRoutine',
            description='Count Occurrence of all sensors in current event.rst window',
            enabled=True,
        )
        # Per-sensor occurrence counts; rebuilt on every call to update().
        self.sensor_count = {}

    def update(self, data_list, cur_index, window_size, sensor_info):
        """Recount how often each enabled sensor occurs in the current window.

        The counts are rebuilt from scratch: every sensor whose
        ``sensor_info`` entry has a truthy ``'enable'`` value starts at zero,
        then the ``window_size`` records ending at ``cur_index`` are tallied.
        Records from sensors that are not being counted are ignored.
        """
        counts = {}
        self.sensor_count = counts
        for label, info in sensor_info.items():
            if info['enable']:
                counts[label] = 0
        for offset in range(window_size):
            fired = data_list[cur_index - offset]['sensor_id']
            if fired in counts:
                counts[fired] += 1

    def clear(self):
        """Discard all recorded counts."""
        self.sensor_count = {}
312
313
# Module-level SensorCountRoutine instance; SensorCount passes it to its base
# class as the routine, so every SensorCount feature reads the same counts.
sensor_count_routine = SensorCountRoutine()
314
315
316
class SensorCount(FeatureTemplate):
    """Counts the occurrence of each sensor within the sliding window.

    If ``normalized`` is set to True, the count of each sensor is divided by
    twice the window size (the total number of events in the window).

    Args:
        normalized (:obj:`bool`): If true, the count of each sensor is scaled
            by twice the window size.
    """
    def __init__(self, normalized=False):
        super().__init__(
            name='sensorCount',
            description='Number of Events in the window related to the sensor',
            enabled=True,
            normalized=normalized,
            per_sensor=True,
            routine=sensor_count_routine,
        )

    def get_feature_value(self, data_list, cur_index, window_size, sensor_info, sensor_name=None):
        """Counts the number of occurrences of the specified sensor in the current window.

        The count is read from the routine's ``sensor_count`` dictionary. If
        ``sensor_name`` has no entry there, an error is logged, the value is
        flagged invalid and ``None`` is returned.
        """
        occurrences = self.routine.sensor_count.get(sensor_name, None)
        if occurrences is None:
            logger.error(logging_name(self) + ': Cannot find sensor %s in sensor list' % sensor_name)
            self._is_value_valid = False
            return None

        self._is_value_valid = True
        value = float(occurrences)
        if not self.normalized:
            return value
        # NOTE(review): the divisor is twice the window size, so a sensor that
        # fires on every record yields 0.5 - confirm this is intended.
        return value / (window_size * 2)
346
347
348
class SensorElapseTimeRoutine(FeatureRoutineTemplate):
    """Routine to record last occurrence of each sensor

    Attributes:
        sensor_fire_log (:obj:`dict`): Dictionary that maps each sensor to the
            ``'datetime'`` of the record in which it last fired.
    """
    def __init__(self):
        super().__init__(name='SensorElapseTimeUpdateRoutine',
                         description='Update Sensor Elapse Time for all enabled sensors',
                         enabled=True)
        # Sensor Fire Log
        self.sensor_fire_log = {}

    def update(self, data_list, cur_index, window_size, sensor_info):
        """Record the time of the latest firing of each sensor.

        On the first call (or the first call after :meth:`clear`), the log is
        seeded: every sensor in ``sensor_info`` is given the time of the first
        record of the window, then the window is replayed so that sensors
        which fired inside it carry the time of their latest firing. On every
        call, the sensor of the record at ``cur_index`` is stamped with that
        record's time.

        Args:
            data_list (:obj:`list`): List of sensor data.
            cur_index (:obj:`int`): Index of current data record.
            window_size (:obj:`int`): Sliding window size.
            sensor_info (:obj:`dict`): Dictionary containing sensor index information.
        """
        if not self.sensor_fire_log:
            for sensor_label in sensor_info.keys():
                self.sensor_fire_log[sensor_label] = data_list[cur_index - window_size + 1]['datetime']
            # Replay the window from its oldest record to its newest so that a
            # later firing overwrites an earlier one. Walking newest-to-oldest
            # would leave each sensor with its *earliest* firing in the window.
            for offset in range(window_size - 1, -1, -1):
                event = data_list[cur_index - offset]
                self.sensor_fire_log[event['sensor_id']] = event['datetime']
        current_event = data_list[cur_index]
        self.sensor_fire_log[current_event['sensor_id']] = current_event['datetime']

    def clear(self):
        """Discard the fire log so that the next update re-seeds it."""
        self.sensor_fire_log = {}
373
374
# Module-level SensorElapseTimeRoutine instance; SensorElapseTime passes it to
# its base class as the routine and reads its fire log.
sensor_elapse_time_routine = SensorElapseTimeRoutine()
375
376
377
class SensorElapseTime(FeatureTemplate):
    """The time elapsed since a sensor last fired (in seconds)

    Args:
        normalized (:obj:`bool`): If true, the elapsed time is expressed as a
            fraction of 12 hours and capped at 1.
    """
    def __init__(self, normalized=False):
        super().__init__(
            name='sensorElapseTime',
            description='Time since each sensor fired (in seconds)',
            enabled=True,
            normalized=normalized,
            per_sensor=True,
            routine=sensor_elapse_time_routine,
        )

    def get_feature_value(self, data_list, cur_index, window_size, sensor_info, sensor_name=None):
        """Get elapse time of specified sensor in seconds

        The elapsed time is measured from the entry for ``sensor_name`` in the
        routine's ``sensor_fire_log`` to the ``'datetime'`` of the record at
        ``cur_index``. The value is always flagged as valid.
        """
        self._is_value_valid = True
        now = data_list[cur_index]['datetime']
        last_fired = self.routine.sensor_fire_log[sensor_name]
        seconds_elapsed = float((now - last_fired).total_seconds())
        if not self.normalized:
            return seconds_elapsed
        fraction = seconds_elapsed / (12 * 3600)
        # A sensor that has not fired in the past 12 hours is capped at 12 hours.
        return 1. if fraction > 1 else fraction
402
403
404
class DominantSensorRoutine(FeatureRoutineTemplate):
    """Routine to record the dominant sensor of each sliding window

    Attributes:
        dominant_sensor_list (:obj:`dict`): Dictionary mapping the index of
            the last record of a window to the sensor that occurs most often
            in that window.
    """
    def __init__(self):
        super().__init__(
            name='DominantSensorRoutine',
            description='DominantSensorUpdateRoutine',
            enabled=True,
        )
        # Dominant sensor per window, keyed by the window's last record index.
        self.dominant_sensor_list = {}

    def update(self, data_list, cur_index, window_size, sensor_info):
        """Find the dominant sensor of the current window and store it.

        The ``window_size`` records ending at ``cur_index`` are tallied by
        sensor and the sensor with the highest count is stored under
        ``cur_index``. On a tie, the sensor encountered first while walking
        back from ``cur_index`` wins. The stored name is fetched by the
        dominant sensor features.
        """
        if cur_index < window_size:
            logger.warning(logging_name(self) + ': current index %d is smaller than window size %d.' %
                           (cur_index, window_size))

        tally = {}
        for offset in range(window_size):
            fired = data_list[cur_index - offset]['sensor_id']
            tally[fired] = tally.get(fired, 0) + 1

        # max() keeps the first key among equals, which matches insertion
        # order, i.e. the sensor seen first when walking back from cur_index.
        if tally:
            self.dominant_sensor_list[cur_index] = max(tally, key=tally.get)

    def clear(self):
        """Discard every recorded dominant sensor."""
        self.dominant_sensor_list = {}
440
441
# Module-level DominantSensorRoutine instance; both DominantSensor and
# DominantSensorPreviousWindow pass it to their base class as the routine.
dominant_sensor_routine = DominantSensorRoutine()
442
443
444 View Code Duplication
class DominantSensor(FeatureTemplate):
    """Dominant Sensor of current window.

    The sensor that fires the largest number of sensor events in the current
    window.

    Args:
        per_sensor (:obj:`bool`): True if the sensor ID needs to be binary coded.
    """
    def __init__(self, per_sensor=False):
        super().__init__(name='DominantSensor',
                         description='Dominant Sensor in the window',
                         normalized=True,
                         per_sensor=per_sensor,
                         enabled=True,
                         routine=dominant_sensor_routine)

    def get_feature_value(self, data_list, cur_index, window_size, sensor_info, sensor_name=None):
        """Get the dominant sensor of the window ending at ``cur_index``.

        The dominant sensor is looked up in the routine's
        ``dominant_sensor_list`` under ``cur_index``.

        As a per-sensor feature, it returns 1 if ``sensor_name`` is the
        dominant sensor and 0 otherwise. As a non-per-sensor feature, it
        returns the ``'index'`` recorded in ``sensor_info`` for the dominant
        sensor.

        If no dominant sensor is recorded for the window, or the dominant
        sensor has no entry in ``sensor_info``, a warning is logged, the value
        is flagged invalid and 0 is returned.

        Note:
            Please refer to :meth:`~.FeatureTemplate.get_feature_value` for information about
            parameters.
        """
        self._is_value_valid = True
        dominant_sensor_label = self.routine.dominant_sensor_list.get(cur_index, None)
        if dominant_sensor_label is None:
            # Without this early exit the non-per-sensor branch would look up
            # ``sensor_info[None]`` and fail with a KeyError.
            self._is_value_valid = False
            logger.warning(logging_name(self) + ': cannot find dominant sensor label for window index %d' % cur_index)
            return 0
        if self.per_sensor:
            if sensor_name is None:
                # NOTE(review): a per-sensor query without a sensor name has
                # always produced None; kept as is - confirm against callers.
                return None
            return 1 if sensor_name == dominant_sensor_label else 0
        sensor_entry = sensor_info.get(dominant_sensor_label, None)
        if sensor_entry is None:
            self._is_value_valid = False
            logger.warning('%s: Cannot find sensor %s in sensor_info',
                           logging_name(self), dominant_sensor_label)
            return 0
        return sensor_entry['index']
476
477
478 View Code Duplication
class DominantSensorPreviousWindow(FeatureTemplate):
    """Dominant Sensor of previous window.

    The sensor that fires the largest number of sensor events in the previous
    window, i.e. the window ending one record before the current one.

    Args:
        per_sensor (:obj:`bool`): True if the sensor ID needs to be binary coded.
    """
    def __init__(self, per_sensor=False):
        super().__init__(name='DominantSensorPreviousWindow',
                         description='Dominant Sensor in the previous window',
                         normalized=True,
                         per_sensor=per_sensor,
                         enabled=True,
                         routine=dominant_sensor_routine)

    def get_feature_value(self, data_list, cur_index, window_size, sensor_info, sensor_name=None):
        """Get the dominant sensor of the window ending at ``cur_index - 1``.

        The dominant sensor is looked up in the routine's
        ``dominant_sensor_list`` under ``cur_index - 1``.

        As a per-sensor feature, it returns 1 if ``sensor_name`` is the
        dominant sensor and 0 otherwise. As a non-per-sensor feature, it
        returns the ``'index'`` recorded in ``sensor_info`` for the dominant
        sensor.

        If no dominant sensor is recorded for the previous window, or the
        dominant sensor has no entry in ``sensor_info``, a warning is logged,
        the value is flagged invalid and 0 is returned.

        Note:
            Please refer to :meth:`~.FeatureTemplate.get_feature_value` for information about
            parameters.
        """
        self._is_value_valid = True
        previous_index = cur_index - 1
        # The key is the plain integer index; wrapping it in a list would make
        # the lookup fail because lists are unhashable.
        dominant_sensor_label = self.routine.dominant_sensor_list.get(previous_index, None)
        if dominant_sensor_label is None:
            # Without this early exit the non-per-sensor branch would look up
            # ``sensor_info[None]`` and fail with a KeyError.
            self._is_value_valid = False
            logger.warning(logging_name(self) + ': cannot find dominant sensor label for window index %d'
                           % previous_index)
            return 0
        if self.per_sensor:
            if sensor_name is None:
                # NOTE(review): a per-sensor query without a sensor name has
                # always produced None; kept as is - confirm against callers.
                return None
            return 1 if sensor_name == dominant_sensor_label else 0
        sensor_entry = sensor_info.get(dominant_sensor_label, None)
        if sensor_entry is None:
            self._is_value_valid = False
            logger.warning('%s: Cannot find sensor %s in sensor_info',
                           logging_name(self), dominant_sensor_label)
            return 0
        return sensor_entry['index']
509