Passed
Branch master (697a4f)
by P.R.
01:30
created

Type2Helper   D

Complexity

Total Complexity 61

Size/Duplication

Total Lines 369
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 61
c 1
b 0
f 0
dl 0
loc 369
rs 4.054

15 Methods

Rating   Name   Duplication   Size   Complexity  
B __init__() 0 43 1
A prepare_data() 0 13 3
C detect_overlap() 0 30 7
C _merge_pass2() 0 32 7
A _intersect() 0 19 2
A _get_date_type() 0 16 3
A _get_natural_key() 0 13 2
D _merge_pass4() 0 57 11
A _sort_data() 0 6 3
A _date2int() 0 17 3
A _merge_pass3() 0 9 2
A _merge_pass5() 0 15 4
A _equal() 0 15 4
B _merge_pass1() 0 29 6
A merge() 0 21 3

How to fix   Complexity   

Complex Class

Complex classes like Type2Helper often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share a prefix or suffix; in this class, for example, _merge_pass1() through _merge_pass5() share the prefix _merge_pass.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
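
As an illustration of Extract Class, the date handling in Type2Helper (_date2int(), _get_date_type() and the conversion back in _merge_pass5()) could move into a component of its own. The following is only a minimal sketch under that assumption: the class name DateCodec and its encode()/decode() methods are hypothetical and are not part of ETLT.

```python
import datetime


class DateCodec:
    """
    Hypothetical extracted component: converts dates to integer ordinals and back.
    """

    def __init__(self):
        # 'str' or 'date'; determined by the first date that is encoded.
        self.date_type = ''

    def encode(self, date):
        """
        Returns the integer (ordinal) representation of a date.
        """
        if isinstance(date, str):
            self.date_type = self.date_type or 'str'
            return datetime.datetime.strptime(date, '%Y-%m-%d').toordinal()

        if isinstance(date, datetime.date):
            self.date_type = self.date_type or 'date'
            return date.toordinal()

        raise ValueError('Unexpected type %s' % date.__class__)

    def decode(self, ordinal):
        """
        Converts an ordinal back to the date type seen by encode().
        """
        date = datetime.date.fromordinal(ordinal)
        if self.date_type == 'str':
            return date.isoformat()

        if self.date_type == 'date':
            return date

        raise ValueError('Unexpected date type %s' % self.date_type)
```

With such a component, Type2Helper would hold one DateCodec instance and delegate to it, which would take the type checks out of the merge passes.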

"""
ETLT

Copyright 2016 Set Based IT Consultancy

Licence MIT
"""
import copy
import datetime

from etlt.helper.Allen import Allen


class Type2Helper:
    """
    A helper class for reference data with date intervals.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, key_start_date, key_end_date, natural_key):
        """
        Object constructor.

        :param str key_start_date: The key of the start date in the rows.
        :param str key_end_date: The key of the end date in the rows.
        :param list[str] natural_key: The keys of the columns that form the natural key.
        """
        self._natural_key = list(natural_key)
        """
        The keys of the columns that form the natural key.

        :type: list[str]
        """

        self._key_end_date = key_end_date
        """
        The key of the end date in the rows.

        :type: str
        """

        self._key_start_date = key_start_date
        """
        The key of the start date in the rows.

        :type: str
        """

        self.rows = dict()
        """
        The data set: groups of rows, keyed by the values of the natural key.

        :type: dict
        """

        self._date_type = ''
        """
        The type of the date fields.
        - date for datetime.date objects
        - str  for strings in ISO 8601 (YYYY-MM-DD) format.

        :type: str
        """

    # ------------------------------------------------------------------------------------------------------------------
    def _get_natural_key(self, row):
        """
        Returns the natural key in a row.

        :param dict row: The row.

        :rtype: tuple
        """
        ret = list()
        for key in self._natural_key:
            ret.append(row[key])

        return tuple(ret)

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _date2int(date):
        """
        Returns an integer representation of a date.

        :param str|datetime.date date: The date.

        :rtype: int
        """
        if isinstance(date, str):
            tmp = datetime.datetime.strptime(date, '%Y-%m-%d')
            return tmp.toordinal()

        if isinstance(date, datetime.date):
            return date.toordinal()

        raise ValueError('Unexpected type %s' % date.__class__)

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _get_date_type(date):
        """
        Returns the type of a date.

        :param str|datetime.date date: The date.

        :rtype: str
        """
        if isinstance(date, str):
            return 'str'

        if isinstance(date, datetime.date):
            return 'date'

        raise ValueError('Unexpected type %s' % date.__class__)

    # ------------------------------------------------------------------------------------------------------------------
    def _sort_data(self):
        """
        Sorts all rows in all groups by start and end date.
        """
        for natural_key, rows in self.rows.items():
            self.rows[natural_key] = sorted(rows, key=lambda row: (row[self._key_start_date], row[self._key_end_date]))

    # ------------------------------------------------------------------------------------------------------------------
    def prepare_data(self, rows):
        """
        Sets and prepares the rows. The rows are stored in groups in a dictionary. A group is a list of rows with the
        same natural key. The key in the dictionary is a tuple with the values of the natural key.

        :param list[dict] rows: The rows.
        """
        self.rows = dict()
        for row in rows:
            natural_key = self._get_natural_key(row)
            if natural_key not in self.rows:
                self.rows[natural_key] = list()
            self.rows[natural_key].append(row)

    # ------------------------------------------------------------------------------------------------------------------
    def detect_overlap(self):
        """
        Detects if two or more rows in a group (i.e. rows with the same natural key) have overlap. Returns the rows with
        overlap, grouped by natural key. Each row is compared with the row before it, so the rows in a group must
        already be sorted by start and end date.

        :rtype: dict
        """
        ret = dict()
        # self.rows is a dict: iterate over its items to get both the natural key and the group of rows.
        for natural_key, rows in self.rows.items():
            prev_row = None
            added_prev_row = False
            overlapping = list()
            for row in rows:
                if prev_row:
                    # X is the previous row and Y is the current row, the same order as in _merge_pass4().
                    relation = Allen.relation(self._date2int(prev_row[self._key_start_date]),
                                              self._date2int(prev_row[self._key_end_date]),
                                              self._date2int(row[self._key_start_date]),
                                              self._date2int(row[self._key_end_date]))
                    if relation not in [Allen.X_BEFORE_Y, Allen.X_MEETS_Y]:
                        if not added_prev_row:
                            overlapping.append(prev_row)
                        overlapping.append(row)
                        added_prev_row = True
                    else:
                        added_prev_row = False

                # Remember the current row for the next iteration (this must also happen for the first row).
                prev_row = row

            if overlapping:
                ret[natural_key] = overlapping

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _intersect(start1, end1, start2, end2):
        """
        Returns the intersection of two intervals. Returns (None,None) if the intersection is empty. The dates are
        passed as integers (see _date2int()) because _merge_pass1() has already converted them.

        :param int start1: The start date of the first interval.
        :param int end1: The end date of the first interval.
        :param int start2: The start date of the second interval.
        :param int end2: The end date of the second interval.

        :rtype: tuple[int|None,int|None]
        """
        start = max(start1, start2)
        end = min(end1, end2)

        if start > end:
            return None, None

        return start, end

    # ------------------------------------------------------------------------------------------------------------------
    def _equal(self, row1, row2):
        """
        Returns True if two rows are identical excluding start and end date. Returns False otherwise.

        :param dict[str,T] row1: The first row.
        :param dict[str,T] row2: The second row.

        :rtype: bool
        """
        for key in row1.keys():
            if key not in [self._key_start_date, self._key_end_date]:
                if row1[key] != row2[key]:
                    return False

        return True

    # ------------------------------------------------------------------------------------------------------------------
    def _merge_pass1(self, keys, rows):
        """
        Replaces start and end dates in the row set with their integer representation.

        :param list[tuple[str,str]] keys: The other keys with start and end date.
        :param list[dict[str,T]] rows: The list of rows.

        :rtype: list[dict[str,T]]
        """
        ret = list()
        for row in rows:
            # Make a copy of the row such that self.rows is not affected by merge.
            tmp = copy.copy(row)

            # Determine the type of dates based on the first start date.
            if not self._date_type:
                self._date_type = self._get_date_type(tmp[self._key_start_date])

            # Convert dates to integers.
            tmp[self._key_start_date] = self._date2int(tmp[self._key_start_date])
            tmp[self._key_end_date] = self._date2int(tmp[self._key_end_date])
            for key_start_date, key_end_date in keys:
                if key_start_date != self._key_start_date:
                    tmp[key_start_date] = self._date2int(tmp[key_start_date])
                if key_end_date != self._key_end_date:
                    tmp[key_end_date] = self._date2int(tmp[key_end_date])
            ret.append(tmp)

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def _merge_pass2(self, keys, rows):
        """
        Computes the intersection of the date intervals of two or more reference data sets. If the intersection is empty
        the row is removed from the group.

        :param list[tuple[str,str]] keys: The other keys with start and end date.
        :param list[dict[str,T]] rows: The list of rows.

        :rtype: list[dict[str,T]]
        """
        ret = list()
        for row in rows:
            start_date = row[self._key_start_date]
            end_date = row[self._key_end_date]
            for key_start_date, key_end_date in keys:
                start_date, end_date = Type2Helper._intersect(start_date,
                                                              end_date,
                                                              row[key_start_date],
                                                              row[key_end_date])
                # _intersect() returns (None, None) for an empty intersection.
                if start_date is None:
                    break
                if self._key_start_date != key_start_date:
                    del row[key_start_date]
                if self._key_end_date != key_end_date:
                    del row[key_end_date]

            if start_date is not None:
                row[self._key_start_date] = start_date
                row[self._key_end_date] = end_date
                ret.append(row)

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def _merge_pass3(self, rows):
        """
        Returns a list of rows sorted by start and end date.

        :param list[dict[str,T]] rows: The list of rows.

        :rtype: list[dict[str,T]]
        """
        return sorted(rows, key=lambda row: (row[self._key_start_date], row[self._key_end_date]))

    # ------------------------------------------------------------------------------------------------------------------
    def _merge_pass4(self, rows):
        """
        Merges adjacent and overlapping rows in the same group (i.e. with the same natural key).

        :param list[dict[str,T]] rows: The rows in a group (i.e. with the same natural key).

        :rtype: list[dict[str,T]]
        """
        ret = list()

        prev_row = None
        for row in rows:
            if prev_row:
                relation = Allen.relation(prev_row[self._key_start_date],
                                          prev_row[self._key_end_date],
                                          row[self._key_start_date],
                                          row[self._key_end_date])
                if relation == Allen.X_BEFORE_Y:
                    # Two rows with distinct intervals.
                    ret.append(prev_row)
                    prev_row = row
                elif relation == Allen.X_MEETS_Y:
                    # The two rows are adjacent.
                    if self._equal(prev_row, row):
                        # The two rows are identical (except for start and end date) and adjacent. Combine the two rows
                        # into one row.
                        prev_row[self._key_end_date] = row[self._key_end_date]
                    else:
                        # Rows are adjacent but not identical.
                        ret.append(prev_row)
                        prev_row = row
                elif relation == Allen.X_OVERLAPS_WITH_Y:
                    # Should not occur with proper reference data.
                    if self._equal(prev_row, row):
                        # The two rows are identical (except for start and end date) and overlapping. Combine the two
                        # rows into one row.
                        prev_row[self._key_end_date] = row[self._key_end_date]
                    else:
                        # Rows are overlapping but not identical.
                        prev_row[self._key_end_date] = row[self._key_start_date] - 1
                        ret.append(prev_row)
                        prev_row = row
                elif relation == Allen.X_STARTS_Y:
                    # Should not occur with proper reference data.
                    prev_row = row
                elif relation == Allen.X_EQUAL_Y:
                    # Can happen when the reference data sets are joined without respect for date intervals.
                    prev_row = row
                else:
                    raise ValueError('Data is not sorted properly')
            else:
                prev_row = row

        if prev_row:
            ret.append(prev_row)

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def _merge_pass5(self, rows):
        """
        Replaces the integer representation of start and end dates in the row set with dates of the original type
        (ISO 8601 strings or datetime.date objects). The rows are modified in place.

        :param list[dict[str,T]] rows: The list of rows.
        """
        for row in rows:
            if self._date_type == 'str':
                row[self._key_start_date] = datetime.date.fromordinal(row[self._key_start_date]).isoformat()
                row[self._key_end_date] = datetime.date.fromordinal(row[self._key_end_date]).isoformat()
            elif self._date_type == 'date':
                row[self._key_start_date] = datetime.date.fromordinal(row[self._key_start_date])
                row[self._key_end_date] = datetime.date.fromordinal(row[self._key_end_date])
            else:
                raise ValueError('Unexpected date type %s' % self._date_type)

    # ------------------------------------------------------------------------------------------------------------------
    def merge(self, keys):
        """
        Merges the join on natural keys of two or more reference data sets.

        :param list[tuple[str,str]] keys: For each data set the keys of the start and end date.

        :rtype: list[dict[str,T]]
        """
        ret = list()
        self._date_type = ''
        for rows in self.rows.values():
            tmp = self._merge_pass1(keys, rows)
            tmp = self._merge_pass2(keys, tmp)
            if tmp:
                tmp = self._merge_pass3(tmp)
                tmp = self._merge_pass4(tmp)
                self._merge_pass5(tmp)

            ret.extend(tmp)

        return ret

# ----------------------------------------------------------------------------------------------------------------------
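
The merge passes in the listing work on integer ordinals: dates are converted to integers, intervals are intersected, and the result is converted back. The sketch below shows that round trip in isolation, without the Allen helper or the row bookkeeping. The function names to_ordinal(), intersect() and intersect_iso() are hypothetical and exist only for this illustration; intervals are assumed to be closed (both end points included), which is how the listing treats them.

```python
import datetime


def to_ordinal(date):
    """Returns the ordinal of an ISO 8601 (YYYY-MM-DD) string or a datetime.date."""
    if isinstance(date, str):
        return datetime.datetime.strptime(date, '%Y-%m-%d').toordinal()
    if isinstance(date, datetime.date):
        return date.toordinal()
    raise ValueError('Unexpected type %s' % date.__class__)


def intersect(start1, end1, start2, end2):
    """Returns the intersection of two closed integer intervals, or (None, None) if it is empty."""
    start = max(start1, start2)
    end = min(end1, end2)
    if start > end:
        return None, None
    return start, end


def intersect_iso(interval1, interval2):
    """Intersects two (start, end) pairs of ISO 8601 strings; returns None if they do not overlap."""
    start, end = intersect(to_ordinal(interval1[0]), to_ordinal(interval1[1]),
                           to_ordinal(interval2[0]), to_ordinal(interval2[1]))
    if start is None:
        return None
    # Convert the ordinals back to ISO 8601 strings.
    return (datetime.date.fromordinal(start).isoformat(),
            datetime.date.fromordinal(end).isoformat())


print(intersect_iso(('2016-01-01', '2016-06-30'), ('2016-04-01', '2016-12-31')))
```

Working on ordinals keeps the comparison and the "end date minus one day" arithmetic in _merge_pass4() independent of whether the input dates are strings or datetime.date objects.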