Passed
Push — development/test (1ef49d...da436f) by Daniel
created 01:13

sources.common.DataManipulator   C

Complexity

Total Complexity 57

Size/Duplication

Total Lines 315
Duplicated Lines 0 %

Importance

Changes 0
Metric  Value
wmc     57
eloc    269
dl      0
loc     315
rs      5.04
c       0
b       0
f       0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A DataManipulator.__init__() 0 4 1
A DataManipulator.add_value_to_dictionary_by_position() 0 9 2
B DataManipulator.fn_add_value_to_dictionary() 0 49 2
A DataManipulator.fn_add_weekday_columns_to_data_frame() 0 6 3
A DataManipulator.fn_convert_datetime_columns_to_string() 0 6 3
B DataManipulator.fn_add_and_shift_column() 0 26 5
B DataManipulator.fn_filter_data_frame_by_index() 0 23 6
A DataManipulator.fn_decide_by_omission_or_specific_false() 0 8 3
A DataManipulator.fn_apply_query_to_data_frame() 0 31 4
A DataManipulator.fn_get_column_index_from_dataframe() 0 7 3
A DataManipulator.fn_convert_string_columns_to_datetime() 0 6 2
A DataManipulator.fn_add_days_within_column_to_data_frame() 0 8 2
A DataManipulator.fn_add_timeline_evaluation_column_to_data_frame() 0 17 5
A DataManipulator.fn_load_file_list_to_data_frame() 0 15 1
A DataManipulator.fn_add_minimum_and_maximum_columns_to_data_frame() 0 9 2
A DataManipulator.fn_get_first_current_and_last_column_value_from_data_frame() 0 7 1
B DataManipulator.fn_store_data_frame_to_file_validation() 0 17 6
B DataManipulator.fn_store_data_frame_to_file() 0 31 6

How to fix: Complexity

Complex classes like sources.common.DataManipulator often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster. One possible extraction for this class is sketched below.
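For sources.common.DataManipulator, the file persistence methods (fn_load_file_list_to_data_frame, fn_store_data_frame_to_file, fn_store_data_frame_to_file_validation) share one concern and carry 13 of the 57 WMC points, so they are a natural extraction candidate. A minimal sketch of that Extract Class step, assuming a hypothetical class name DataFramePersistence and a constructor-injection style (neither appears in the analyzed code):

class DataFramePersistence:
    # Hypothetical extracted component: the three load/store methods listed
    # above would move here unchanged, taking their 13 WMC points along.

    def __init__(self, locale_translation):
        self.lcl = locale_translation  # reuse the existing gettext translation


class DataManipulator:
    # Would retain only the in-memory data frame manipulation methods.

    def __init__(self, persistence):
        self.persistence = persistence  # composition keeps each class cohesive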

  1  """
  2  Data Manipulation class
  3  """
  4  # package to handle date and times
  5  from datetime import timedelta
  6  # package to add support for multi-language (i18n)
  7  import gettext
  8  # package to handle files/folders and related metadata/operations
  9  import os
 10  # package facilitating Data Frames manipulation
 11  import pandas as pd
 12
 13
 14  class DataManipulator:
 15      lcl = None
 16
 17      def __init__(self, default_language='en_US'):
 18          current_script = os.path.basename(__file__).replace('.py', '')
 19          lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
 20          self.lcl = gettext.translation(current_script, lang_folder, languages=[default_language])
 21
 22      def fn_add_and_shift_column(self, local_logger, timmer, input_data_frame, input_details):
 23          for current_input_details in input_details:
 24              timmer.start()
 25              col_name_to_add = current_input_details['New Column']
 26              offset_sign = -1
 27              if current_input_details['Direction'] == 'down':
 28                  offset_sign = 1
 29              col_offset = offset_sign * current_input_details['Deviation']
 30              input_data_frame[col_name_to_add] = \
 31                  input_data_frame[current_input_details['Original Column']]
 32              input_data_frame[col_name_to_add] = input_data_frame[col_name_to_add].shift(col_offset)
 33              fill_value = current_input_details['Empty Values Replacement']
 34              input_data_frame[col_name_to_add] = input_data_frame[col_name_to_add].apply(
 35                  lambda x: str(x).replace('.0', '')).apply(lambda x: str(x).replace('nan',
 36                                                                                     fill_value))
Issue (introduced by this change): The variable fill_value does not seem to be defined in case the for loop on line 23 is not entered. Are you sure this can never be the case?
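Since fill_value is assigned at line 33 within the same loop body, this warning looks like a conservative flow-analysis result; pre-binding the name before the loop is nevertheless a cheap way to satisfy the checker. A minimal sketch, assuming a hypothetical wrapper name and an empty-string default (neither is taken from the project):

def shift_columns_sketch(input_details):
    # Pre-bind fill_value so the name exists on every execution path,
    # even when input_details is empty ('' is an assumed default).
    fill_value = ''
    for current_input_details in input_details:
        fill_value = current_input_details['Empty Values Replacement']
        # ... the existing copy/shift/log logic of lines 25-46 would follow,
        # using fill_value exactly as before.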
 37              local_logger.info(self.lcl.gettext( \
 38                  'A new column named "{new_column_name}" as copy from "{original_column}" '
 39                  + 'then shifted by {shifting_rows} to relevant data frame '
 40                  + '(filling any empty value as {empty_values_replacement})') \
 41                                .replace('{new_column_name}', col_name_to_add) \
 42                                .replace('{original_column}',
 43                                         current_input_details['Original Column']) \
 44                                .replace('{shifting_rows}', str(col_offset)) \
 45                                .replace('{empty_values_replacement}', fill_value))
 46              timmer.stop()
 47          return input_data_frame
 48
 49      @staticmethod
 50      def fn_add_days_within_column_to_data_frame(input_data_frame, dict_expression):
 51          input_data_frame['Days Within'] = input_data_frame[dict_expression['End Date']] - \
 52                                            input_data_frame[dict_expression['Start Date']] + \
 53                                            timedelta(days=1)
 54          input_data_frame['Days Within'] = input_data_frame['Days Within'] \
 55              .apply(lambda x: int(str(x).replace(' days 00:00:00', '')))
 56          return input_data_frame
 57
 58      @staticmethod
 59      def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
 60          grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
 61              .agg({dict_expression['calculation']: ['min', 'max']})
 62          grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
 63          grouped_df = grouped_df.reset_index()
 64          if 'map' in dict_expression:
 65              grouped_df.rename(columns=dict_expression['map'], inplace=True)
 66          return grouped_df
 67
 68      def fn_add_timeline_evaluation_column_to_data_frame(self, in_df, dict_expression):
 69          # shorten last method parameter
 70          de = dict_expression
 71          # add helpful column to use on "Timeline Evaluation" column determination
 72          in_df['Reference Date'] = de['Reference Date']
 73          # actual "Timeline Evaluation" column determination
 74          cols = ['Reference Date', de['Start Date'], de['End Date']]
 75          in_df['Timeline Evaluation'] = in_df[cols] \
 76              .apply(lambda r: 'Current' if r[de['Start Date']]
 77                                            <= r['Reference Date']
 78                                            <= r[de['End Date']] else \
 79                     'Past' if r[de['Start Date']] < r['Reference Date'] else 'Future', axis=1)
 80          # decide if the helpful column is to be retained or not
 81          removal_needed = self.fn_decide_by_omission_or_specific_false(de, 'Keep Reference Date')
 82          if removal_needed:
 83              in_df.drop(columns=['Reference Date'], inplace=True)
 84          return in_df
 85
 86      def fn_add_value_to_dictionary(self, in_list, adding_value, adding_type, reference_column):
 87          add_type = adding_type.lower()
 88          total_columns = len(in_list)
 89          if reference_column is None:
 90              reference_indexes = {
 91                  'add': {
 92                      'after': 0,
 93                      'before': 0,
 94                  },
 95                  'cycle_down_to': {
 96                      'after': 0,
 97                      'before': 0,
 98                  },
 99              }
100          else:
101              reference_indexes = {
102                  'add': {
103                      'after': in_list.copy().index(reference_column) + 1,
104                      'before': in_list.copy().index(reference_column),
105                  },
106                  'cycle_down_to': {
107                      'after': in_list.copy().index(reference_column),
108                      'before': in_list.copy().index(reference_column),
109                  },
110              }
111          positions = {
112              'after': {
113                  'cycle_down_to': reference_indexes.get('cycle_down_to').get('after'),
114                  'add': reference_indexes.get('add').get('after'),
115              },
116              'before': {
117                  'cycle_down_to': reference_indexes.get('cycle_down_to').get('before'),
118                  'add': reference_indexes.get('add').get('before'),
119              },
120              'first': {
121                  'cycle_down_to': 0,
122                  'add': 0,
123              },
124              'last': {
125                  'cycle_down_to': total_columns,
126                  'add': total_columns,
127              }
128          }
129          return self.add_value_to_dictionary_by_position({
130              'adding_value': adding_value,
131              'list': in_list,
132              'position_to_add': positions.get(add_type).get('add'),
133              'position_to_cycle_down_to': positions.get(add_type).get('cycle_down_to'),
134              'total_columns': total_columns,
135          })
136
137      @staticmethod
138      def add_value_to_dictionary_by_position(adding_dictionary):
139          list_with_values = adding_dictionary['list']
140          list_with_values.append(adding_dictionary['total_columns'])
141          for counter in range(adding_dictionary['total_columns'],
142                               adding_dictionary['position_to_cycle_down_to'], -1):
143              list_with_values[counter] = list_with_values[(counter - 1)]
144          list_with_values[adding_dictionary['position_to_add']] = adding_dictionary['adding_value']
145          return list_with_values
146
147      @staticmethod
148      def fn_add_weekday_columns_to_data_frame(input_data_frame, columns_list):
149          for current_column in columns_list:
150              input_data_frame['Weekday for ' + current_column] = input_data_frame[current_column] \
151                  .apply(lambda x: x.strftime('%A'))
152          return input_data_frame
153
154      def fn_apply_query_to_data_frame(self, local_logger, timmer, input_data_frame, extract_params):
155          timmer.start()
156          query_expression = ''
157          generic_pre_feedback = self.lcl.gettext('Will retain only values {filter_type} '
158                                                  + '"{filter_values}" within the field '
159                                                  + '"{column_to_filter}"') \
160              .replace('{column_to_filter}', extract_params['column_to_filter'])
161          if extract_params['filter_to_apply'] == 'equal':
162              local_logger.debug(generic_pre_feedback \
163                                 .replace('{filter_type}', self.lcl.gettext('equal with')) \
164                                 .replace('{filter_values}', extract_params['filter_values']))
165              query_expression = '`' + extract_params['column_to_filter'] + '` == "' \
166                                 + extract_params['filter_values'] + '"'
167          elif extract_params['filter_to_apply'] == 'different':
168              local_logger.debug(generic_pre_feedback \
169                                 .replace('{filter_type}', self.lcl.gettext('different than')) \
170                                 .replace('{filter_values}', extract_params['filter_values']))
171              query_expression = '`' + extract_params['column_to_filter'] + '` != "' \
172                                 + extract_params['filter_values'] + '"'
173          elif extract_params['filter_to_apply'] == 'multiple_match':
174              multiple_values = '["' + '", "'.join(extract_params['filter_values'].values()) + '"]'
175              local_logger.debug(generic_pre_feedback \
176                                 .replace('{filter_type}',
177                                          self.lcl.gettext('matching any of these values')) \
178                                 .replace('{filter_values}', multiple_values))
179              query_expression = '`' + extract_params['column_to_filter'] + '` in ' + multiple_values
180          local_logger.debug(self.lcl.gettext('Query expression to apply is: {query_expression}') \
181                             .replace('{query_expression}', query_expression))
182          input_data_frame.query(query_expression, inplace=True)
183          timmer.stop()
184          return input_data_frame
185
186      @staticmethod
187      def fn_convert_datetime_columns_to_string(input_data_frame, columns_list, columns_format):
188          for current_column in columns_list:
189              input_data_frame[current_column] = \
190                  input_data_frame[current_column].map(lambda x: x.strftime(columns_format))
191          return input_data_frame
192
193      @staticmethod
194      def fn_convert_string_columns_to_datetime(input_data_frame, columns_list, columns_format):
195          for current_column in columns_list:
196              input_data_frame[current_column] = pd.to_datetime(input_data_frame[current_column],
197                                                                format=columns_format)
198          return input_data_frame
199
200      @staticmethod
201      def fn_decide_by_omission_or_specific_false(in_dictionary, key_decision_factor):
202          removal_needed = False
203          if key_decision_factor not in in_dictionary:
204              removal_needed = True
205          elif not in_dictionary[key_decision_factor]:
206              removal_needed = True
207          return removal_needed
208
209      def fn_filter_data_frame_by_index(self, local_logger, in_data_frame, filter_rule):
210          reference_expression = filter_rule['Query Expression for Reference Index']
211          index_current = in_data_frame.query(reference_expression, inplace=False)
212          local_logger.info(self.lcl.gettext( \
213              'Current index has been determined to be {index_current_value}') \
214                            .replace('{index_current_value}', str(index_current.index)))
215          if str(index_current.index) != "Int64Index([], dtype='int64')" \
216                  and 'Deviation' in filter_rule:
217              for deviation_type in filter_rule['Deviation']:
218                  deviation_number = filter_rule['Deviation'][deviation_type]
219                  if deviation_type == 'Lower':
220                      index_to_apply = index_current.index - deviation_number
221                      in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
222                  elif deviation_type == 'Upper':
223                      index_to_apply = index_current.index + deviation_number
224                      in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
225                  local_logger.info(self.lcl.gettext( \
226                      '{deviation_type} Deviation Number is {deviation_number} '
227                      + 'to be applied to Current index, became {index_to_apply}') \
228                                    .replace('{deviation_type}', deviation_type) \
229                                    .replace('{deviation_number}', str(deviation_number)) \
230                                    .replace('{index_to_apply}', str(index_to_apply)))
Issue (introduced by this change): The variable index_to_apply does not seem to be defined for all execution paths.
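Unlike the previous warning, this one points at a real gap: if a 'Deviation' key is neither 'Lower' nor 'Upper', the logging call at line 230 references an unbound index_to_apply and would raise a NameError. A minimal sketch of one possible guard; the function name and the neutral default are assumptions, not the project's actual fix:

def filter_by_index_sketch(in_data_frame, filter_rule, index_current):
    # A neutral default keeps the name bound when a deviation type
    # matches neither branch below.
    index_to_apply = index_current.index
    for deviation_type in filter_rule['Deviation']:
        deviation_number = filter_rule['Deviation'][deviation_type]
        if deviation_type == 'Lower':
            index_to_apply = index_current.index - deviation_number
            in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
        elif deviation_type == 'Upper':
            index_to_apply = index_current.index + deviation_number
            in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
        # the logging call can now reference index_to_apply safely
    return in_data_frame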
231          return in_data_frame
232
233      @staticmethod
234      def fn_get_column_index_from_dataframe(data_frame_columns, column_name_to_identify):
235          column_index_to_return = 0
236          for ndx, column_name in enumerate(data_frame_columns):
237              if column_name == column_name_to_identify:
238                  column_index_to_return = ndx
239          return column_index_to_return
240
241      @staticmethod
242      def fn_get_first_current_and_last_column_value_from_data_frame(in_data_frame, in_column_name):
243          return {
244              'first': in_data_frame.iloc[0][in_column_name],
245              'current': in_data_frame.query('`Timeline Evaluation` == "Current"',
246                                             inplace=False)[in_column_name].max(),
247              'last': in_data_frame.iloc[(len(in_data_frame) - 1)][in_column_name],
248          }
249
250      def fn_load_file_list_to_data_frame(self, local_logger, timmer, file_list, csv_delimiter):
251          timmer.start()
252          combined_csv = pd.concat([pd.read_csv(filepath_or_buffer=current_file,
253                                                delimiter=csv_delimiter,
254                                                cache_dates=True,
255                                                index_col=None,
256                                                memory_map=True,
257                                                low_memory=False,
258                                                encoding='utf-8',
259                                                ) for current_file in file_list])
260          local_logger.info(self.lcl.gettext( \
261              'All relevant files ({files_counted}) were merged into a Pandas Data Frame') \
262                            .replace('{files_counted}', str(len(file_list))))
263          timmer.stop()
264          return combined_csv
265
266      def fn_store_data_frame_to_file(self, local_logger, timmer, in_data_frame, in_file_details):
267          timmer.start()
268          is_file_saved = False
269          given_format = self.fn_store_data_frame_to_file_validation(local_logger, in_data_frame,
270                                                                     in_file_details)
271          if given_format == 'csv':
272              in_data_frame.to_csv(path_or_buf=in_file_details['name'],
273                                   sep=in_file_details['field-delimiter'],
274                                   header=True,
275                                   index=False,
276                                   encoding='utf-8')
277              is_file_saved = True
278          elif given_format == 'excel':
279              in_data_frame.to_excel(excel_writer=in_file_details['name'],
280                                     engine='xlsxwriter',
281                                     freeze_panes=(1, 1),
282                                     verbose=True)
283              is_file_saved = True
284          elif given_format == 'pickle':
285              if 'compression' not in in_file_details:
286                  in_file_details['compression'] = 'gzip'
287              in_data_frame.to_pickle(path=in_file_details['name'],
288                                      compression=in_file_details['compression'])
289              is_file_saved = True
290          if is_file_saved:
291              local_logger.info(self.lcl.gettext( \
292                  'Pandas Data Frame has just been saved to file "{file_name}", '
293                  + 'considering {file_type} as file type') \
294                                .replace('{file_name}', in_file_details['name']) \
295                                .replace('{file_type}', in_file_details['format']))
296          timmer.stop()
297
298      def fn_store_data_frame_to_file_validation(self, local_logger, in_data_frame, in_file_details):
299          implemented_file_formats = ['csv', 'excel', 'pickle']
300          are_settings_ok = False
301          if 'format' in in_file_details:
302              given_format = in_file_details['format'].lower()
303              if given_format not in implemented_file_formats:
304                  local_logger.error(self.lcl.gettext('File "format" attribute has a value of "{format_value}" which is not among currently implemented values ("{implemented_file_formats}"), therefore file saving is not possible').replace('{format_value}', given_format).replace('{implemented_file_formats}', '", "'.join(implemented_file_formats)))
305              elif given_format == 'csv':
306                  if 'field-delimiter' in in_file_details:
307                      are_settings_ok = given_format
308                  else:
309                      local_logger.error(self.lcl.gettext('File format is CSV, which requires a "field-delimiter" attribute with a value; that is missing, therefore file saving is not going to be performed'))
310              elif given_format in ('excel', 'pickle'):
311                  are_settings_ok = given_format
312          else:
313              local_logger.error(self.lcl.gettext('File "format" attribute is mandatory in the file setting, but missing, therefore file saving is not possible'))
314          return are_settings_ok
315
315