Passed
Push — development/test ( 5eb869...ce11cc )
by Daniel
01:08
created

sources.common.DataManipulator   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 156
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 23
eloc 122
dl 0
loc 156
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A DataManipulator.__init__() 0 4 1
A DataManipulator.fn_apply_query_to_data_frame() 0 31 4
A DataManipulator.fn_add_minimum_and_maximum_columns_to_data_frame() 0 9 2
A DataManipulator.fn_filter_data_frame_by_index() 0 31 3
A DataManipulator.fn_get_column_index_from_data_frame() 0 7 3
A DataManipulator.fn_add_and_shift_column() 0 23 3
A DataManipulator.fn_get_first_and_last_column_value_from_data_frame() 0 5 1
A DataManipulator.fn_set_shifting_value() 0 6 2
A DataManipulator.fn_filter_data_frame_by_index_internal() 0 18 4
1
"""
2
Data Manipulation class
3
"""
4
# package to add support for multi-language (i18n)
5
import gettext
6
# package to handle files/folders and related metadata/operations
7
import os
8
9
10
class DataManipulator:
    """Collection of data frame helpers: shifting, filtering and min/max aggregation.

    Log messages are routed through a gettext catalogue loaded in ``__init__``.
    The ``local_logger`` arguments are used through ``.info()`` / ``.debug()``
    and the ``timmer`` arguments through ``.start()`` / ``.stop()`` only
    (presumably a ``logging.Logger`` and a timer object; confirm with callers).
    """

    # gettext translation object; populated by __init__
    lcl = None

    def __init__(self, default_language='en_US'):
        """Load the translation catalogue located next to this script.

        The catalogue domain is this file's name without extension and the
        catalogue folder is ``<script name>_Locale`` in the same directory.
        ``gettext.translation`` raises ``FileNotFoundError`` when no catalogue
        exists for ``default_language``.
        """
        # splitext only drops the extension; str.replace('.py', '') would also
        # alter a '.py' occurring in the middle of the file name
        current_script = os.path.splitext(os.path.basename(__file__))[0]
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
        self.lcl = gettext.translation(current_script, lang_folder, languages=[default_language])

    @staticmethod
    def _fn_stringify_shifted_value(value, empty_replacement):
        """Return ``value`` as text, swapping a missing value for ``empty_replacement``.

        Only a value whose whole text is ``nan`` counts as missing, and only a
        trailing ``.0`` is removed, so texts such as ``banana`` or ``10.05``
        stay intact. The trailing ``.0`` is also dropped from the replacement.
        """
        text = str(value)
        if text == 'nan':
            text = empty_replacement
        return text[:-2] if text.endswith('.0') else text

    def fn_add_and_shift_column(self, local_logger, timmer, input_data_frame, input_details: list):
        """Add shifted copies of existing columns to ``input_data_frame``.

        Each dictionary in ``input_details`` must provide the keys
        ``'New Column'``, ``'Original Column'``, ``'Direction'``, ``'Deviation'``
        and ``'Empty Values Replacement'``. The new column holds strings.
        The data frame is modified in place and also returned.
        """
        for c_dict in input_details:
            timmer.start()
            new_column = c_dict['New Column']
            # computed once per column and passed as an argument, so nothing is
            # captured from the loop by a closure
            empty_replacement = str(c_dict['Empty Values Replacement'])
            col_offset = self.fn_set_shifting_value(c_dict)
            shifted_values = input_data_frame[c_dict['Original Column']].shift(col_offset)
            input_data_frame[new_column] = shifted_values.apply(
                self._fn_stringify_shifted_value, args=(empty_replacement,))
            local_logger.info(self.lcl.gettext(
                'A new column named "{new_column_name}" as copy from "{original_column}" '
                'then shifted by {shifting_rows} to relevant data frame '
                '(filling any empty value as {empty_values_replacement})')
                              .replace('{new_column_name}', new_column)
                              .replace('{original_column}', c_dict['Original Column'])
                              .replace('{shifting_rows}', str(col_offset))
                              .replace('{empty_values_replacement}', empty_replacement))
            timmer.stop()
        return input_data_frame

    @staticmethod
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
        """Return a new frame with the min and max of one column per group.

        ``dict_expression['group_by']`` is handed to ``groupby`` and
        ``dict_expression['calculation']`` names the aggregated column; the
        results are named ``<column>_min`` and ``<column>_max``. An optional
        ``dict_expression['map']`` renames columns of the result.
        """
        grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
            .agg({dict_expression['calculation']: ['min', 'max']})
        # flatten the two-level (column, aggregation) header
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
        grouped_df = grouped_df.reset_index()
        if 'map' in dict_expression:
            grouped_df = grouped_df.rename(columns=dict_expression['map'])
        return grouped_df

    def fn_apply_query_to_data_frame(self, local_logger, timmer, input_data_frame, extract_params):
        """Filter ``input_data_frame`` in place according to ``extract_params``.

        ``extract_params`` provides ``'column_to_filter'``, ``'filter_to_apply'``
        (``'equal'``, ``'different'`` or ``'multiple_match'``) and
        ``'filter_values'`` (a string for the first two, a dictionary whose
        values are strings for ``'multiple_match'``).

        NOTE(review): any other ``'filter_to_apply'`` leaves the expression
        empty, which is passed to ``DataFrame.query`` unchanged; the values are
        embedded in the expression without escaping double quotes.
        """
        timmer.start()
        column_to_filter = extract_params['column_to_filter']
        filter_to_apply = extract_params['filter_to_apply']
        query_expression = ''
        generic_pre_feedback = self.lcl.gettext(
            'Will retain only values {filter_type} '
            '"{filter_values}" within the field '
            '"{column_to_filter}"').replace('{column_to_filter}', column_to_filter)
        if filter_to_apply == 'equal':
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}', self.lcl.gettext('equal with'))
                               .replace('{filter_values}', extract_params['filter_values']))
            query_expression = '`' + column_to_filter + '` == "' \
                               + extract_params['filter_values'] + '"'
        elif filter_to_apply == 'different':
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}', self.lcl.gettext('different than'))
                               .replace('{filter_values}', extract_params['filter_values']))
            query_expression = '`' + column_to_filter + '` != "' \
                               + extract_params['filter_values'] + '"'
        elif filter_to_apply == 'multiple_match':
            multiple_values = '["' + '", "'.join(extract_params['filter_values'].values()) + '"]'
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}',
                                        self.lcl.gettext('matching any of these values'))
                               .replace('{filter_values}', multiple_values))
            query_expression = '`' + column_to_filter + '` in ' + multiple_values
        local_logger.debug(self.lcl.gettext('Query expression to apply is: {query_expression}')
                           .replace('{query_expression}', query_expression))
        input_data_frame.query(query_expression, inplace=True)
        timmer.stop()
        return input_data_frame

    def fn_filter_data_frame_by_index(self, local_logger, in_data_frame, filter_rule):
        """Keep only the rows around a reference row located by a query.

        ``filter_rule['Query Expression for Reference Index']`` selects the
        reference row(s). When at least one row matches and ``filter_rule``
        has a ``'Deviation'`` entry, the frame is narrowed by
        ``fn_filter_data_frame_by_index_internal``; otherwise it is returned
        unchanged.
        """
        reference_expression = filter_rule['Query Expression for Reference Index']
        index_current = in_data_frame.query(reference_expression, inplace=False)
        local_logger.info(self.lcl.gettext(
            'Current index has been determined to be {index_current_value}')
                          .replace('{index_current_value}', str(index_current.index)))
        # test emptiness directly: the textual form of an empty index differs
        # between pandas versions, so comparing against its repr is unreliable
        if len(index_current.index) > 0 and 'Deviation' in filter_rule:
            in_data_frame = self.fn_filter_data_frame_by_index_internal(local_logger, {
                'data_frame': in_data_frame,
                'deviation': filter_rule['Deviation'],
                'index': index_current.index,
            })
        return in_data_frame

    def fn_filter_data_frame_by_index_internal(self, local_logger, in_dict):
        """Trim a data frame to a window around a reference index.

        ``in_dict`` provides ``'data_frame'``, ``'index'`` (the reference index
        labels; only the first one is used) and ``'deviation'``, a dictionary
        that may hold ``'Lower'`` and/or ``'Upper'`` offsets. ``'Lower'`` keeps
        rows whose index is >= reference - offset, ``'Upper'`` keeps rows whose
        index is <= reference + offset. Returns the filtered data frame.
        """
        in_data_frame = in_dict['data_frame']
        for deviation_type in in_dict['deviation']:
            deviation_number = in_dict['deviation'][deviation_type]
            index_to_apply = in_dict['index']
            if deviation_type == 'Lower':
                # non in-place arithmetic so the caller's index is never altered
                index_to_apply = in_dict['index'] - deviation_number
                in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
            elif deviation_type == 'Upper':
                index_to_apply = in_dict['index'] + deviation_number
                in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
            local_logger.info(self.lcl.gettext(
                '{deviation_type} Deviation Number is {deviation_number} '
                'to be applied to Current index, became {index_to_apply}')
                              .replace('{deviation_type}', deviation_type)
                              .replace('{deviation_number}', str(deviation_number))
                              .replace('{index_to_apply}', str(index_to_apply)))
        return in_data_frame

    @staticmethod
    def fn_get_column_index_from_data_frame(data_frame_columns, column_name_to_identify):
        """Return the position of ``column_name_to_identify`` in ``data_frame_columns``.

        When the name occurs several times the last position wins; when it
        does not occur at all 0 is returned, which callers cannot tell apart
        from a match on the first column.
        """
        column_index_to_return = 0
        for ndx, column_name in enumerate(data_frame_columns):
            if column_name == column_name_to_identify:
                column_index_to_return = ndx
        return column_index_to_return

    @staticmethod
    def fn_get_first_and_last_column_value_from_data_frame(in_data_frame, in_column_name):
        """Return ``{'first': ..., 'last': ...}`` for one column of the data frame.

        Raises ``IndexError`` when the data frame has no rows.
        """
        return {
            'first': in_data_frame.iloc[0][in_column_name],
            'last': in_data_frame.iloc[-1][in_column_name],
        }

    @staticmethod
    def fn_set_shifting_value(in_dict):
        """Return the signed row offset described by ``in_dict``.

        ``in_dict['Deviation']`` is negated when ``in_dict['Direction']`` is
        ``'up'``; any other direction keeps it positive.
        """
        offset_sign = -1 if in_dict['Direction'] == 'up' else 1
        return offset_sign * in_dict['Deviation']
156