Completed
Push — master ( 524000...90a6df )
by P.R.
01:47
created

Type2CondenseHelper   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 119
Duplicated Lines 0 %

Test Coverage

Coverage 93.1%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 23
c 1
b 0
f 0
dl 0
loc 119
ccs 54
cts 58
cp 0.931
rs 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
B _add() 0 25 6
F _distinct() 0 48 12
A _derive_distinct_intervals() 0 13 2
A condense() 0 21 3
1
"""
2
ETLT
3
4
Copyright 2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8 1
import copy
9 1
import datetime
0 ignored issues
show
Unused Code introduced by
The import datetime seems to be unused.
Loading history...
10
11 1
from etlt.helper.Allen import Allen
12 1
from etlt.helper.Type2Helper import Type2Helper
13
14
15 1
class Type2CondenseHelper(Type2Helper):
    """
    A helper class for deriving the distinct intervals in reference data with date intervals.
    """

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _distinct(start1, end1, start2, end2):
        """
        Splits two intervals that share at least one day, but are not equal, into 2 or 3 non-overlapping intervals.

        Returns None when nothing needs to be split: the intervals are before/after each other, meet each other, or
        are equal.

        :param int start1: The start date of the first interval.
        :param int end1: The end date of the first interval.
        :param int start2: The start date of the second interval.
        :param int end2: The end date of the second interval.

        :rtype: None|list[(int,int)]

        :raises ValueError: When Allen.relation() yields a value that is not one of the relations handled here.
        """
        relation = Allen.relation(start1, end1, start2, end2)

        if relation in [Allen.X_BEFORE_Y, Allen.X_MEETS_Y]:
            return None  # [(start1, end1), (start2, end2)]

        if relation in [Allen.X_BEFORE_Y_INVERSE, Allen.X_MEETS_Y_INVERSE]:
            return None  # [(start2, end2), (start1, end1)]

        if relation == Allen.X_OVERLAPS_WITH_Y:
            return [(start1, start2 - 1), (start2, end1), (end1 + 1, end2)]

        if relation == Allen.X_OVERLAPS_WITH_Y_INVERSE:
            return [(start2, start1 - 1), (start1, end2), (end2 + 1, end1)]

        if relation == Allen.X_STARTS_Y:
            return [(start1, end1), (end1 + 1, end2)]

        if relation == Allen.X_STARTS_Y_INVERSE:
            return [(start2, end2), (end2 + 1, end1)]

        if relation == Allen.X_DURING_Y:
            return [(start2, start1 - 1), (start1, end1), (end1 + 1, end2)]

        if relation == Allen.X_DURING_Y_INVERSE:
            return [(start1, start2 - 1), (start2, end2), (end2 + 1, end1)]

        if relation == Allen.X_FINISHES_Y:
            return [(start2, start1 - 1), (start1, end1)]

        if relation == Allen.X_FINISHES_Y_INVERSE:
            return [(start1, start2 - 1), (start2, end2)]

        if relation == Allen.X_EQUAL_Y:
            return None  # [(start1, end1)]

        # Format with repr: a non-numeric relation (e.g. None) would make '%d' raise a TypeError that hides this error.
        raise ValueError('Unexpected relation {!r}'.format(relation))

    # ------------------------------------------------------------------------------------------------------------------
    @staticmethod
    def _add(all_intervals, new_interval):
        """
        Adds a new interval to a set of distinct intervals. The set is modified in place.

        When the new interval must be split against an interval already in the set, that interval is removed and all
        resulting parts are added recursively, since a part may in turn need splitting against another member.

        :param set[(int,int)] all_intervals: The set of distinct intervals.
        :param (int,int) new_interval: The new interval.
        """
        for old_interval in all_intervals:
            parts = Type2CondenseHelper._distinct(new_interval[0],
                                                  new_interval[1],
                                                  old_interval[0],
                                                  old_interval[1])
            if parts:
                # Stop iterating before the set is modified below.
                break
        else:
            # No member of the set (possibly empty) requires a split.
            all_intervals.add(new_interval)
            return

        all_intervals.remove(old_interval)
        for part in parts:
            Type2CondenseHelper._add(all_intervals, part)

    # ------------------------------------------------------------------------------------------------------------------
    def _derive_distinct_intervals(self, rows):
        """
        Returns the set of distinct intervals in a row set.

        :param list[dict[str,T]] rows: The rows set. The start and end dates are passed on to _add() and _distinct(),
                                       which do integer arithmetic on them.

        :rtype: set[(int,int)]
        """
        ret = set()
        for row in rows:
            self._add(ret, (row[self._key_start_date], row[self._key_end_date]))

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def condense(self):
        """
        Returns the data set condensed to the distinct intervals based on the natural key.

        For each row set in self.rows one row per distinct interval is returned, in sorted interval order. Each
        returned row is a shallow copy of the first row of the row set with its start and end date replaced.

        :rtype: list[dict[str,T]]
        """
        ret = []
        # NOTE(review): presumably reset so the inherited date conversion re-detects the date type -- confirm in
        # Type2Helper.
        self._date_type = ''
        for rows in self.rows.values():
            # NOTE(review): assumes _rows_date2int() returns the rows with integer dates -- confirm in Type2Helper.
            int_rows = self._rows_date2int(rows)
            for start, end in sorted(self._derive_distinct_intervals(int_rows)):
                condensed_row = copy.copy(rows[0])
                condensed_row[self._key_start_date] = start
                condensed_row[self._key_end_date] = end
                ret.append(condensed_row)

        # NOTE(review): presumably converts the integer dates in ret back in place -- confirm in Type2Helper.
        self._rows_int2date(ret)

        return ret
134
135
# ----------------------------------------------------------------------------------------------------------------------
136