Completed
Push — master ( 524000...90a6df )
by P.R.
01:47
created

Type2JoinHelper   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 200
Duplicated Lines 0 %

Test Coverage

Coverage 98.88%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 35
c 1
b 0
f 0
dl 0
loc 200
ccs 88
cts 89
cp 0.9888
rs 9

7 Methods

Rating   Name   Duplication   Size   Complexity  
B _pass1() 0 29 6
A _equal() 0 15 4
A merge() 0 21 3
A _pass3() 0 9 2
C _pass2() 0 32 7
D _pass4() 0 57 11
A _intersect() 0 19 2
1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8 1
import copy
9 1
import datetime
0 ignored issues
show
Unused Code introduced by
The import datetime seems to be unused.
Loading history...
10
11 1
from etlt.helper.Type2Helper import Type2Helper
12
13 1
from etlt.helper.Allen import Allen
14
15
16 1
class Type2JoinHelper(Type2Helper):
17
    """
18
    A helper class for joining data sets with date intervals.
19
    """
20
21
    # ------------------------------------------------------------------------------------------------------------------
22 1
    def _equal(self, row1, row2):
23
        """
24
        Returns True if two rows are identical excluding start and end date. Returns False otherwise.
25
26
        :param dict[str,T] row1: The first row.
27
        :param dict[str,T] row2: The second row.
28
29
        :rtype: bool
30
        """
31 1
        for key in row1.keys():
32 1
            if key not in [self._key_start_date, self._key_end_date]:
33 1
                if row1[key] != row2[key]:
34 1
                    return False
35
36 1
        return True
37
38
    # ------------------------------------------------------------------------------------------------------------------
39 1
    @staticmethod
40
    def _intersect(start1, end1, start2, end2):
41
        """
42
        Returns the intersection of two intervals. Returns (None,None) if the intersection is empty.
43
44
        :param int start1: The start date of the first interval.
45
        :param int end1: The end date of the first interval.
46
        :param int start2: The start date of the second interval.
47
        :param int end2: The end date of the second interval.
48
49
        :rtype: tuple[int|None,int|None]
50
        """
51 1
        start = max(start1, start2)
52 1
        end = min(end1, end2)
53
54 1
        if start > end:
55 1
            return None, None
56
57 1
        return start, end
58
59
    # ------------------------------------------------------------------------------------------------------------------
60 1
    def _pass1(self, keys, rows):
61
        """
62
        Replaces start and end dates in the row set with their integer representation
63
64
        :param list[tuple[str,str]] keys: The other keys with start and end date.
65
        :param list[dict[str,T]] rows: The list of rows.
66
67
        :rtype: list[dict[str,T]]
68
        """
69 1
        ret = list()
70 1
        for row in rows:
71
            # Make a copy of the row such that self._rows is not affected by merge.
72 1
            tmp = copy.copy(row)
73
74
            # Determine the type of dates based on the first start date.
75 1
            if not self._date_type:
76 1
                self._date_type = self._get_date_type(tmp[self._key_start_date])
77
78
            # Convert dates to integers.
79 1
            tmp[self._key_start_date] = self._date2int(tmp[self._key_start_date])
80 1
            tmp[self._key_end_date] = self._date2int(tmp[self._key_end_date])
81 1
            for key_start_date, key_end_date in keys:
82 1
                if key_start_date != self._key_start_date:
83 1
                    tmp[key_start_date] = self._date2int(tmp[key_start_date])
84 1
                if key_end_date != self._key_end_date:
85 1
                    tmp[key_end_date] = self._date2int(tmp[key_end_date])
86 1
            ret.append(tmp)
87
88 1
        return ret
89
90
    # ------------------------------------------------------------------------------------------------------------------
91 1
    def _pass2(self, keys, rows):
92
        """
93
        Computes the intersection of the date intervals of two or more reference data sets. If the intersection is empty
94
        the row is removed from the group.
95
96
        :param list[tuple[str,str]] keys: The other keys with start and end date.
97
        :param list[dict[str,T]] rows: The list of rows.
98
99
        :rtype: list[dict[str,T]]
100
        """
101 1
        ret = list()
102 1
        for row in rows:
103 1
            start_date = row[self._key_start_date]
104 1
            end_date = row[self._key_end_date]
105 1
            for key_start_date, key_end_date in keys:
106 1
                start_date, end_date = Type2JoinHelper._intersect(start_date,
107
                                                                  end_date,
108
                                                                  row[key_start_date],
109
                                                                  row[key_end_date])
