Completed
Push — master ( 545613...55df2f )
by Daniel
14s queued 11s
created

sources.common.DataManipulator   A

Complexity

Total Complexity 23

Size/Duplication

Total Lines 139
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 23
eloc 122
dl 0
loc 139
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A DataManipulator.__init__() 0 4 1
A DataManipulator.fn_apply_query_to_data_frame() 0 31 4
A DataManipulator.fn_add_minimum_and_maximum_columns_to_data_frame() 0 9 2
A DataManipulator.fn_add_and_shift_column() 0 23 3
A DataManipulator.fn_filter_data_frame_by_index() 0 14 3
A DataManipulator.fn_get_column_index_from_data_frame() 0 7 3
A DataManipulator.fn_get_first_and_last_column_value_from_data_frame() 0 5 1
A DataManipulator.fn_set_shifting_value() 0 6 2
A DataManipulator.fn_filter_data_frame_by_index_internal() 0 18 4
1
"""
2
Data Manipulation class
3
"""
4
# package to add support for multi-language (i18n)
5
import gettext
6
# package to handle files/folders and related metadata/operations
7
import os
8
9
10
class DataManipulator:
    """
    Helper methods to reshape and filter pandas data frames.

    Feedback sent to the logger is passed through gettext so it can be localized.
    """
    # gettext translation object, populated by __init__
    locale = None

    def __init__(self, in_language='en_US'):
        """
        Load the translation catalogue matching this script.

        The catalogue domain is the script name (without ".py") and it is looked up
        in a sibling folder named "<script name>_Locale".

        :param in_language: language code of the catalogue to load
        """
        current_script = os.path.basename(__file__).replace('.py', '')
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
        self.locale = gettext.translation(current_script, lang_folder, languages=[in_language])

    @staticmethod
    def _fn_stringify_shifted_value(in_value, empty_value_replacement):
        """
        Convert one cell of a shifted column to text.

        Only a cell that is entirely "nan" is replaced and only a trailing ".0" is
        removed, so values such as 0.05 or "banana" are left intact.

        :param in_value: cell value to convert
        :param empty_value_replacement: text to use for an empty (nan) cell
        :return: string representation of the cell
        """
        value_as_text = str(in_value)
        if value_as_text == 'nan':
            value_as_text = empty_value_replacement
        if value_as_text.endswith('.0'):
            value_as_text = value_as_text[:-2]
        return value_as_text

    def fn_add_and_shift_column(self, local_logger, timer, input_data_frame, input_details: list):
        """
        Add shifted copies of existing columns, as text, to the data frame.

        Each dictionary from input_details is read for the keys "New Column",
        "Original Column", "Direction", "Deviation" and "Empty Values Replacement".
        The data frame is modified in place and also returned.

        :param local_logger: logger receiving one info message per added column
        :param timer: object exposing start() and stop(), called around each column
        :param input_data_frame: data frame to enrich
        :param input_details: list of dictionaries, one per column to add
        :return: the enriched data frame
        """
        evr = 'Empty Values Replacement'
        for crt_dict in input_details:
            timer.start()
            new_column = crt_dict['New Column']
            original_column = crt_dict['Original Column']
            col_offset = self.fn_set_shifting_value(crt_dict)
            # resolved once per column so the per-cell callback does not depend on
            # the loop variable
            replacement = str(crt_dict[evr])
            input_data_frame[new_column] = input_data_frame[original_column]
            input_data_frame[new_column] = input_data_frame[new_column].shift(col_offset)
            input_data_frame[new_column] = input_data_frame[new_column].apply(
                lambda x, fill=replacement: self._fn_stringify_shifted_value(x, fill))
            local_logger.info(self.locale.gettext(
                'A new column named "{new_column_name}" as copy from "{original_column}" '
                + 'then shifted by {shifting_rows} to relevant data frame '
                + '(filling any empty value as {empty_values_replacement})')
                              .replace('{new_column_name}', new_column)
                              .replace('{original_column}', original_column)
                              .replace('{shifting_rows}', str(col_offset))
                              .replace('{empty_values_replacement}', replacement))
            timer.stop()
        return input_data_frame

    @staticmethod
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
        """
        Compute minimum and maximum of one column for each group.

        :param input_data_frame: data frame to aggregate
        :param dict_expression: dictionary with keys "group_by" (grouping column(s)),
            "calculation" (column to aggregate) and optionally "map"
            (column renaming applied to the result)
        :return: new data frame with the grouping column(s) plus
            "<calculation>_min" and "<calculation>_max" (before any renaming)
        """
        grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
            .agg({dict_expression['calculation']: ['min', 'max']})
        # flatten the two-level column labels produced by agg into "column_function"
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
        grouped_df = grouped_df.reset_index()
        if 'map' in dict_expression:
            grouped_df = grouped_df.rename(columns=dict_expression['map'])
        return grouped_df

    def fn_apply_query_to_data_frame(self, local_logger, timer, input_data_frame, extract_params):
        """
        Filter the data frame in place based on a single column.

        Supported values for extract_params["filter_to_apply"] are "equal",
        "different" (both expect "filter_values" as a string) and "multiple_match"
        (expects "filter_values" as a dictionary whose values are strings).

        :param local_logger: logger receiving debug messages
        :param timer: object exposing start() and stop()
        :param input_data_frame: data frame to filter (modified in place)
        :param extract_params: dictionary with keys "column_to_filter",
            "filter_to_apply" and "filter_values"
        :return: the filtered data frame
        :raises ValueError: when "filter_to_apply" holds an unsupported value
        """
        timer.start()
        column_to_filter = extract_params['column_to_filter']
        filter_to_apply = extract_params['filter_to_apply']
        generic_pre_feedback = self.locale.gettext('Will retain only values {filter_type} '
                                                   + '"{filter_values}" within the field '
                                                   + '"{column_to_filter}"') \
            .replace('{column_to_filter}', column_to_filter)
        if filter_to_apply == 'equal':
            filter_type = self.locale.gettext('equal with')
            filter_values = extract_params['filter_values']
            query_expression = '`' + column_to_filter + '` == "' + filter_values + '"'
        elif filter_to_apply == 'different':
            filter_type = self.locale.gettext('different than')
            filter_values = extract_params['filter_values']
            query_expression = '`' + column_to_filter + '` != "' + filter_values + '"'
        elif filter_to_apply == 'multiple_match':
            filter_type = self.locale.gettext('matching any of these values')
            filter_values = '["' + '", "'.join(extract_params['filter_values'].values()) + '"]'
            query_expression = '`' + column_to_filter + '` in ' + filter_values
        else:
            # an empty expression would be rejected by pandas with the same
            # exception type, only with a less helpful message
            raise ValueError('Unsupported filter_to_apply value: ' + repr(filter_to_apply))
        local_logger.debug(generic_pre_feedback
                           .replace('{filter_type}', filter_type)
                           .replace('{filter_values}', filter_values))
        local_logger.debug(self.locale.gettext('Query expression to apply is: {query_expression}')
                           .replace('{query_expression}', query_expression))
        input_data_frame.query(query_expression, inplace=True)
        timer.stop()
        return input_data_frame

    def fn_filter_data_frame_by_index(self, local_logger, in_data_frame, filter_rule):
        """
        Keep only the rows located around a reference row.

        The reference row is found by evaluating
        filter_rule["Query Expression for Reference Index"]; when it matches at
        least one row and filter_rule has a "Deviation" entry, rows outside the
        deviation window are dropped.

        :param local_logger: logger receiving info messages
        :param in_data_frame: data frame to filter
        :param filter_rule: dictionary holding the reference query and,
            optionally, the "Deviation" dictionary ("Lower" and/or "Upper")
        :return: filtered data frame, or the input when nothing has to be applied
        """
        reference_expression = filter_rule['Query Expression for Reference Index']
        index_current = in_data_frame.query(reference_expression, inplace=False)
        local_logger.info(self.locale.gettext(
            'Current index has been determined to be {index_current_value}')
                          .replace('{index_current_value}', str(index_current.index)))
        # Index.empty does not depend on how a given pandas version prints an index
        if not index_current.index.empty and 'Deviation' in filter_rule:
            in_data_frame = self.fn_filter_data_frame_by_index_internal(local_logger, {
                'data_frame': in_data_frame,
                'deviation': filter_rule['Deviation'],
                'index': index_current.index,
            })
        return in_data_frame

    def fn_filter_data_frame_by_index_internal(self, local_logger, in_dict):
        """
        Apply lower and/or upper deviation limits around a reference index.

        Only the first entry of the reference index is used to compute the limits.

        :param local_logger: logger receiving one info message per deviation
        :param in_dict: dictionary with keys "data_frame" (the legacy key
            "data frame" is accepted as well), "deviation" (dictionary with
            "Lower" and/or "Upper" numbers) and "index" (reference index)
        :return: data frame restricted to the rows within the limits
        """
        frame_key = 'data_frame' if 'data_frame' in in_dict else 'data frame'
        in_data_frame = in_dict[frame_key]
        reference_index = in_dict['index']
        for deviation_type, deviation_number in in_dict['deviation'].items():
            index_to_apply = reference_index
            if deviation_type == 'Lower':
                index_to_apply = reference_index - deviation_number
                # the mask is built from the frame own index so it always has
                # the same length as the frame being filtered
                in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
            elif deviation_type == 'Upper':
                index_to_apply = reference_index + deviation_number
                in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
            local_logger.info(self.locale.gettext(
                '{deviation_type} Deviation Number is {deviation_number} '
                + 'to be applied to Current index, became {index_to_apply}')
                              .replace('{deviation_type}', deviation_type)
                              .replace('{deviation_number}', str(deviation_number))
                              .replace('{index_to_apply}', str(index_to_apply)))
        return in_data_frame

    @staticmethod
    def fn_get_column_index_from_data_frame(data_frame_columns, column_name_to_identify):
        """
        Find the position of a column name within a sequence of column names.

        :param data_frame_columns: iterable of column names
        :param column_name_to_identify: column name to look for
        :return: position of the last match; 0 when there is no match, which
            cannot be told apart from a match on the first column
        """
        column_index_to_return = 0
        for ndx, column_name in enumerate(data_frame_columns):
            if column_name == column_name_to_identify:
                column_index_to_return = ndx
        return column_index_to_return

    @staticmethod
    def fn_get_first_and_last_column_value_from_data_frame(in_data_frame, in_column_name):
        """
        Read the value of a column from the first and from the last row.

        :param in_data_frame: data frame to read from (an empty one raises IndexError)
        :param in_column_name: column to read
        :return: dictionary with keys "first" and "last"
        """
        return {
            'first': in_data_frame.iloc[0][in_column_name],
            'last': in_data_frame.iloc[-1][in_column_name],
        }

    @staticmethod
    def fn_set_shifting_value(in_dict):
        """
        Compute the signed number of rows a column has to be shifted by.

        :param in_dict: dictionary with keys "Direction" and "Deviation"
        :return: negative "Deviation" when "Direction" is "up", "Deviation" otherwise
        """
        offset_sign = -1 if in_dict['Direction'] == 'up' else 1
        return offset_sign * in_dict['Deviation']
139