Passed
Push — development/test ( 1b4dff...ef552b )
by Daniel
01:09
created

sources.common.DataManipulator   B

Complexity

Total Complexity 47

Size/Duplication

Total Lines 281
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 238
dl 0
loc 281
rs 8.64
c 0
b 0
f 0
wmc 47

17 Methods

Rating   Name   Duplication   Size   Complexity  
A DataManipulator.__init__() 0 4 1
A DataManipulator.add_value_to_dictionary_by_position() 0 9 2
B DataManipulator.fn_add_value_to_dictionary() 0 49 2
A DataManipulator.fn_add_weekday_columns_to_data_frame() 0 6 3
A DataManipulator.fn_convert_datetime_columns_to_string() 0 6 3
B DataManipulator.fn_add_and_shift_column() 0 26 5
B DataManipulator.fn_filter_data_frame_by_index() 0 23 6
A DataManipulator.fn_decide_by_omission_or_specific_false() 0 8 3
A DataManipulator.fn_apply_query_to_data_frame() 0 31 4
A DataManipulator.fn_get_column_index_from_dataframe() 0 7 3
A DataManipulator.fn_convert_string_columns_to_datetime() 0 6 2
A DataManipulator.fn_store_data_frame_to_file() 0 15 2
A DataManipulator.fn_add_days_within_column_to_data_frame() 0 8 2
A DataManipulator.fn_add_timeline_evaluation_column_to_data_frame() 0 17 5
A DataManipulator.fn_load_file_list_to_data_frame() 0 15 1
A DataManipulator.fn_add_minimum_and_maximum_columns_to_data_frame() 0 9 2
A DataManipulator.fn_get_first_current_and_last_column_value_from_data_frame() 0 7 1

How to fix   Complexity   

Complexity

