Type2CondenseHelper.condense()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 10
nop 1
dl 0
loc 14
ccs 9
cts 9
cp 1
crap 3
rs 9.9
c 0
b 0
f 0
1 1
from typing import Any, Dict, List, Optional, Set, Tuple
2 1
3
from etlt.helper.Allen import Allen
4
from etlt.helper.Type2Helper import Type2Helper
5 1
6
7
class Type2CondenseHelper(Type2Helper):
    """
    A helper class for deriving the distinct intervals in reference data with date intervals.

    A typical use case for this class is to aggregate the reference data for a type 2 dimension into the reference
    data for another type 2 dimension at a higher level in the dimension hierarchy.
    """

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _distinct(row1: Tuple[int, int], row2: Tuple[int, int]) -> Optional[List[Tuple[int, int]]]:
        """
        Splits two overlapping intervals into distinct (i.e. non-overlapping) intervals.

        The end points are treated as inclusive: neighbouring pieces are built with ``end + 1`` and ``start - 1``.

        The return value is one of:

        * A list with 2 or 3 intervals when the two intervals overlap without being equal.
        * None when there is nothing to split: one interval lies before the other, the intervals meet, or the
          intervals are equal.
        * An empty list when Allen.relation() returns None (one of the two intervals is invalid).

        Raises ValueError when Allen.relation() returns a value that is not handled here.

        :param row1: The first interval as (start, end).
        :param row2: The second interval as (start, end).
        """
        start1, end1 = row1[0], row1[1]
        start2, end2 = row2[0], row2[1]

        relation = Allen.relation(start1, end1, start2, end2)

        if relation is None:
            # One of the 2 intervals is invalid.
            return []

        if relation == Allen.X_BEFORE_Y:
            # row1: |----|
            # row2:            |-----|
            return None

        if relation == Allen.X_BEFORE_Y_INVERSE:
            # row1:            |-----|
            # row2: |----|
            return None

        if relation == Allen.X_MEETS_Y:
            # row1: |-------|
            # row2:          |-------|
            return None

        if relation == Allen.X_MEETS_Y_INVERSE:
            # row1:          |-------|
            # row2: |-------|
            return None

        if relation == Allen.X_OVERLAPS_WITH_Y:
            # row1: |-----------|
            # row2:       |----------|
            return [(start1, start2 - 1), (start2, end1), (end1 + 1, end2)]

        if relation == Allen.X_OVERLAPS_WITH_Y_INVERSE:
            # row1:       |----------|
            # row2: |-----------|
            return [(start2, start1 - 1), (start1, end2), (end2 + 1, end1)]

        if relation == Allen.X_STARTS_Y:
            # row1: |------|
            # row2: |----------------|
            return [(start1, end1), (end1 + 1, end2)]

        if relation == Allen.X_STARTS_Y_INVERSE:
            # row1: |----------------|
            # row2: |------|
            return [(start2, end2), (end2 + 1, end1)]

        if relation == Allen.X_DURING_Y:
            # row1:      |------|
            # row2: |----------------|
            return [(start2, start1 - 1), (start1, end1), (end1 + 1, end2)]

        if relation == Allen.X_DURING_Y_INVERSE:
            # row1: |----------------|
            # row2:      |------|
            return [(start1, start2 - 1), (start2, end2), (end2 + 1, end1)]

        if relation == Allen.X_FINISHES_Y:
            # row1:           |------|
            # row2: |----------------|
            return [(start2, start1 - 1), (start1, end1)]

        if relation == Allen.X_FINISHES_Y_INVERSE:
            # row1: |----------------|
            # row2:           |------|
            return [(start1, start2 - 1), (start2, end2)]

        if relation == Allen.X_EQUAL_Y:
            # row1: |----------------|
            # row2: |----------------|
            return None

        # We got all 13 relations in Allen's interval algebra covered.
        raise ValueError('Unexpected relation {0}'.format(relation))

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _add_interval(all_intervals: Set[Tuple[int, int]], new_interval: Tuple[int, int]) -> None:
        """
        Adds a new interval to a set of non-overlapping intervals. The set is modified in place.

        When the new interval overlaps an interval in the set, that interval is removed and the pieces returned by
        _distinct() are added one by one, so pieces overlapping other members of the set are split further.
        When no member of the set yields pieces, the new interval is added unchanged and no member is removed.

        :param all_intervals: The set of distinct intervals.
        :param new_interval: The new interval.
        """
        overlapping = None
        pieces = None
        for old_interval in all_intervals:
            candidate = Type2CondenseHelper._distinct(new_interval, old_interval)
            # Only a non-empty list is an overlap. None (nothing to split) and [] (invalid interval) both mean that
            # this member of the set must be left alone.
            if candidate:
                overlapping = old_interval
                pieces = candidate
                break

        if overlapping is None:
            all_intervals.add(new_interval)
            return

        # The set is modified only after the loop has ended; a set must not change size while it is iterated.
        all_intervals.remove(overlapping)
        for piece in pieces:
            Type2CondenseHelper._add_interval(all_intervals, piece)

    # ------------------------------------------------------------------------------------------------------------------
    def _derive_distinct_intervals(self, rows: List[Dict[str, Any]]) -> Set[Tuple[int, int]]:
        """
        Returns the set of distinct intervals in a row set.

        The interval of a row is read from the keys self._key_start_date and self._key_end_date.

        :param rows: The rows set.
        """
        distinct_intervals = set()
        for row in rows:
            self._add_interval(distinct_intervals, (row[self._key_start_date], row[self._key_end_date]))

        return distinct_intervals

    # ------------------------------------------------------------------------------------------------------------------
    def condense(self) -> None:
        """
        Condense the data set to the distinct intervals based on the pseudo key.

        For each pseudo key in self._rows the rows are replaced with one row per distinct interval, sorted by
        interval. A new row holds the entries obtained by zipping self._pseudo_key with the pseudo key, plus the
        start and the end of the interval; other columns of the original rows are not carried over.
        """
        for pseudo_key, rows in self._rows.items():
            condensed = []
            for start, end in sorted(self._derive_distinct_intervals(rows)):
                row = dict(zip(self._pseudo_key, pseudo_key))
                row[self._key_start_date] = start
                row[self._key_end_date] = end
                condensed.append(row)

            # Only the value of an existing key is replaced, hence the size of the dict does not change while it
            # is being iterated.
            self._rows[pseudo_key] = condensed
151
152
# ----------------------------------------------------------------------------------------------------------------------
153