DataManipulator.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
"""
2
Data Manipulation class
3
"""
4
# package to add support for multi-language (i18n)
5
import gettext
6
# package to handle files/folders and related metadata/operations
7
import os
8
9
10
class DataManipulator:
    """
    Collection of helpers that add, filter and inspect columns/rows of a data frame.

    Data frames are expected to expose the pandas DataFrame API
    (`query`, `groupby`, `iloc`, `shift`, ...). All user-facing feedback is
    passed through `self.locale.gettext` so it can be translated.
    """
    # translation catalogue; replaced with a real one for each instance in __init__
    locale = None

    def __init__(self, in_language='en_US'):
        """
        Load the translation catalogue matching this module.

        The catalogue domain is the module file name without extension and is
        searched in `<grand-parent folder of this file>/project_locale/<domain>`.
        With `fallback=True` a missing catalogue results in untranslated messages
        instead of an error.

        :param in_language: locale code of the desired language (e.g. 'en_US')
        """
        # os.path helpers are used instead of splitting on os.path.altsep,
        # which is None on POSIX platforms and made the former logic crash there
        module_path = os.path.normpath(os.path.abspath(__file__))
        locale_domain = os.path.splitext(os.path.basename(module_path))[0]
        project_folder = os.path.dirname(os.path.dirname(module_path))
        locale_folder = os.path.normpath(
            os.path.join(project_folder, 'project_locale', locale_domain))
        self.locale = gettext.translation(locale_domain, localedir=locale_folder,
                                          languages=[in_language], fallback=True)

    @staticmethod
    def _fn_shifted_value_to_text(in_value, empty_value_replacement):
        """
        Convert a single shifted cell into its textual representation.

        Missing values (rendered as 'nan') become `empty_value_replacement`;
        a trailing '.0' (whole number up-casted to float by the shift) is removed.
        Only an exact 'nan' and only a trailing '.0' are touched, so values like
        10.05 or 'banana' are kept intact.
        """
        value_as_text = str(in_value)
        if value_as_text == 'nan':
            return empty_value_replacement
        if value_as_text.endswith('.0'):
            return value_as_text[:-2]
        return value_as_text

    def fn_add_and_shift_column(self, local_logger, timer, input_data_frame, input_details: list):
        """
        Add shifted copies of existing columns to the data frame (in place).

        Each entry of `input_details` is a dictionary with the keys
        'New Column', 'Original Column', 'Direction', 'Deviation' and
        'Empty Values Replacement'. The new column holds textual values.

        :param local_logger: logger receiving one info message per added column
        :param timer: object exposing `start()` and `stop()`
        :param input_data_frame: data frame to enrich
        :param input_details: list of dictionaries as described above
        :return: the same data frame, with the new column(s) added
        """
        evr = 'Empty Values Replacement'
        for crt_dict in input_details:
            timer.start()
            try:
                new_column = crt_dict['New Column']
                original_column = crt_dict['Original Column']
                col_offset = self.fn_set_shifting_value(crt_dict)
                # bound before the lambda so the closure does not depend on the loop variable
                replacement = str(crt_dict[evr])
                input_data_frame[new_column] = input_data_frame[original_column] \
                    .shift(col_offset) \
                    .apply(lambda x, filler=replacement:
                           self._fn_shifted_value_to_text(x, filler))
                local_logger.info(self.locale.gettext(
                    'A new column named "{new_column_name}" as copy from "{original_column}" '
                    + 'then shifted by {shifting_rows} to relevant data frame '
                    + '(filling any empty value as {empty_values_replacement})')
                                  .replace('{new_column_name}', new_column)
                                  .replace('{original_column}', original_column)
                                  .replace('{shifting_rows}', str(col_offset))
                                  .replace('{empty_values_replacement}', replacement))
            finally:
                # never leave the timer running, even when a column is missing
                timer.stop()
        return input_data_frame

    @staticmethod
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
        """
        Build a data frame holding minimum and maximum of a column for each group.

        :param input_data_frame: data frame to aggregate
        :param dict_expression: dictionary with 'group_by' (grouping column(s)),
            'calculation' (column to aggregate) and optionally 'map'
            (column renaming applied to the result)
        :return: new data frame with the grouping column(s) plus
            '<calculation>_min' and '<calculation>_max' (before any renaming)
        """
        grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
            .agg({dict_expression['calculation']: ['min', 'max']})
        # flatten the 2 levels column header produced by the aggregation
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
        grouped_df = grouped_df.reset_index()
        if 'map' in dict_expression:
            grouped_df = grouped_df.rename(columns=dict_expression['map'])
        return grouped_df

    def fn_apply_query_to_data_frame(self, local_logger, timer, input_data_frame, extract_params):
        """
        Filter the data frame in place based on the values of a single column.

        :param local_logger: logger receiving debug feedback
        :param timer: object exposing `start()` and `stop()`
        :param input_data_frame: data frame to filter (modified in place)
        :param extract_params: dictionary with 'column_to_filter',
            'filter_to_apply' ('equal', 'different' or 'multiple_match') and
            'filter_values' (a string, or a dictionary of strings for 'multiple_match')
        :return: the same data frame, filtered
        :raises ValueError: when 'filter_to_apply' is not one of the supported values
        """
        timer.start()
        try:
            column_to_filter = extract_params['column_to_filter']
            filter_to_apply = extract_params['filter_to_apply']
            generic_pre_feedback = self.locale.gettext('Will retain only values {filter_type} '
                                                       + '"{filter_values}" within the field '
                                                       + '"{column_to_filter}"') \
                .replace('{column_to_filter}', column_to_filter)
            if filter_to_apply == 'equal':
                filter_type = self.locale.gettext('equal with')
                values_feedback = extract_params['filter_values']
                query_expression = '`' + column_to_filter + '` == "' + values_feedback + '"'
            elif filter_to_apply == 'different':
                filter_type = self.locale.gettext('different than')
                values_feedback = extract_params['filter_values']
                query_expression = '`' + column_to_filter + '` != "' + values_feedback + '"'
            elif filter_to_apply == 'multiple_match':
                filter_type = self.locale.gettext('matching any of these values')
                values_feedback = '["' + '", "'.join(extract_params['filter_values'].values()) \
                                  + '"]'
                query_expression = '`' + column_to_filter + '` in ' + values_feedback
            else:
                # an empty query expression would only trigger an obscure error later on
                raise ValueError(self.locale.gettext(
                    'Unsupported filter to apply: "{filter_to_apply}"')
                                 .replace('{filter_to_apply}', str(filter_to_apply)))
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}', filter_type)
                               .replace('{filter_values}', values_feedback))
            local_logger.debug(self.locale.gettext(
                'Query expression to apply is: {query_expression}')
                               .replace('{query_expression}', query_expression))
            input_data_frame.query(query_expression, inplace=True)
        finally:
            # never leave the timer running, even when the query fails
            timer.stop()
        return input_data_frame

    def fn_filter_data_frame_by_index(self, local_logger, in_data_frame, filter_rule):
        """
        Keep only the rows located around a reference row.

        The reference row is the first one matching
        `filter_rule['Query Expression for Reference Index']`; the optional
        `filter_rule['Deviation']` dictionary ('Lower' and/or 'Upper') tells how far
        below/above the reference index the retained rows may be.

        :param local_logger: logger receiving info feedback
        :param in_data_frame: data frame to filter (left untouched)
        :param filter_rule: dictionary as described above
        :return: filtered data frame, or the input one when no reference row
            is found or no deviation is given
        """
        reference_expression = filter_rule['Query Expression for Reference Index']
        index_current = in_data_frame.query(reference_expression, inplace=False)
        local_logger.info(self.locale.gettext(
            'Current index has been determined to be {index_current_value}')
                          .replace('{index_current_value}', str(index_current.index)))
        # length check instead of comparing with a pandas version dependent text
        if len(index_current.index) > 0 and 'Deviation' in filter_rule:
            in_data_frame = self.fn_filter_data_frame_by_index_internal(local_logger, {
                'data_frame': in_data_frame,
                'deviation': filter_rule['Deviation'],
                'index': index_current.index,
            })
        return in_data_frame

    def fn_filter_data_frame_by_index_internal(self, local_logger, in_dict):
        """
        Apply lower/upper index deviations around a reference index.

        :param local_logger: logger receiving one info message per deviation
        :param in_dict: dictionary with 'data_frame' (legacy key 'data frame' is
            accepted as well), 'deviation' (dictionary keyed by 'Lower'/'Upper')
            and 'index' (non-empty reference index; its first element is used)
        :return: data frame restricted to the rows within the requested deviations
        """
        # membership test (not `or`) as truth value of a data frame is ambiguous
        if 'data_frame' in in_dict:
            in_data_frame = in_dict['data_frame']
        else:
            in_data_frame = in_dict['data frame']
        for deviation_type, deviation_number in in_dict['deviation'].items():
            # fresh object on every pass so the reference index is never altered
            index_to_apply = in_dict['index']
            if deviation_type == 'Lower':
                index_to_apply = index_to_apply - deviation_number
                in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
            elif deviation_type == 'Upper':
                index_to_apply = index_to_apply + deviation_number
                in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
            local_logger.info(self.locale.gettext(
                '{deviation_type} Deviation Number is {deviation_number} '
                + 'to be applied to Current index, became {index_to_apply}')
                              .replace('{deviation_type}', deviation_type)
                              .replace('{deviation_number}', str(deviation_number))
                              .replace('{index_to_apply}', str(index_to_apply)))
        return in_data_frame

    @staticmethod
    def fn_get_column_index_from_data_frame(data_frame_columns, column_name_to_identify):
        """
        Return the position of a column name within a sequence of column names.

        When the name is present more than once the last position is returned;
        when it is absent 0 is returned (so 0 is ambiguous for the caller).
        """
        column_index_to_return = 0
        for ndx, column_name in enumerate(data_frame_columns):
            if column_name == column_name_to_identify:
                column_index_to_return = ndx
        return column_index_to_return

    @staticmethod
    def fn_get_first_and_last_column_value_from_data_frame(in_data_frame, in_column_name):
        """
        Return the values of a column from the first and the last row.

        :return: dictionary with the keys 'first' and 'last'
        :raises IndexError: when the data frame has no rows
        """
        return {
            'first': in_data_frame.iloc[0][in_column_name],
            'last': in_data_frame.iloc[(len(in_data_frame) - 1)][in_column_name],
        }

    @staticmethod
    def fn_set_shifting_value(in_dict):
        """
        Translate a direction and a deviation into a signed shifting offset.

        :param in_dict: dictionary with 'Direction' and 'Deviation'
        :return: negative deviation when direction is 'up', the deviation otherwise
        """
        offset_sign = 1
        if in_dict['Direction'] == 'up':
            offset_sign = -1
        return offset_sign * in_dict['Deviation']
143