110 1
                if not start_date:
111 1
                    break
112 1
                if self._key_start_date != key_start_date:
113 1
                    del row[key_start_date]
114 1
                if self._key_end_date != key_end_date:
115 1
                    del row[key_end_date]
116
117 1
            if start_date:
118 1
                row[self._key_start_date] = start_date
119 1
                row[self._key_end_date] = end_date
120 1
                ret.append(row)
121
122 1
        return ret
123
124
    # ------------------------------------------------------------------------------------------------------------------
125 1
    def _pass3(self, rows):
126
        """
127
        Returns a list of rows sorted by start and end date.
128
129
        :param list[dict[str,T]] rows: The list of rows.
130
131
        :rtype: list[dict[str,T]]
132
        """
133 1
        return sorted(rows, key=lambda row: (row[self._key_start_date], row[self._key_end_date]))
134
135
    # ------------------------------------------------------------------------------------------------------------------
136 1
    def _pass4(self, rows):
137
        """
138
        Merges adjacent and overlapping rows in the same group (i.e. with the same natural key).
139
140
        :param list[dict[str,T]] rows: The rows in a group (i.e. with the same natural key).
141
        .
142
        :rtype: list[dict[str,T]]
143
        """
144 1
        ret = list()
145
146 1
        prev_row = None
147 1
        for row in rows:
148 1
            if prev_row:
149 1
                relation = Allen.relation(prev_row[self._key_start_date],
150
                                          prev_row[self._key_end_date],
151
                                          row[self._key_start_date],
152
                                          row[self._key_end_date])
153 1
                if relation == Allen.X_BEFORE_Y:
154
                    # Two rows with distinct intervals.
155 1
                    ret.append(prev_row)
156 1
                    prev_row = row
157 1
                elif relation == Allen.X_MEETS_Y:
158
                    # The two rows are adjacent.
159 1
                    if self._equal(prev_row, row):
160
                        # The two rows are identical (except for start and end date) and adjacent. Combine the two rows
161
                        # into one row.
162 1
                        prev_row[self._key_end_date] = row[self._key_end_date]
163
                    else:
164
                        # Rows are adjacent but not identical.
165 1
                        ret.append(prev_row)
166 1
                        prev_row = row
167 1
                elif relation == Allen.X_OVERLAPS_WITH_Y:
168
                    # Should not occur with proper reference data.
169 1
                    if self._equal(prev_row, row):
170
                        # The two rows are identical (except for start and end date) and overlapping. Combine the two
171
                        # rows into one row.
172 1
                        prev_row[self._key_end_date] = row[self._key_end_date]
173
                    else:
174
                        # Rows are overlapping but not identical.
175 1
                        prev_row[self._key_end_date] = row[self._key_start_date] - 1
176 1
                        ret.append(prev_row)
177 1
                        prev_row = row
178 1
                elif relation == Allen.X_STARTS_Y:
179
                    # Should not occur with proper reference data.
180 1
                    prev_row = row
181 1
                elif relation == Allen.X_EQUAL_Y:
182
                    # Can happen when the reference data sets are joined without respect for date intervals.
183 1
                    prev_row = row
184
                else:
185
                    raise ValueError('Data is not sorted properly')
186
            else:
187 1
                prev_row = row
188
189 1
        if prev_row:
190 1
            ret.append(prev_row)
191
192 1
        return ret
193
194
    # ------------------------------------------------------------------------------------------------------------------
195 1
    def merge(self, keys):
        """
        Merges the join on natural keys of two or more reference data sets.

        Each group of rows is processed independently by the four passes: dates to integers, intersection of the
        intervals, sorting, and merging of adjacent and overlapping rows. The date type of this helper is reset
        before processing starts.

        :param list[tuple[str,str]] keys: For each data set the keys of the start and end date.

        :rtype: list[dict[str,T]]
        """
        merged = []
        self._date_type = ''
        for group in self.rows.values():
            group_rows = self._pass2(keys, self._pass1(keys, group))
            if not group_rows:
                # No row of this group has a non-empty intersection.
                continue

            group_rows = self._pass4(self._pass3(group_rows))
            # NOTE(review): presumably converts the integer dates back to the original date type in place; the
            # return value is not used. Confirm against Type2Helper.
            self._rows_int2date(group_rows)
            merged.extend(group_rows)

        return merged
216
217
# ----------------------------------------------------------------------------------------------------------------------
218