Passed
Push — development/test ( ccd4a2...9ec173 )
by Daniel, created 01:05

sources.common.DataManipulator   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 242
Duplicated Lines 0 %

Importance

Changes 0
Metric   Value
wmc      45
eloc     203
dl       0
loc      242
rs       8.8
c        0
b        0
f        0

15 Methods

Rating   Name   Duplication   Size   Complexity  
A DataManipulator.__init__() 0 4 1
A DataManipulator.add_value_to_dictionary_by_position() 0 9 2
A DataManipulator.fn_add_value_to_dictionary() 0 42 2
A DataManipulator.fn_add_weekday_columns_to_data_frame() 0 6 3
A DataManipulator.fn_convert_datetime_columns_to_string() 0 6 3
B DataManipulator.fn_add_and_shift_column() 0 22 6
B DataManipulator.fn_filter_data_frame_by_index() 0 24 6
A DataManipulator.fn_decide_by_omission_or_specific_false() 0 8 3
A DataManipulator.fn_apply_query_to_data_frame() 0 31 4
A DataManipulator.fn_get_column_index_from_dataframe() 0 7 3
A DataManipulator.fn_convert_string_columns_to_datetime() 0 6 2
A DataManipulator.fn_add_days_within_column_to_data_frame() 0 8 2
A DataManipulator.fn_add_timeline_evaluation_column_to_data_frame() 0 21 5
A DataManipulator.fn_add_minimum_and_maximum_columns_to_data_frame() 0 9 2
A DataManipulator.fn_get_first_current_and_last_column_value_from_data_frame() 0 7 1

How to fix Complexity

Complex classes like sources.common.DataManipulator often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
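As an illustration only: the conversion, weekday, and days-within helpers in this class all share a date/time focus and could move into a class of their own. The sketch below shows the idea; the name DateColumnHelper is hypothetical and not part of the project.

# Illustration only: Extract Class applied to the date/time helpers.
# "DateColumnHelper" is a hypothetical name, not part of this project.
import pandas as pd


class DateColumnHelper:

    @staticmethod
    def fn_convert_string_columns_to_datetime(input_data_frame, columns_list, columns_format):
        # parse string columns into datetime values (copied from the original class)
        for current_column in columns_list:
            input_data_frame[current_column] = pd.to_datetime(
                input_data_frame[current_column], format=columns_format)
        return input_data_frame

    @staticmethod
    def fn_add_weekday_columns_to_data_frame(input_data_frame, columns_list):
        # derive a weekday-name column per datetime column (copied from the original class)
        for current_column in columns_list:
            input_data_frame['Weekday for ' + current_column] = \
                input_data_frame[current_column].apply(lambda x: x.strftime('%A'))
        return input_data_frame


class DataManipulator(DateColumnHelper):
    # the query, filtering and dictionary helpers would remain here, so each
    # class carries a smaller share of the total complexity of 45
    pass

Going in the Extract Subclass direction, as above, keeps existing call sites such as DataManipulator.fn_add_weekday_columns_to_data_frame(...) working unchanged; plain composition would serve equally well. The analyzed source of sources.common.DataManipulator follows.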

"""
Data Manipulation class
"""
# package to handle date and times
from datetime import timedelta
# package to add support for multi-language (i18n)
import gettext
# package to handle files/folders and related metadata/operations
import os
# package facilitating Data Frames manipulation
import pandas as pd


class DataManipulator:
    lcl = None

    def __init__(self, default_language='en_US'):
        current_script = os.path.basename(__file__).replace('.py', '')
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
        self.lcl = gettext.translation(current_script, lang_folder, languages=[default_language])

    def fn_add_and_shift_column(self, local_logger, timmer, input_data_frame, input_details):
        for in_dt in input_details:
            timmer.start()
            input_data_frame[in_dt['New Column']] = input_data_frame[in_dt['Original Column']]
            offset_sign = (lambda x: 1 if x == 'down' else -1)
            col_offset = offset_sign(in_dt['Direction']) * in_dt['Deviation']
            input_data_frame[in_dt['New Column']] = input_data_frame[in_dt['New Column']]\
                .shift(col_offset)
            input_data_frame[in_dt['New Column']] = input_data_frame[in_dt['New Column']]\
                .apply(lambda x: str(x).replace('.0', ''))\
                .apply(lambda x: str(x).replace('nan', str(in_dt['Empty Values Replacement'])))
            local_logger.info(self.lcl.gettext(
                'A new column named "{new_column_name}" as copy from "{original_column}" '
                + 'then shifted by {shifting_rows} to relevant data frame '
                + '(filling any empty value as {empty_values_replacement})')
                              .replace('{new_column_name}', in_dt['New Column'])
                              .replace('{original_column}', in_dt['Original Column'])
                              .replace('{shifting_rows}', str(col_offset))
                              .replace('{empty_values_replacement}',
                                       str(in_dt['Empty Values Replacement'])))
            timmer.stop()
        return input_data_frame

    @staticmethod
    def fn_add_days_within_column_to_data_frame(input_data_frame, dict_expression):
        input_data_frame['Days Within'] = input_data_frame[dict_expression['End Date']] - \
                                          input_data_frame[dict_expression['Start Date']] + \
                                          timedelta(days=1)
        input_data_frame['Days Within'] = input_data_frame['Days Within'] \
            .apply(lambda x: int(str(x).replace(' days 00:00:00', '')))
        return input_data_frame

    @staticmethod
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
        grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
            .agg({dict_expression['calculation']: ['min', 'max']})
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
        grouped_df = grouped_df.reset_index()
        if 'map' in dict_expression:
            grouped_df.rename(columns=dict_expression['map'], inplace=True)
        return grouped_df

    def fn_add_timeline_evaluation_column_to_data_frame(self, in_df, dict_expression):
        # shorten last method parameter
        de = dict_expression
        # add helpful column to use on "Timeline Evaluation" column determination
        in_df['rd'] = de['Reference Date']
        # rename some columns to cope with long expression
        in_df.rename(columns={'Start Date': 'sd', 'End Date': 'ed'}, inplace=True)
        # actual "Timeline Evaluation" column determination
        cols = ['rd', 'sd', 'ed']
        in_df['Timeline Evaluation'] = in_df[cols].apply(lambda r: 'Current'
                                                         if r['sd'] <= r['rd'] <= r['ed'] else
                                                         'Past' if r['sd'] < r['rd'] else 'Future',
                                                         axis=1)
        # rename back columns
        in_df.rename(columns={'sd': 'Start Date', 'ed': 'End Date', 'rd': 'Reference Date'},
                     inplace=True)
        # decide if the helpful column is to be retained or not
        removal_needed = self.fn_decide_by_omission_or_specific_false(de, 'Keep Reference Date')
        if removal_needed:
            in_df.drop(columns=['Reference Date'], inplace=True)
        return in_df

    def fn_add_value_to_dictionary(self, in_list, adding_value, adding_type, reference_column):
        add_type = adding_type.lower()
        total_columns = len(in_list)
        reference_indexes = {
            'add': {'after': 0, 'before': 0},
            'cycle_down_to': {'after': 0, 'before': 0}
        }
        if type(reference_column) is int:
            reference_indexes = {
                'add': {
                    'after': in_list.copy().index(reference_column) + 1,
                    'before': in_list.copy().index(reference_column),
                },
                'cycle_down_to': {
                    'after': in_list.copy().index(reference_column),
                    'before': in_list.copy().index(reference_column),
                }
            }
        positions = {
            'after': {
                'cycle_down_to': reference_indexes.get('cycle_down_to').get('after'),
                'add': reference_indexes.get('add').get('after'),
            },
            'before': {
                'cycle_down_to': reference_indexes.get('cycle_down_to').get('before'),
                'add': reference_indexes.get('add').get('before'),
            },
            'first': {
                'cycle_down_to': 0,
                'add': 0,
            },
            'last': {
                'cycle_down_to': total_columns,
                'add': total_columns,
            }
        }
        return self.add_value_to_dictionary_by_position({
            'adding_value': adding_value,
            'list': in_list,
            'position_to_add': positions.get(add_type).get('add'),
            'position_to_cycle_down_to': positions.get(add_type).get('cycle_down_to'),
            'total_columns': total_columns,
        })

    @staticmethod
    def add_value_to_dictionary_by_position(adding_dictionary):
        list_with_values = adding_dictionary['list']
        list_with_values.append(adding_dictionary['total_columns'])
        for counter in range(adding_dictionary['total_columns'],
                             adding_dictionary['position_to_cycle_down_to'], -1):
            list_with_values[counter] = list_with_values[(counter - 1)]
        list_with_values[adding_dictionary['position_to_add']] = adding_dictionary['adding_value']
        return list_with_values

    @staticmethod
    def fn_add_weekday_columns_to_data_frame(input_data_frame, columns_list):
        for current_column in columns_list:
            input_data_frame['Weekday for ' + current_column] = input_data_frame[current_column] \
                .apply(lambda x: x.strftime('%A'))
        return input_data_frame

    def fn_apply_query_to_data_frame(self, local_logger, timmer, input_data_frame, extract_params):
        timmer.start()
        query_expression = ''
        generic_pre_feedback = self.lcl.gettext('Will retain only values {filter_type} '
                                                + '"{filter_values}" within the field '
                                                + '"{column_to_filter}"') \
            .replace('{column_to_filter}', extract_params['column_to_filter'])
        if extract_params['filter_to_apply'] == 'equal':
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}', self.lcl.gettext('equal with'))
                               .replace('{filter_values}', extract_params['filter_values']))
            query_expression = '`' + extract_params['column_to_filter'] + '` == "' \
                               + extract_params['filter_values'] + '"'
        elif extract_params['filter_to_apply'] == 'different':
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}', self.lcl.gettext('different than'))
                               .replace('{filter_values}', extract_params['filter_values']))
            query_expression = '`' + extract_params['column_to_filter'] + '` != "' \
                               + extract_params['filter_values'] + '"'
        elif extract_params['filter_to_apply'] == 'multiple_match':
            multiple_values = '["' + '", "'.join(extract_params['filter_values'].values()) + '"]'
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}',
                                        self.lcl.gettext('matching any of these values'))
                               .replace('{filter_values}', multiple_values))
            query_expression = '`' + extract_params['column_to_filter'] + '` in ' + multiple_values
        local_logger.debug(self.lcl.gettext('Query expression to apply is: {query_expression}')
                           .replace('{query_expression}', query_expression))
        input_data_frame.query(query_expression, inplace=True)
        timmer.stop()
        return input_data_frame

    @staticmethod
    def fn_convert_datetime_columns_to_string(input_data_frame, columns_list, columns_format):
        for current_column in columns_list:
            input_data_frame[current_column] = \
                input_data_frame[current_column].map(lambda x: x.strftime(columns_format))
        return input_data_frame

    @staticmethod
    def fn_convert_string_columns_to_datetime(input_data_frame, columns_list, columns_format):
        for current_column in columns_list:
            input_data_frame[current_column] = pd.to_datetime(input_data_frame[current_column],
                                                              format=columns_format)
        return input_data_frame

    @staticmethod
    def fn_decide_by_omission_or_specific_false(in_dictionary, key_decision_factor):
        removal_needed = False
        if key_decision_factor not in in_dictionary:
            removal_needed = True
        elif not in_dictionary[key_decision_factor]:
            removal_needed = True
        return removal_needed

    def fn_filter_data_frame_by_index(self, local_logger, in_data_frame, filter_rule):
        reference_expression = filter_rule['Query Expression for Reference Index']
        index_current = in_data_frame.query(reference_expression, inplace=False)
        local_logger.info(self.lcl.gettext(
            'Current index has been determined to be {index_current_value}')
                          .replace('{index_current_value}', str(index_current.index)))
        if str(index_current.index) != "Int64Index([], dtype='int64')" \
                and 'Deviation' in filter_rule:
            for deviation_type in filter_rule['Deviation']:
                deviation_number = filter_rule['Deviation'][deviation_type]
                index_to_apply = index_current.index
                if deviation_type == 'Lower':
                    index_to_apply = index_current.index - deviation_number
                    in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
                elif deviation_type == 'Upper':
                    index_to_apply = index_current.index + deviation_number
                    in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
                local_logger.info(self.lcl.gettext(
                    '{deviation_type} Deviation Number is {deviation_number} '
                    + 'to be applied to Current index, became {index_to_apply}')
                                  .replace('{deviation_type}', deviation_type)
                                  .replace('{deviation_number}', str(deviation_number))
                                  .replace('{index_to_apply}', str(index_to_apply)))
        return in_data_frame

    @staticmethod
    def fn_get_column_index_from_dataframe(data_frame_columns, column_name_to_identify):
        column_index_to_return = 0
        for ndx, column_name in enumerate(data_frame_columns):
            if column_name == column_name_to_identify:
                column_index_to_return = ndx
        return column_index_to_return

    @staticmethod
    def fn_get_first_current_and_last_column_value_from_data_frame(in_data_frame, in_column_name):
        return {
            'first': in_data_frame.iloc[0][in_column_name],
            'current': in_data_frame.query('`Timeline Evaluation` == "Current"',
                                           inplace=False)[in_column_name].max(),
            'last': in_data_frame.iloc[(len(in_data_frame) - 1)][in_column_name],
        }
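
For context, a minimal usage sketch of the static helpers above. The sample data and column names are made up; the constructor is not exercised, since it requires the gettext *_Locale folder next to the module, and static methods are callable straight off the class.

# Minimal sketch, assuming DataManipulator has been imported from sources.common
import pandas as pd

# made-up sample data with the 'Start Date'/'End Date' naming the class expects
df = pd.DataFrame({'Start Date': ['2020-01-01', '2020-01-06'],
                   'End Date': ['2020-01-05', '2020-01-08']})
df = DataManipulator.fn_convert_string_columns_to_datetime(
    df, ['Start Date', 'End Date'], '%Y-%m-%d')
df = DataManipulator.fn_add_weekday_columns_to_data_frame(df, ['Start Date'])
df = DataManipulator.fn_add_days_within_column_to_data_frame(
    df, {'Start Date': 'Start Date', 'End Date': 'End Date'})
# prints weekday names (Wednesday, Monday) and inclusive day counts (5, 3)
print(df[['Weekday for Start Date', 'Days Within']])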