Passed
Push — development/test ( 9ec880...051b28 )
by Daniel
01:17
created

DataManipulator.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
"""
2
Data Manipulation class
3
"""
4
# package to add support for multi-language (i18n)
5
import gettext
6
# package to handle files/folders and related metadata/operations
7
import os
8
9
10
class DataManipulator:
11
    lcl = None
12
13
    def __init__(self, default_language='en_US'):
14
        current_script = os.path.basename(__file__).replace('.py', '')
15
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
16
        self.lcl = gettext.translation(current_script, lang_folder, languages=[default_language])
17
18
    def fn_add_and_shift_column(self, local_logger, timmer, input_data_frame, input_details):
19
        for in_dt in input_details:
20
            timmer.start()
21
            input_data_frame[in_dt['New Column']] = input_data_frame[in_dt['Original Column']]
22
            offset_sign = (lambda x: 1 if x == 'down' else -1)
23
            col_offset = offset_sign(in_dt['Direction']) * in_dt['Deviation']
24
            input_data_frame[in_dt['New Column']] = input_data_frame[in_dt['New Column']]\
25
                .shift(col_offset)
26
            input_data_frame[in_dt['New Column']] = input_data_frame[in_dt['New Column']]\
27
                .apply(lambda x: str(x).replace('.0', ''))\
28
                .apply(lambda x: str(x).replace('nan', str(in_dt['Empty Values Replacement'])))
0 ignored issues
show
introduced by
The variable in_dt does not seem to be defined in case the for loop on line 19 is not entered. Are you sure this can never be the case?
Loading history...
29
            local_logger.info(self.lcl.gettext(
30
                'A new column named "{new_column_name}" as copy from "{original_column}" '
31
                + 'then shifted by {shifting_rows} to relevant data frame '
32
                + '(filling any empty value as {empty_values_replacement})')
33
                              .replace('{new_column_name}', in_dt['New Column'])
34
                              .replace('{original_column}', in_dt['Original Column'])
35
                              .replace('{shifting_rows}', str(col_offset))
36
                              .replace('{empty_values_replacement}',
37
                                       str(in_dt['Empty Values Replacement'])))
38
            timmer.stop()
39
        return input_data_frame
40
41
    @staticmethod
42
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
43
        grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
44
            .agg({dict_expression['calculation']: ['min', 'max']})
45
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
46
        grouped_df = grouped_df.reset_index()
47
        if 'map' in dict_expression:
48
            grouped_df.rename(columns=dict_expression['map'], inplace=True)
49
        return grouped_df
50
51
    def fn_apply_query_to_data_frame(self, local_logger, timmer, input_data_frame, extract_params):
52
        timmer.start()
53
        query_expression = ''
54
        generic_pre_feedback = self.lcl.gettext('Will retain only values {filter_type} '
55
                                                + '"{filter_values}" within the field '
56
                                                + '"{column_to_filter}"') \
57
            .replace('{column_to_filter}', extract_params['column_to_filter'])
58
        if extract_params['filter_to_apply'] == 'equal':
59
            local_logger.debug(generic_pre_feedback
60
                               .replace('{filter_type}', self.lcl.gettext('equal with'))
61
                               .replace('{filter_values}', extract_params['filter_values']))
62
            query_expression = '`' + extract_params['column_to_filter'] + '` == "' \
63
                               + extract_params['filter_values'] + '"'
64
        elif extract_params['filter_to_apply'] == 'different':
65
            local_logger.debug(generic_pre_feedback
66
                               .replace('{filter_type}', self.lcl.gettext('different than'))
67
                               .replace('{filter_values}', extract_params['filter_values']))
68
            query_expression = '`' + extract_params['column_to_filter'] + '` != "' \
69
                               + extract_params['filter_values'] + '"'
70
        elif extract_params['filter_to_apply'] == 'multiple_match':
71
            multiple_values = '["' + '", "'.join(extract_params['filter_values'].values()) + '"]'
72
            local_logger.debug(generic_pre_feedback
73
                               .replace('{filter_type}',
74
                                        self.lcl.gettext('matching any of these values'))
75
                               .replace('{filter_values}', multiple_values))
76
            query_expression = '`' + extract_params['column_to_filter'] + '` in ' + multiple_values
77
        local_logger.debug(self.lcl.gettext('Query expression to apply is: {query_expression}')
78
                           .replace('{query_expression}', query_expression))
79
        input_data_frame.query(query_expression, inplace=True)
80
        timmer.stop()
81
        return input_data_frame
82
83
    def fn_filter_data_frame_by_index(self, local_logger, in_data_frame, filter_rule):
84
        reference_expression = filter_rule['Query Expression for Reference Index']
85
        index_current = in_data_frame.query(reference_expression, inplace=False)
86
        local_logger.info(self.lcl.gettext(
87
            'Current index has been determined to be {index_current_value}')
88
                          .replace('{index_current_value}', str(index_current.index)))
89
        if str(index_current.index) != "Int64Index([], dtype='int64')" \
90
                and 'Deviation' in filter_rule:
91
            for deviation_type in filter_rule['Deviation']:
92
                deviation_number = filter_rule['Deviation'][deviation_type]
93
                index_to_apply = index_current.index
94
                if deviation_type == 'Lower':
95
                    index_to_apply = index_current.index - deviation_number
96
                    in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
97
                elif deviation_type == 'Upper':
98
                    index_to_apply = index_current.index + deviation_number
99
                    in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
100
                local_logger.info(self.lcl.gettext(
101
                    '{deviation_type} Deviation Number is {deviation_number} '
102
                    + 'to be applied to Current index, became {index_to_apply}')
103
                                  .replace('{deviation_type}', deviation_type)
104
                                  .replace('{deviation_number}', str(deviation_number))
105
                                  .replace('{index_to_apply}', str(index_to_apply)))
106
        return in_data_frame
107
108
    @staticmethod
109
    def fn_get_column_index_from_dataframe(data_frame_columns, column_name_to_identify):
110
        column_index_to_return = 0
111
        for ndx, column_name in enumerate(data_frame_columns):
112
            if column_name == column_name_to_identify:
113
                column_index_to_return = ndx
114
        return column_index_to_return
115
116
    @staticmethod
117
    def fn_get_first_and_last_column_value_from_data_frame(in_data_frame, in_column_name):
118
        return {
119
            'first': in_data_frame.iloc[0][in_column_name],
120
            'last': in_data_frame.iloc[(len(in_data_frame) - 1)][in_column_name],
121
        }
122