CASASHDF5._load_dataset_info()   F
last analyzed

Complexity

Conditions 17

Size

Total Lines 46

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 17
c 1
b 0
f 0
dl 0
loc 46
rs 2.6603

How to fix   Complexity   

Complexity

Complex code units like CASASHDF5._load_dataset_info() often do many different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import h5py
2
import logging
3
import dateutil.parser
4
import numpy as np
5
from collections import OrderedDict
6
7
logger = logging.getLogger(__name__)
8
9
10
class CASASHDF5:
    """Create and retrieve CASAS smart home data stored in an HDF5 file.

    The data saved to or retrieved from the HDF5 file are features pre-calculated by
    the :class:`CASASData` class. The file also contains meta-data about the dataset,
    which includes a description for each feature and the splits by week and/or by day.

    The instance can be used as a context manager; the underlying file is closed on exit.

    Attributes:
        _file (:class:`h5py.File`): :class:`h5py.File` object that represents root group.

    Args:
        filename (:obj:`str`): HDF5 File Name
        mode (:obj:`str`): 'r' to load from an existing file, and 'w' to create a new HDF5 data file.
        driver (:obj:`str`): Optional file driver name, passed through unchanged to :class:`h5py.File`.

    Raises:
        ValueError: If ``mode`` is neither 'r' nor 'w'.
    """
    def __init__(self, filename, mode='r', driver=None):
        # Validate the mode before touching the file system, so an unsupported mode can
        # neither create/modify a file nor leave an open file handle behind.
        if mode not in ('r', 'w'):
            raise ValueError('mode should be \'w\' or \'r\', but got %s.' % mode)
        self._file = h5py.File(filename, mode=mode, driver=driver)
        if mode == 'w':
            self._sources = []
            self._weeks = OrderedDict()
            self._days = OrderedDict()
            self._feature_description = []
            self._target_description = []
            self._target_colors = []
            self._sensors = []
            self._comment = ''
            self._bg_target = ''
        else:
            try:
                self._load_dataset_info()
            except Exception:
                # Do not leak the open file if the meta-data cannot be parsed.
                self._file.close()
                raise

    def __enter__(self):
        """Enter a ``with`` block; returns the dataset object itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying file when leaving a ``with`` block."""
        self.close()
        return False

    def fetch_data(self, start_split=None, stop_split=None, pre_load=0):
        """Fetch data between start and stop splits

        Args:
            start_split (:obj:`str`): Begin of data
            stop_split (:obj:`str`): End of data
            pre_load (:obj:`int`): Load extra number of data before start split.

        Returns:
            :obj:`tuple`: Returns a tuple ``(time, features, targets)`` sliced by the split defined.
                ``time`` is a :obj:`list` of :obj:`datetime`; ``features`` and ``targets`` are whatever
                slicing the corresponding h5py dataset returns. A source that is not present in the
                file is returned as ``None``.

        Raises:
            ValueError: If a split name cannot be found among the week or day splits.
        """
        start, stop = self._get_split_range(start_split, stop_split, pre_load)
        # Time stamps are stored as ISO formatted strings; parse them back into datetime
        if 'time' in self._sources:
            time_list = [dateutil.parser.parse(self._as_text(date_string))
                         for date_string in self._file['time'][start:stop]]
        else:
            time_list = None
        features = self._file['features'][start:stop] if 'features' in self._sources else None
        targets = self._file['targets'][start:stop] if 'targets' in self._sources else None
        return time_list, features, targets

    # region Metadata Auxiliary Functions
    def num_sensors(self):
        """Return the number of sensors in the sensor list
        """
        return len(self._sensors)

    def get_sensor_by_index(self, i):
        """Get sensor name by index

        Args:
            i (:obj:`int`): Index to sensor

        Returns:
            :obj:`str`: Name of the sensor at index :math:`i`.
        """
        return self._sensors[i]

    def num_features(self):
        """Get number of features in the dataset
        """
        return len(self._feature_description)

    def get_feature_description_by_index(self, i):
        """Get the description of feature column :math:`i`.

        Args:
            i (:obj:`int`): Column index.

        Returns:
            :obj:`str`: Corresponding column description.
        """
        return self._feature_description[i]

    def num_targets(self):
        """Total number of target classes.

        Returns:
            :obj:`int`: Total number of target classes.
        """
        return len(self._target_description)

    def get_target_descriptions(self):
        """Get list of target descriptions

        Returns:
            :obj:`list` of :obj:`str`: List of target class description strings.
        """
        return self._target_description

    def get_target_description_by_index(self, i):
        """Get target description by class index :math:`i`.

        Args:
            i (:obj:`int`): Class index.

        Returns:
            :obj:`str`: Corresponding target class description.
        """
        return self._target_description[i]

    def get_target_colors(self):
        """Get list of target colors

        Returns:
            :obj:`list` of :obj:`str`: List of target class color strings.
        """
        return self._target_colors

    def get_target_color_by_index(self, i):
        """Get the color string of target class :math:`i`.

        Args:
            i (:obj:`int`): Class index.

        Returns:
            :obj:`str`: Corresponding target class color string.
        """
        return self._target_colors[i]

    def is_bg_target(self, i=None, label=None):
        """Check if the target class given by :param:`i` or :param:`label` is considered background

        If both are given, :param:`i` takes precedence.

        Args:
            i (:obj:`int`): Class index.
            label (:obj:`str`): Class name.

        Returns:
            :obj:`bool`: True if it is considered background. False if it is not, if neither
                argument is given, or if the background target is not among the target descriptions.
        """
        if i is not None:
            # Without a (known) background target no index can be background; answer
            # False instead of letting list.index() raise ValueError.
            if self._bg_target not in self._target_description:
                return False
            return i == self._target_description.index(self._bg_target)
        if label is not None:
            return label == self._bg_target
        return False

    def get_bg_target(self):
        """Get the description of the target class considered background in the dataset.

        Returns:
            :obj:`str`: Name of the class which is considered background in the dataset. Usually it is 'Other_Activity'.
        """
        return self._bg_target

    def get_bg_target_id(self):
        """Get the id of the target class considered background.

        Returns:
            :obj:`int`: The index of the target class which is considered background in the dataset.

        Raises:
            ValueError: If the background target is not among the target descriptions.
        """
        return self._target_description.index(self._bg_target)

    def num_between_splits(self, start_split=None, stop_split=None):
        """Get the number of item between splits

        Args:
            start_split (:obj:`str`): Begin of data
            stop_split (:obj:`str`): End of data

        Returns:
            :obj:`int`: The number of items between two splits.
        """
        start, stop = self._get_split_range(start_split, stop_split)
        return stop - start

    def get_weeks_info(self):
        """Get splits by week.

        Returns:
            :obj:`List` of :obj:`tuple`: List of (key, value) tuple, where key is the name of the split and value is
                number of items in that split.
        """
        return [(week, bounds[1] - bounds[0]) for week, bounds in self._weeks.items()]

    def get_days_info(self):
        """Get splits by day.

        Returns:
            :obj:`List` of :obj:`tuple`: List of (key, value) tuple, where key is the name of the split and value is
                number of items in that split.
        """
        return [(day, bounds[1] - bounds[0]) for day, bounds in self._days.items()]
    # endregion

    # region CASASH5PY Dataset Creation
    def create_features(self, feature_array, feature_description):
        """ Create Feature Dataset

        Logs an error and does nothing if a feature array has already been created.

        Args:
            feature_array (:obj:`numpy.ndarray`): Numpy array holding calculated feature vectors
            feature_description (:obj:`list` of :obj:`str`): List of strings that describe each column of
                feature vectors.
        """
        if 'features' in self._sources:
            logger.error('Feature array already exists in the dataset.')
            return
        self._sources.append('features')
        self._feature_description = feature_description
        # Create feature array
        dset = self._file.create_dataset('features', data=feature_array,
                                         chunks=True, compression="gzip", compression_opts=9)
        dset.dims[0].label = 'batch'
        dset.dims[1].label = 'feature'
        # Add Feature Description as attributes
        self._file.attrs['features'] = [description.encode('utf-8')
                                        for description in feature_description]

    def create_targets(self, target_array, target_description, target_colors):
        """ Create Target Dataset

        Logs an error and does nothing if a target array has already been created.

        Args:
            target_array (:obj:`numpy.ndarray`): Numpy array holding target labels
            target_description (:obj:`list` of :obj:`str`): List of strings that describe each target class.
            target_colors (:obj:`list` of :obj:`str`): List of color values corresponding to each target class.
        """
        if 'targets' in self._sources:
            logger.error('Target array already exists in the dataset.')
            return
        self._sources.append('targets')
        self._target_description = target_description
        self._target_colors = target_colors
        # Create target array, stored as a single column
        dset = self._file.create_dataset('targets', data=target_array.reshape((target_array.size, 1)))
        dset.dims[0].label = 'batch'
        dset.dims[1].label = 'target'
        # Add Target Description as attributes
        self._file.attrs['targets'] = [description.encode('utf-8')
                                       for description in target_description]
        # Add Target Color as attributes
        self._file.attrs['target_colors'] = [color_string.encode('utf-8')
                                             for color_string in target_colors]

    def create_time_list(self, time_array):
        """ Create Time List

        Each datetime is stored as its ISO formatted string. Logs an error and does nothing
        if a time list has already been created.

        Args:
            time_array (:obj:`list` of :obj:`datetime`): datetime corresponding to each feature vector in feature
                dataset.
        """
        if 'time' in self._sources:
            logger.error('Time list already exists in the dataset.')
            return
        self._sources.append('time')
        # Create Time lists
        num_items = len(time_array)
        dt = h5py.special_dtype(vlen=bytes)
        dset = self._file.create_dataset('time', (num_items,), dtype=dt)
        for i, timestamp in enumerate(time_array):
            dset[i] = timestamp.isoformat().encode('utf-8')

    def create_splits(self, days, weeks):
        """ Create splits by days and weeks

        Consecutive entries of each list delimit one split: split ``i`` covers
        ``[boundaries[i], boundaries[i+1])``, so a list of ``n`` boundaries yields ``n - 1`` splits
        named ``day_<i>`` / ``week_<i>``. Logs an error and does nothing if splits already exist.

        Args:
            days (:obj:`list` of :obj:`int`): Start index for each day
            weeks (:obj:`list` of :obj:`int`): Start index for week
        """
        if self._days or self._weeks:
            logger.error('Splits already exist.')
            return
        self._days = OrderedDict()
        self._weeks = OrderedDict()
        # The name field must be wide enough for the longest split name of either kind.
        max_name_len = max(len('week_%d' % len(days)), len('week_%d' % len(weeks)))
        # Plain scalar fields: the `(type, 1)` and `'a'` spellings are deprecated in NumPy.
        split_dtype = np.dtype([
            ('name', 'S%d' % max_name_len),
            ('start', np.int64),
            ('stop', np.int64)])
        self._file.attrs['days'] = self._build_split_array('day_%d', days, split_dtype, self._days)
        self._file.attrs['weeks'] = self._build_split_array('week_%d', weeks, split_dtype, self._weeks)

    @staticmethod
    def _build_split_array(name_format, boundaries, split_dtype, registry):
        """Build the structured array of one kind of split and record it in ``registry``

        Args:
            name_format (:obj:`str`): Format string taking the split index, e.g. ``'day_%d'``.
            boundaries (:obj:`list` of :obj:`int`): Start index of each split, plus the final stop index.
            split_dtype (:obj:`numpy.dtype`): Structured dtype with ``name``, ``start`` and ``stop`` fields.
            registry (:obj:`dict`): Mapping updated in place with ``name -> [start, stop]``.

        Returns:
            :obj:`numpy.ndarray`: Structured array with one row per split.
        """
        # max() guards against an empty boundary list, which would request a negative length.
        num_splits = max(len(boundaries) - 1, 0)
        split_array = np.empty(num_splits, dtype=split_dtype)
        for i in range(num_splits):
            name = name_format % i
            split_array[i]['name'] = name.encode('utf-8')
            split_array[i]['start'] = boundaries[i]
            split_array[i]['stop'] = boundaries[i + 1]
            registry[name] = [boundaries[i], boundaries[i + 1]]
        return split_array

    def create_comments(self, comment):
        """ Add comments to dataset

        Args:
            comment (:obj:`str`): Comments to the dataset
        """
        # Keep the in-memory copy in sync with what is written to the file.
        self._comment = comment
        self._file.attrs['comment'] = comment.encode('utf-8')

    def create_sensors(self, sensors):
        """ Add sensors list to attributes

        If the sensor IDs in the dataset is not binary coded, there is a need to provide the sensor list to go along
        with the feature vectors.

        Args:
            sensors (:obj:`list` of :obj:`str`): List of sensor name corresponds to the id in the feature array.
        """
        # Keep the in-memory copy in sync so num_sensors()/get_sensor_by_index() work in write mode.
        self._sensors = list(sensors)
        self._file.attrs['sensors'] = [sensor.encode('utf-8') for sensor in self._sensors]

    def set_background_target(self, target_name):
        """ Set 'target_name' as background target

        Logs an error and does nothing if a background target has already been set.

        Args:
            target_name (:obj:`str`): Name of background target
        """
        if self._bg_target != '':
            logger.error('background target label has been set to %s.', self._bg_target)
            return
        self._bg_target = target_name
        self._file.attrs['bg_target'] = target_name.encode('utf-8')

    def flush(self):
        """ Write To File

        Stores the list of created sources as an attribute and flushes the file.
        """
        self._file.attrs['sources'] = [source.encode('utf-8') for source in self._sources]
        self._file.flush()
    # endregion

    def close(self):
        """ Close Dataset
        """
        self._file.close()

    # region InternalSupportRoutines
    def _get_split_range(self, start_split=None, stop_split=None, pre_load=0):
        """Get the requested splits range

        Args:
            start_split (:obj:`str`): Begin of data. ``None`` selects the whole dataset.
            stop_split (:obj:`str`): End of data. ``None`` stops at the end of the start split.
            pre_load (:obj:`int`): Load extra number of data before start split.

        Returns:
            :obj:`tuple` of :obj:`int`: Returns a tuple of the start and stop index.

        Raises:
            ValueError: If a split name cannot be found among the week or day splits.
        """
        # Determine the start index (and the default stop index)
        if start_split is None:
            start = 0
            # All sources share the batch dimension, so the first one gives the length;
            # a dataset without any source is empty.
            stop = self._file[self._sources[0]].shape[0] if self._sources else 0
        elif start_split in self._weeks:
            start, stop = self._weeks[start_split][0], self._weeks[start_split][1]
        elif start_split in self._days:
            start, stop = self._days[start_split][0], self._days[start_split][1]
        else:
            raise ValueError('start_split error: Cannot find %s in splitting array.' % start_split)
        # Determine the stop index
        if stop_split is not None:
            if stop_split in self._weeks:
                stop = self._weeks[stop_split][1]
            elif stop_split in self._days:
                # Day splits live in self._days (looking them up in self._weeks raised KeyError).
                stop = self._days[stop_split][1]
            else:
                raise ValueError('stop_split error: Cannot find %s in splitting array.' % stop_split)
        # Compensate pre-load, without running past the beginning of the data
        start = start - pre_load
        if start < 0:
            start = 0
        return start, stop

    @staticmethod
    def _as_text(value):
        """Return ``value`` as :obj:`str`, decoding it as UTF-8 if it is a byte string

        Accepting both bytes and text keeps the loaders working whether the HDF5 layer hands
        back encoded or already decoded strings.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return str(value)

    def _read_text_attr(self, name):
        """Return string attribute ``name`` of the file, or ``''`` if it is absent"""
        if name not in self._file.attrs:
            return ''
        return self._as_text(self._file.attrs[name])

    def _read_text_list_attr(self, name):
        """Return string-list attribute ``name`` of the file, or ``[]`` if it is absent"""
        if name not in self._file.attrs:
            return []
        return [self._as_text(item) for item in self._file.attrs[name]]

    def _read_splits_attr(self, name):
        """Return split attribute ``name`` as an ordered ``split name -> [start, stop]`` mapping"""
        splits = OrderedDict()
        for row in self._file.attrs[name]:
            splits[self._as_text(row['name'])] = [row['start'], row['stop']]
        return splits

    def _load_dataset_info(self):
        """Populate attributes of current class based on meta-data from h5py file

        Every piece of meta-data missing from the file falls back to an empty value.
        """
        attrs = self._file.attrs
        # Check sources set
        self._sources = self._read_text_list_attr('sources')
        # Parse splits; they are only loaded when both kinds are present
        if 'weeks' in attrs and 'days' in attrs:
            self._weeks = self._read_splits_attr('weeks')
            self._days = self._read_splits_attr('days')
        else:
            self._weeks = OrderedDict()
            self._days = OrderedDict()
        # Meta-data about dataset
        self._feature_description = self._read_text_list_attr('features')
        self._target_description = self._read_text_list_attr('targets')
        self._target_colors = self._read_text_list_attr('target_colors')
        self._sensors = self._read_text_list_attr('sensors')
        # Load Comments and Background task
        self._bg_target = self._read_text_attr('bg_target')
        self._comment = self._read_text_attr('comment')
    # endregion
447