Complex classes like sources.common.DataManipulator often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Data Manipulation class
3
"""
4
# package to handle date and times
5
from datetime import timedelta
6
# package to add support for multi-language (i18n)
7
import gettext
8
# package to handle files/folders and related metadata/operations
9
import os
10
# package facilitating Data Frames manipulation
11
import pandas as pd
12
13
14
class DataManipulator:
    """Collection of Pandas Data Frame manipulation helpers with i18n-aware logging."""

    # gettext translation catalogue; assigned by __init__
    lcl = None

    def __init__(self, default_language='en_US'):
        """Load the translation catalogue named after this script from its "_Locale" folder."""
        script_stem = os.path.basename(__file__).replace('.py', '')
        locale_folder = os.path.join(os.path.dirname(__file__), script_stem + '_Locale')
        self.lcl = gettext.translation(script_stem, locale_folder,
                                       languages=[default_language])
    def fn_add_and_shift_column(self, local_logger, timmer, input_data_frame, input_details):
23
        for current_input_details in input_details:
24
            timmer.start()
25
            col_name_to_add = current_input_details['New Column']
26
            offset_sign = -1
27
            if current_input_details['Direction'] == 'down':
28
                offset_sign = 1
29
            col_offset = offset_sign * current_input_details['Deviation']
30
            input_data_frame[col_name_to_add] = \
31
                input_data_frame[current_input_details['Original Column']]
32
            input_data_frame[col_name_to_add] = input_data_frame[col_name_to_add].shift(col_offset)
33
            fill_value = current_input_details['Empty Values Replacement']
34
            input_data_frame[col_name_to_add] = input_data_frame[col_name_to_add].apply(
35
                lambda x: str(x).replace('.0', '')).apply(lambda x: str(x).replace('nan',
36
                                                                                   fill_value))
0 ignored issues
show
introduced by
The variable fill_value does not seem to be defined in case the for loop on line 23 is not entered. Are you sure this can never be the case?
Loading history...
37
            local_logger.info(self.lcl.gettext( \
38
                'A new column named "{new_column_name}" as copy from "{original_column}" '
39
                + 'then shifted by {shifting_rows} to relevant data frame '
40
                + '(filling any empty value as {empty_values_replacement})') \
41
                              .replace('{new_column_name}', col_name_to_add) \
42
                              .replace('{original_column}',
43
                                       current_input_details['Original Column']) \
44
                              .replace('{shifting_rows}', str(col_offset)) \
45
                              .replace('{empty_values_replacement}', fill_value))
46
            timmer.stop()
47
        return input_data_frame
48
49
    @staticmethod
50
    def fn_add_days_within_column_to_data_frame(input_data_frame, dict_expression):
51
        input_data_frame['Days Within'] = input_data_frame[dict_expression['End Date']] - \
52
                                          input_data_frame[dict_expression['Start Date']] + \
53
                                          timedelta(days=1)
54
        input_data_frame['Days Within'] = input_data_frame['Days Within'] \
55
            .apply(lambda x: int(str(x).replace(' days 00:00:00', '')))
56
        return input_data_frame
57
58
    @staticmethod
59
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
60
        grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
61
            .agg({dict_expression['calculation']: ['min', 'max']})
62
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
63
        grouped_df = grouped_df.reset_index()
64
        if 'map' in dict_expression:
65
            grouped_df.rename(columns=dict_expression['map'], inplace=True)
66
        return grouped_df
67
68
    def fn_add_timeline_evaluation_column_to_data_frame(self, in_df, dict_expression):
69
        # shorten last method parameter
70
        de = dict_expression
71
        # add helpful column to use on "Timeline Evaluation" column determination
72
        in_df['Reference Date'] = de['Reference Date']
73
        # actual "Timeline Evaluation" column determination
74
        cols = ['Reference Date', de['Start Date'], de['End Date']]
75
        in_df['Timeline Evaluation'] = in_df[cols] \
76
            .apply(lambda r: 'Current' if r[de['Start Date']]
77
                                          <= r['Reference Date']
78
                                          <= r[de['End Date']] else\
79
                   'Past' if r[de['Start Date']] < r['Reference Date'] else 'Future', axis=1)
80
        # decide if the helpful column is to be retained or not
81
        removal_needed = self.fn_decide_by_omission_or_specific_false(de, 'Keep Reference Date')
82
        if removal_needed:
83
            in_df.drop(columns=['Reference Date'], inplace=True)
84
        return in_df
85
86
    def fn_add_value_to_dictionary(self, in_list, adding_value, adding_type, reference_column):
87
        add_type = adding_type.lower()
88
        total_columns = len(in_list)
89
        if reference_column is None:
90
            reference_indexes = {
91
                'add': {
92
                    'after': 0,
93
                    'before': 0,
94
                },
95
                'cycle_down_to': {
96
                    'after': 0,
97
                    'before': 0,
98
                },
99
            }
100
        else:
101
            reference_indexes = {
102
                'add': {
103
                    'after': in_list.copy().index(reference_column) + 1,
104
                    'before': in_list.copy().index(reference_column),
105
                },
106
                'cycle_down_to': {
107
                    'after': in_list.copy().index(reference_column),
108
                    'before': in_list.copy().index(reference_column),
109
                },
110
            }
111
        positions = {
112
            'after': {
113
                'cycle_down_to': reference_indexes.get('cycle_down_to').get('after'),
114
                'add': reference_indexes.get('add').get('after'),
115
            },
116
            'before': {
117
                'cycle_down_to': reference_indexes.get('cycle_down_to').get('before'),
118
                'add': reference_indexes.get('add').get('before'),
119
            },
120
            'first': {
121
                'cycle_down_to': 0,
122
                'add': 0,
123
            },
124
            'last': {
125
                'cycle_down_to': total_columns,
126
                'add': total_columns,
127
            }
128
        }
129
        return self.add_value_to_dictionary_by_position({
130
            'adding_value': adding_value,
131
            'list': in_list,
132
            'position_to_add': positions.get(add_type).get('add'),
133
            'position_to_cycle_down_to': positions.get(add_type).get('cycle_down_to'),
134
            'total_columns': total_columns,
135
        })
136
137
    @staticmethod
138
    def add_value_to_dictionary_by_position(adding_dictionary):
139
        list_with_values = adding_dictionary['list']
140
        list_with_values.append(adding_dictionary['total_columns'])
141
        for counter in range(adding_dictionary['total_columns'],
142
                             adding_dictionary['position_to_cycle_down_to'], -1):
143
            list_with_values[counter] = list_with_values[(counter - 1)]
144
        list_with_values[adding_dictionary['position_to_add']] = adding_dictionary['adding_value']
145
        return list_with_values
146
147
    @staticmethod
148
    def fn_add_weekday_columns_to_data_frame(input_data_frame, columns_list):
149
        for current_column in columns_list:
150
            input_data_frame['Weekday for ' + current_column] = input_data_frame[current_column] \
151
                .apply(lambda x: x.strftime('%A'))
152
        return input_data_frame
153
154
    def fn_apply_query_to_data_frame(self, local_logger, timmer, input_data_frame, extract_params):
155
        timmer.start()
156
        query_expression = ''
157
        generic_pre_feedback = self.lcl.gettext('Will retain only values {filter_type} '
158
                                                + '"{filter_values}" within the field '
159
                                                + '"{column_to_filter}"') \
160
            .replace('{column_to_filter}', extract_params['column_to_filter'])
161
        if extract_params['filter_to_apply'] == 'equal':
162
            local_logger.debug(generic_pre_feedback \
163
                               .replace('{filter_type}', self.lcl.gettext('equal with')) \
164
                               .replace('{filter_values}', extract_params['filter_values']))
165
            query_expression = '`' + extract_params['column_to_filter'] + '` == "' \
166
                               + extract_params['filter_values'] + '"'
167
        elif extract_params['filter_to_apply'] == 'different':
168
            local_logger.debug(generic_pre_feedback \
169
                               .replace('{filter_type}', self.lcl.gettext('different than')) \
170
                               .replace('{filter_values}', extract_params['filter_values']))
171
            query_expression = '`' + extract_params['column_to_filter'] + '` != "' \
172
                               + extract_params['filter_values'] + '"'
173
        elif extract_params['filter_to_apply'] == 'multiple_match':
174
            multiple_values = '["' + '", "'.join(extract_params['filter_values'].values()) + '"]'
175
            local_logger.debug(generic_pre_feedback \
176
                               .replace('{filter_type}',
177
                                        self.lcl.gettext('matching any of these values')) \
178
                               .replace('{filter_values}', multiple_values))
179
            query_expression = '`' + extract_params['column_to_filter'] + '` in ' + multiple_values
180
        local_logger.debug(self.lcl.gettext('Query expression to apply is: {query_expression}') \
181
                           .replace('{query_expression}', query_expression))
182
        input_data_frame.query(query_expression, inplace=True)
183
        timmer.stop()
184
        return input_data_frame
185
186
    @staticmethod
187
    def fn_convert_datetime_columns_to_string(input_data_frame, columns_list, columns_format):
188
        for current_column in columns_list:
189
            input_data_frame[current_column] = \
190
                input_data_frame[current_column].map(lambda x: x.strftime(columns_format))
191
        return input_data_frame
192
193
    @staticmethod
194
    def fn_convert_string_columns_to_datetime(input_data_frame, columns_list, columns_format):
195
        for current_column in columns_list:
196
            input_data_frame[current_column] = pd.to_datetime(input_data_frame[current_column],
197
                                                              format=columns_format)
198
        return input_data_frame
199
200
    @staticmethod
201
    def fn_decide_by_omission_or_specific_false(in_dictionary, key_decision_factor):
202
        removal_needed = False
203
        if key_decision_factor not in in_dictionary:
204
            removal_needed = True
205
        elif not in_dictionary[key_decision_factor]:
206
            removal_needed = True
207
        return removal_needed
208
209
    def fn_filter_data_frame_by_index(self, local_logger, in_data_frame, filter_rule):
210
        reference_expression = filter_rule['Query Expression for Reference Index']
211
        index_current = in_data_frame.query(reference_expression, inplace=False)
212
        local_logger.info(self.lcl.gettext( \
213
            'Current index has been determined to be {index_current_value}') \
214
                          .replace('{index_current_value}', str(index_current.index)))
215
        if str(index_current.index) != "Int64Index([], dtype='int64')" \
216
                and 'Deviation' in filter_rule:
217
            for deviation_type in filter_rule['Deviation']:
218
                deviation_number = filter_rule['Deviation'][deviation_type]
219
                if deviation_type == 'Lower':
220
                    index_to_apply = index_current.index - deviation_number
221
                    in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
222
                elif deviation_type == 'Upper':
223
                    index_to_apply = index_current.index + deviation_number
224
                    in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
225
                local_logger.info(self.lcl.gettext( \
226
                    '{deviation_type} Deviation Number is {deviation_number} '
227
                    + 'to be applied to Current index, became {index_to_apply}') \
228
                                  .replace('{deviation_type}', deviation_type) \
229
                                  .replace('{deviation_number}', str(deviation_number)) \
230
                                  .replace('{index_to_apply}', str(index_to_apply)))
0 ignored issues
show
introduced by
The variable index_to_apply does not seem to be defined for all execution paths.
Loading history...
231
        return in_data_frame
232
233
    @staticmethod
234
    def fn_get_column_index_from_dataframe(data_frame_columns, column_name_to_identify):
235
        column_index_to_return = 0
236
        for ndx, column_name in enumerate(data_frame_columns):
237
            if column_name == column_name_to_identify:
238
                column_index_to_return = ndx
239
        return column_index_to_return
240
241
    @staticmethod
242
    def fn_get_first_current_and_last_column_value_from_data_frame(in_data_frame, in_column_name):
243
        return {
244
            'first': in_data_frame.iloc[0][in_column_name],
245
            'current': in_data_frame.query('`Timeline Evaluation` == "Current"',
246
                                           inplace=False)[in_column_name].max(),
247
            'last': in_data_frame.iloc[(len(in_data_frame) - 1)][in_column_name],
248
        }
249
250
    def fn_load_file_list_to_data_frame(self, local_logger, timmer, file_list, csv_delimiter):
251
        timmer.start()
252
        combined_csv = pd.concat([pd.read_csv(filepath_or_buffer=current_file,
253
                                              delimiter=csv_delimiter,
254
                                              cache_dates=True,
255
                                              index_col=None,
256
                                              memory_map=True,
257
                                              low_memory=False,
258
                                              encoding='utf-8',
259
                                              ) for current_file in file_list])
260
        local_logger.info(self.lcl.gettext( \
261
            'All relevant files ({files_counted}) were merged into a Pandas Data Frame') \
262
                          .replace('{files_counted}', str(len(file_list))))
263
        timmer.stop()
264
        return combined_csv
265
266
    def fn_store_data_frame_to_file(self, local_logger, timmer, input_data_frame,
267
                                    input_file_details):
268
        timmer.start()
269
        if input_file_details['format'] == 'csv':
270
            input_data_frame.to_csv(path_or_buf=input_file_details['name'],
271
                                    sep=input_file_details['field_delimiter'],
272
                                    header=True,
273
                                    index=False,
274
                                    encoding='utf-8')
275
        local_logger.info(self.lcl.gettext( \
276
            'Pandas Data Frame has just been saved to file "{file_name}", '
277
            + 'considering {file_type} as file type') \
278
                          .replace('{file_name}', input_file_details['name']) \
279
                          .replace('{file_type}', input_file_details['format']))
280
        timmer.stop()
281