Completed
Push — master ( 55d7fa...011702 )
by P.R.
01:40
created

Type2JoinHelper   B

Complexity

Total Complexity 39

Size/Duplication

Total Lines 240
Duplicated Lines 0 %

Test Coverage

Coverage 88.89%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 39
c 2
b 0
f 0
dl 0
loc 240
ccs 88
cts 99
cp 0.8889
rs 8.2857

7 Methods

Rating   Name   Duplication   Size   Complexity  
B _pass1() 0 29 6
A _equal() 0 15 4
A _pass3() 0 9 2
C _pass2() 0 32 7
A _intersect() 0 19 2
A merge() 0 21 3
F _pass4() 0 97 15
1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8 1
import copy
9
10 1
from etlt.helper.Allen import Allen
11 1
from etlt.helper.Type2Helper import Type2Helper
12
13
14 1
class Type2JoinHelper(Type2Helper):
15
    """
16
    A helper class for joining data sets with date intervals.
17
    """
18
19
    # ------------------------------------------------------------------------------------------------------------------
20 1
    def _equal(self, row1, row2):
21
        """
22
        Returns True if two rows are identical excluding start and end date. Returns False otherwise.
23
24
        :param dict[str,T] row1: The first row.
25
        :param dict[str,T] row2: The second row.
26
27
        :rtype: bool
28
        """
29 1
        for key in row1.keys():
30 1
            if key not in [self._key_start_date, self._key_end_date]:
31 1
                if row1[key] != row2[key]:
32 1
                    return False
33
34 1
        return True
35
36
    # ------------------------------------------------------------------------------------------------------------------
37 1
    @staticmethod
38
    def _intersect(start1, end1, start2, end2):
39
        """
40
        Returns the intersection of two intervals. Returns (None,None) if the intersection is empty.
41
42
        :param int start1: The start date of the first interval.
43
        :param int end1: The end date of the first interval.
44
        :param int start2: The start date of the second interval.
45
        :param int end2: The end date of the second interval.
46
47
        :rtype: tuple[int|None,int|None]
48
        """
49 1
        start = max(start1, start2)
50 1
        end = min(end1, end2)
51
52 1
        if start > end:
53 1
            return None, None
54
55 1
        return start, end
56
57
    # ------------------------------------------------------------------------------------------------------------------
58 1
    def _pass1(self, keys, rows):
59
        """
60
        Replaces start and end dates in the row set with their integer representation
61
62
        :param list[tuple[str,str]] keys: The other keys with start and end date.
63
        :param list[dict[str,T]] rows: The list of rows.
64
65
        :rtype: list[dict[str,T]]
66
        """
67 1
        ret = list()
68 1
        for row in rows:
69
            # Make a copy of the row such that self._rows is not affected by merge.
70 1
            tmp = copy.copy(row)
71
72
            # Determine the type of dates based on the first start date.
73 1
            if not self._date_type:
74 1
                self._date_type = self._get_date_type(tmp[self._key_start_date])
75
76
            # Convert dates to integers.
77 1
            tmp[self._key_start_date] = self._date2int(tmp[self._key_start_date])
78 1
            tmp[self._key_end_date] = self._date2int(tmp[self._key_end_date])
79 1
            for key_start_date, key_end_date in keys:
80 1
                if key_start_date != self._key_start_date:
81 1
                    tmp[key_start_date] = self._date2int(tmp[key_start_date])
82 1
                if key_end_date != self._key_end_date:
83 1
                    tmp[key_end_date] = self._date2int(tmp[key_end_date])
84 1
            ret.append(tmp)
85
86 1
        return ret
87
88
    # ------------------------------------------------------------------------------------------------------------------
89 1
    def _pass2(self, keys, rows):
90
        """
91
        Computes the intersection of the date intervals of two or more reference data sets. If the intersection is empty
92
        the row is removed from the group.
93
94
        :param list[tuple[str,str]] keys: The other keys with start and end date.
95
        :param list[dict[str,T]] rows: The list of rows.
96
97
        :rtype: list[dict[str,T]]
98
        """
99 1
        ret = list()
100 1
        for row in rows:
101 1
            start_date = row[self._key_start_date]
102 1
            end_date = row[self._key_end_date]
103 1
            for key_start_date, key_end_date in keys:
104 1
                start_date, end_date = Type2JoinHelper._intersect(start_date,
105
                                                                  end_date,
106
                                                                  row[key_start_date],
107
                                                                  row[key_end_date])
108 1
                if not start_date:
109 1
                    break
110 1
                if self._key_start_date != key_start_date:
111 1
                    del row[key_start_date]
112 1
                if self._key_end_date != key_end_date:
113 1
                    del row[key_end_date]
114
115 1
            if start_date:
116 1
                row[self._key_start_date] = start_date
117 1
                row[self._key_end_date] = end_date
118 1
                ret.append(row)
119
120 1
        return ret
121
122
    # ------------------------------------------------------------------------------------------------------------------
123 1
    def _pass3(self, rows):
124
        """
125
        Returns a list of rows sorted by start and end date.
126
127
        :param list[dict[str,T]] rows: The list of rows.
128
129
        :rtype: list[dict[str,T]]
130
        """
131 1
        return sorted(rows, key=lambda row: (row[self._key_start_date], row[self._key_end_date]))
132
133
    # ------------------------------------------------------------------------------------------------------------------
134 1
    def _pass4(self, rows):
