etlt.helper.Type2Helper   C

Complexity

Total Complexity 55

Size/Duplication

Total Lines 326
Duplicated Lines 0 %

Test Coverage

Coverage 85.47%

Importance

Changes 0
Metric Value
eloc 135
dl 0
loc 326
ccs 100
cts 117
cp 0.8547
rs 6
c 0
b 0
f 0
wmc 55

12 Methods

Rating   Name   Duplication   Size   Complexity  
A Type2Helper.__init__() 0 35 1
A Type2Helper.enumerate() 0 14 3
A Type2Helper._rows_date2int() 0 14 3
A Type2Helper._rows_sort() 0 7 2
F Type2Helper._merge_adjacent_rows() 0 107 17
B Type2Helper._date2int() 0 21 6
A Type2Helper._rows_int2date() 0 18 5
A Type2Helper._get_pseudo_key() 0 11 2
A Type2Helper._equal() 0 13 4
A Type2Helper.get_rows() 0 12 3
A Type2Helper.prepare_data() 0 16 5
A Type2Helper._get_date_type() 0 17 4

Complexity

Complex classes like etlt.helper.Type2Helper often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find one is to look for fields and methods that share a prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
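In this class, _date2int(), _rows_date2int(), _rows_int2date() and _get_date_type() all deal with date conversion, so they are one plausible candidate for Extract Class. The following is only a sketch of what such an extraction could look like; the class name DateConverter and its methods are hypothetical and not part of etlt.

```python
import datetime
from typing import Union

DateLike = Union[str, datetime.date, int]


class DateConverter:
    """Hypothetical helper (not part of etlt) that holds only the date conversion logic."""

    def __init__(self) -> None:
        # Type of the first date seen: 'str', 'date' or 'int'; empty until known.
        self.date_type: str = ''

    def to_int(self, date: DateLike) -> int:
        """Returns the ordinal of a date and remembers the type of the first date seen."""
        if isinstance(date, str):
            kind = 'str'
            # Accept an optional midnight time suffix, as Type2Helper._date2int() does.
            if date.endswith((' 00:00:00', 'T00:00:00')):
                date = date[:-9]
            ordinal = datetime.datetime.strptime(date, '%Y-%m-%d').toordinal()
        elif isinstance(date, datetime.date):
            kind, ordinal = 'date', date.toordinal()
        elif isinstance(date, int):
            kind, ordinal = 'int', date
        else:
            raise ValueError('Unexpected type {}'.format(date.__class__))

        if not self.date_type:
            self.date_type = kind
        return ordinal

    def from_int(self, ordinal: int) -> DateLike:
        """Converts an ordinal back to the remembered date type."""
        if self.date_type == 'str':
            return datetime.date.fromordinal(ordinal).isoformat()
        if self.date_type == 'date':
            return datetime.date.fromordinal(ordinal)
        if self.date_type == 'int':
            return ordinal
        raise ValueError('Unexpected date type {0!s}'.format(self.date_type))
```

With such a class, Type2Helper would keep a single converter instance and delegate to it, which would remove the _date_type field and four methods from the class without changing its public interface.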

import copy
import datetime
from typing import Any, Dict, List, Tuple, Union

from etlt.helper.Allen import Allen


class Type2Helper:
    """
    A helper class for reference data with date intervals.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, key_start_date: str, key_end_date: str, pseudo_key: List[str]):
        """
        Object constructor.

        :param key_start_date: The key of the start date in the rows.
        :param key_end_date: The key of the end date in the rows.
        :param pseudo_key: The keys of the columns that form the pseudo key.
        """
        self.copy: bool = True
        """
        If true, the rows are copied such that the original rows are not modified.
        """

        self._pseudo_key: List[str] = list(pseudo_key)
        """
        The keys of the columns that form the pseudo key.
        """

        self._key_end_date: str = key_end_date
        """
        The key of the end date in the rows.
        """

        self._key_start_date: str = key_start_date
        """
        The key of the start date in the rows.
        """

        self._rows: Dict[Tuple, List[Dict[str, Any]]] = dict()
        """
        The data set: a list of rows per pseudo key.
        """

        self._date_type: str = ''
        """
        The type of the date fields.
        - date for datetime.date objects
        - str  for strings in ISO 8601 (YYYY-MM-DD) format
        - int for integers
        """

    # ------------------------------------------------------------------------------------------------------------------
    def _get_pseudo_key(self, row: Dict[str, Any]) -> Tuple:
        """
        Returns the pseudo key in a row.

        :param row: The row.
        """
        return tuple(row[key] for key in self._pseudo_key)

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _date2int(date: Union[str, datetime.date, int]) -> int:
        """
        Returns an integer representation (the proleptic Gregorian ordinal) of a date.

        :param date: The date.
        """
        if isinstance(date, str):
            if date.endswith((' 00:00:00', 'T00:00:00')):
                # Ignore the midnight time suffix.
                date = date[:-9]
            tmp = datetime.datetime.strptime(date, '%Y-%m-%d')
            return tmp.toordinal()

        if isinstance(date, datetime.date):
            return date.toordinal()

        if isinstance(date, int):
            # Integers are assumed to be ordinals already.
            return date

        raise ValueError('Unexpected type {}'.format(date.__class__))

    # ------------------------------------------------------------------------------------------------------------------
    def _rows_date2int(self, rows: List[Dict[str, Any]]) -> None:
        """
        Replaces start and end dates in a row set with their integer representation.

        :param rows: The list of rows.
        """
        for row in rows:
            # Determine the type of dates based on the first start date.
            if not self._date_type:
                self._date_type = self._get_date_type(row[self._key_start_date])

            # Convert dates to integers.
            row[self._key_start_date] = self._date2int(row[self._key_start_date])
            row[self._key_end_date] = self._date2int(row[self._key_end_date])

    # ------------------------------------------------------------------------------------------------------------------
    def _rows_int2date(self, rows: List[Dict[str, Any]]) -> None:
        """
        Replaces the integer representation of start and end dates in the row set with dates of the original type.

        :param rows: The list of rows.
        """
        for row in rows:
            if self._date_type == 'str':
                row[self._key_start_date] = datetime.date.fromordinal(row[self._key_start_date]).isoformat()
                row[self._key_end_date] = datetime.date.fromordinal(row[self._key_end_date]).isoformat()
            elif self._date_type == 'date':
                row[self._key_start_date] = datetime.date.fromordinal(row[self._key_start_date])
                row[self._key_end_date] = datetime.date.fromordinal(row[self._key_end_date])
            elif self._date_type == 'int':
                # Nothing to do.
                pass
            else:
                raise ValueError('Unexpected date type {0!s}'.format(self._date_type))

    # ------------------------------------------------------------------------------------------------------------------
    def _rows_sort(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Returns a list of rows sorted by start and end date.

        :param rows: The list of rows.
        """
        return sorted(rows, key=lambda row: (row[self._key_start_date], row[self._key_end_date]))

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _get_date_type(date: Union[str, datetime.date, int]) -> str:
        """
        Returns the type of a date: 'str', 'date', or 'int'.

        :param date: The date.
        """
        if isinstance(date, str):
            return 'str'

        if isinstance(date, datetime.date):
            return 'date'

        if isinstance(date, int):
            return 'int'

        raise ValueError('Unexpected type {0!s}'.format(date.__class__))

    # ------------------------------------------------------------------------------------------------------------------
    def _equal(self, row1: Dict[str, Any], row2: Dict[str, Any]) -> bool:
        """
        Returns whether two rows are identical excluding start and end date.

        :param row1: The first row.
        :param row2: The second row.
        """
        for key in row1.keys():
            if key not in [self._key_start_date, self._key_end_date]:
                if row1[key] != row2[key]:
                    return False

        return True

    # ------------------------------------------------------------------------------------------------------------------
    def _merge_adjacent_rows(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Resolves adjacent and overlapping rows. Overlapping rows are resolved as follows:
        * The interval with the most recent start date prevails for the overlapping period.
        * If the start dates are the same the interval with the most recent end date prevails.
        * If the start and end dates are equal the last row in the data set prevails.
        Identical (excluding start and end date) adjacent rows are replaced with a single row.

        :param rows: The rows in a group (i.e. with the same natural key).
        """
        ret = list()

        prev_row = None
        for row in rows:
            if prev_row:
                relation = Allen.relation(prev_row[self._key_start_date],
                                          prev_row[self._key_end_date],
                                          row[self._key_start_date],
                                          row[self._key_end_date])
                if relation is None:
                    # row holds an invalid interval (prev_row always holds a valid interval). Hence, the join is empty.
                    return []

                elif relation == Allen.X_BEFORE_Y:
                    # Two rows with distinct intervals.
                    # prev_row: |----|
                    # row:                 |-----|
                    ret.append(prev_row)
                    prev_row = row

                elif relation == Allen.X_MEETS_Y:
                    # The two rows are adjacent.
                    # prev_row: |-------|
                    # row:               |-------|
                    if self._equal(prev_row, row):
                        # The two rows are identical (except for start and end date) and adjacent. Combine the two rows
                        # into one row.
                        prev_row[self._key_end_date] = row[self._key_end_date]
                    else:
                        # Rows are adjacent but not identical.
                        ret.append(prev_row)
                        prev_row = row

                elif relation == Allen.X_OVERLAPS_WITH_Y:
                    # prev_row overlaps row. Should not occur with proper reference data.
                    # prev_row: |-----------|
                    # row:            |----------|
                    if self._equal(prev_row, row):
                        # The two rows are identical (except for start and end date) and overlapping. Combine the two
                        # rows into one row.
                        prev_row[self._key_end_date] = row[self._key_end_date]
                    else:
                        # Rows are overlapping but not identical.
                        prev_row[self._key_end_date] = row[self._key_start_date] - 1
                        ret.append(prev_row)
                        prev_row = row

                elif relation == Allen.X_STARTS_Y:
                    # prev_row starts row. Should not occur with proper reference data.
                    # prev_row: |------|
                    # row:      |----------------|
                    prev_row = row

                elif relation == Allen.X_EQUAL_Y:
                    # Can happen when the reference data sets are joined without respect for date intervals.
                    # prev_row: |----------------|
                    # row:      |----------------|
                    prev_row = row

                elif relation == Allen.X_DURING_Y_INVERSE:
                    # row during prev_row. Should not occur with proper reference data.
                    # prev_row: |----------------|
                    # row:           |------|
                    # Note: the interval with the most recent start date prevails. Hence, the interval after
                    # row[self._key_end_date] is discarded.
                    if self._equal(prev_row, row):
                        prev_row[self._key_end_date] = row[self._key_end_date]
                    else:
                        prev_row[self._key_end_date] = row[self._key_start_date] - 1
                        ret.append(prev_row)
                        prev_row = row

                elif relation == Allen.X_FINISHES_Y_INVERSE:
                    # row finishes prev_row. Should not occur with proper reference data.
                    # prev_row: |----------------|
                    # row:                |------|
                    # Note: if the two rows are identical (except for start and end date) there is nothing to do.
                    if not self._equal(prev_row, row):
                        prev_row[self._key_end_date] = row[self._key_start_date] - 1
                        ret.append(prev_row)
                        prev_row = row

                else:
                    # Note: The rows are sorted such that prev_row[self._key_start_date] <= row[self._key_start_date].
                    # Hence, the following relations should not occur: X_DURING_Y, X_FINISHES_Y, X_BEFORE_Y_INVERSE,
                    # X_MEETS_Y_INVERSE, X_OVERLAPS_WITH_Y_INVERSE, and X_STARTS_Y_INVERSE. Hence, we covered all 13
                    # relations in Allen's interval algebra.
                    raise ValueError('Data is not sorted properly. Relation: {0}'.format(relation))

            elif row[self._key_start_date] <= row[self._key_end_date]:
                # row is the first valid row.
                prev_row = row

        if prev_row:
            ret.append(prev_row)

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def enumerate(self, name: str, start: int = 1) -> None:
        """
        Enumerates all rows such that the pseudo key and the ordinal number are a unique key.

        :param name: The key holding the ordinal number.
        :param start: The start of the ordinal numbers. For each pseudo key the first row has this ordinal number.
        """
        for pseudo_key, rows in self._rows.items():
            rows = self._rows_sort(rows)
            ordinal = start
            for row in rows:
                row[name] = ordinal
                ordinal += 1
            self._rows[pseudo_key] = rows

    # ------------------------------------------------------------------------------------------------------------------
    def get_rows(self, sort: bool = False) -> List:
        """
        Returns the rows of this Type2Helper.

        :param sort: Whether the rows must be sorted by the pseudo key.
        """
        ret = []
        for _, rows in sorted(self._rows.items()) if sort else self._rows.items():
            self._rows_int2date(rows)
            ret.extend(rows)

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def prepare_data(self, rows: List[Dict[str, Any]]) -> None:
        """
        Sets and prepares the rows. The rows are stored in groups in a dictionary. A group is a list of rows with the
        same pseudo key. The key in the dictionary is a tuple with the values of the pseudo key.

        :param rows: The list of rows.
        """
        self._rows = dict()
        for row in rows:
            if self.copy:
                # Copy each row; copying only the list would leave the caller's row dicts open to modification.
                row = copy.copy(row)
            pseudo_key = self._get_pseudo_key(row)
            if pseudo_key not in self._rows:
                self._rows[pseudo_key] = list()
            self._rows[pseudo_key].append(row)

        # Convert start and end dates to integers. The date type is detected again from the first row.
        self._date_type = ''
        for group in self._rows.values():
            self._rows_date2int(group)

# ----------------------------------------------------------------------------------------------------------------------
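
The adjacent-row rule in _merge_adjacent_rows() can be tried out without the Allen class. The following is a minimal sketch under stated assumptions, not the etlt implementation: the function name merge_adjacent is hypothetical, dates are integers, intervals are closed, and two rows count as adjacent when the end of the first plus one equals the start of the second (the definition of Allen.X_MEETS_Y is not shown in this listing, so this is an assumption). Overlapping rows are deliberately not handled.

```python
from typing import Any, Dict, List


def merge_adjacent(rows: List[Dict[str, Any]], key_start: str, key_end: str) -> List[Dict[str, Any]]:
    """Merges rows that are adjacent and identical except for their start and end dates."""
    merged: List[Dict[str, Any]] = []
    for row in sorted(rows, key=lambda r: (r[key_start], r[key_end])):
        prev = merged[-1] if merged else None
        # Rows are "equal" when all columns other than the two date columns match.
        same_payload = prev is not None and all(
            prev[key] == row[key] for key in row if key not in (key_start, key_end))
        if same_payload and prev[key_end] + 1 == row[key_start]:
            # Adjacent and equal: extend the previous interval instead of adding a row.
            prev[key_end] = row[key_end]
        else:
            # Work on a copy such that the caller's rows are not modified.
            merged.append(dict(row))
    return merged


rows = [{'id': 1, 'start': 1, 'end': 5, 'value': 'a'},
        {'id': 1, 'start': 6, 'end': 9, 'value': 'a'},
        {'id': 1, 'start': 10, 'end': 12, 'value': 'b'}]
result = merge_adjacent(rows, 'start', 'end')
```

Here the first two rows collapse into a single row covering 1 through 9, while the third row is kept because its value differs.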