135
        """
136
        Merges adjacent and overlapping rows in the same group (i.e. with the same natural key). With proper reference
137
        data overlapping rows MUST not occur. However, this  method can handle overlapping rows. Overlapping rows are
138
        resolved as follows:
139
        * The interval with the most recent begin date prevails for the overlapping period.
140
        * If the begin dates are the same the interval with the most recent end date prevails.
141
        * If the begin and end dates are equal the last row in the data set prevails.
142
143
        :param list[dict[str,T]] rows: The rows in a group (i.e. with the same natural key).
144
        .
145
        :rtype: list[dict[str,T]]
146
        """
147 1
        ret = list()
148
149 1
        prev_row = None
150 1
        for row in rows:
151 1
            if prev_row:
152 1
                relation = Allen.relation(prev_row[self._key_start_date],
153
                                          prev_row[self._key_end_date],
154
                                          row[self._key_start_date],
155
                                          row[self._key_end_date])
156 1
                if relation == Allen.X_BEFORE_Y:
157
                    # Two rows with distinct intervals.
158
                    # prev_row: |----|
159
                    # row:                 |-----|
160 1
                    ret.append(prev_row)
161 1
                    prev_row = row
162 1
                elif relation == Allen.X_MEETS_Y:
163
                    # The two rows are adjacent.
164
                    # prev_row: |-------|
165
                    # row:               |-------|
166 1
                    if self._equal(prev_row, row):
167
                        # The two rows are identical (except for start and end date) and adjacent. Combine the two rows
168
                        # into one row.
169 1
                        prev_row[self._key_end_date] = row[self._key_end_date]
170
                    else:
171
                        # Rows are adjacent but not identical.
172 1
                        ret.append(prev_row)
173 1
                        prev_row = row
174 1
                elif relation == Allen.X_OVERLAPS_WITH_Y:
175
                    # prev_row overlaps row. Should not occur with proper reference data.
176
                    # prev_row: |-----------|
177
                    # row:            |----------|
178 1
                    if self._equal(prev_row, row):
179
                        # The two rows are identical (except for start and end date) and overlapping. Combine the two
180
                        # rows into one row.
181 1
                        prev_row[self._key_end_date] = row[self._key_end_date]
182
                    else:
183
                        # Rows are overlapping but not identical.
184 1
                        prev_row[self._key_end_date] = row[self._key_start_date] - 1
185 1
                        ret.append(prev_row)
186 1
                        prev_row = row
187 1
                elif relation == Allen.X_STARTS_Y:
188
                    # prev_row start row. Should not occur with proper reference data.
189
                    # prev_row: |------|
190
                    # row:      |----------------|
191 1
                    prev_row = row
192 1
                elif relation == Allen.X_EQUAL_Y:
193
                    # Can happen when the reference data sets are joined without respect for date intervals.
194
                    # prev_row: |----------------|
195
                    # row:      |----------------|
196 1
                    prev_row = row
197
                elif relation == Allen.X_DURING_Y_INVERSE:
198
                    # row during prev_row. Should not occur with proper reference data.
199
                    # prev_row: |----------------|
200
                    # row:           |------|
201
                    if not self._equal(prev_row, row):
202
                        prev_row[self._key_end_date] = row[self._key_start_date] - 1
203
                        ret.append(prev_row)
204
                        prev_row = row
205
                        # Note: the interval after row[self._key_end_date] is discarded.
206
207
                    # Note: if the two rows are identical (except for start and end date) nothing to do.
208
                elif relation == Allen.X_FINISHES_Y_INVERSE:
209
                    # row finishes prev_row. Should not occur with proper reference data.
210
                    # prev_row: |----------------|
211
                    # row:                |------|
212
                    if not self._equal(prev_row, row):
213
                        prev_row[self._key_end_date] = row[self._key_start_date] - 1
214
                        ret.append(prev_row)
215
                        prev_row = row
216
217
                    # Note: if the two rows are identical (except for start and end date) nothing to do.
218
                else:
219
                    # Not in _pass3 the rows are sorted such that.
220
                    # prev_row[self._key_begin_date] <= row[self._key_begin_date]. Hence the following relation should
221
                    # not occur: X_FINISHES_Y, X_BEFORE_Y_INVERSE, X_MEETS_Y_INVERSE, X_OVERLAPS_WITH_Y_INVERSE, and
222
                    # X_STARTS_Y_INVERSE. Hence, we covered all 13 relations in Allen's interval algebra.
223
                    raise ValueError('Data is not sorted properly. Relation: %d' % relation)
224
            else:
225 1
                prev_row = row
226
227 1
        if prev_row:
228 1
            ret.append(prev_row)
229
230 1
        return ret
231
232
    # ------------------------------------------------------------------------------------------------------------------
233 1
    def merge(self, keys):
234
        """
235
        Merges the join on natural keys of two or more reference data sets.
236
237
        :param list[tuple[str,str]] keys: For each data set the keys of the start and end date.
238
239
        :rtype: list[dict[str,T]]
240
        """
241 1
        ret = list()
242 1
        self._date_type = ''
243 1
        for rows in self.rows.values():
244 1
            tmp = self._pass1(keys, rows)
245 1
            tmp = self._pass2(keys, tmp)
246 1
            if tmp:
247 1
                tmp = self._pass3(tmp)
248 1
                tmp = self._pass4(tmp)
249 1
                self._rows_int2date(tmp)
250
251 1
            ret.extend(tmp)
252
253 1
        return ret
254
255
# ----------------------------------------------------------------------------------------------------------------------
256