Passed
Push — development/test (6383cd...222d55)
by Daniel
created 02:57

sources.common.DataManipulator — Rating: A

Complexity

Total Complexity 41

Size/Duplication

Total Lines 245
Duplicated Lines 0%

Importance

Changes 0
Metric                             Value
eloc (executable lines of code)    205
dl (duplicated lines)              0
loc (lines of code)                245
rs                                 9.1199
c                                  0
b                                  0
f                                  0
wmc (weighted method count)        41

15 Methods

Rating   Name   Duplication   Size   Complexity  
A DataManipulator.add_value_to_dictionary_by_position() 0 9 2
A DataManipulator.fn_add_weekday_columns_to_data_frame() 0 6 3
A DataManipulator.fn_convert_datetime_columns_to_string() 0 6 3
B DataManipulator.fn_filter_data_frame_by_index() 0 23 6
A DataManipulator.__init__() 0 4 1
A DataManipulator.fn_decide_by_omission_or_specific_false() 0 8 3
A DataManipulator.fn_apply_query_to_data_frame() 0 31 4
A DataManipulator.fn_convert_string_columns_to_datetime() 0 6 2
A DataManipulator.fn_store_data_frame_to_file() 0 15 2
A DataManipulator.fn_add_days_within_column_to_data_frame() 0 8 2
A DataManipulator.fn_add_timeline_evaluation_column_to_data_frame() 0 17 5
A DataManipulator.get_column_index_from_dataframe() 0 7 3
A DataManipulator.fn_load_file_list_to_data_frame() 0 15 1
A DataManipulator.fn_add_minimum_and_maximum_columns_to_data_frame() 0 9 2
B DataManipulator.add_value_to_dictionary() 0 49 2
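
As a sanity check, the per-method complexities above sum to the class WMC reported in the metric table: 2 + 3 + 3 + 6 + 1 + 3 + 4 + 2 + 2 + 2 + 5 + 3 + 1 + 2 + 2 = 41, matching Total Complexity.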

How to fix: Complexity

Complex classes like sources.common.DataManipulator often do many different things. To break such a class down, identify a cohesive component within it. A common way to find such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster. A sketch of the first approach follows.
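
For example, the two fn_convert_*_columns_* methods share a prefix and both reshape column types, so together they form one candidate component. A minimal, hypothetical sketch of the Extract Class step (the DataFrameColumnConverter name and its method names are illustrative assumptions, not code from the repository):

# Hypothetical Extract Class sketch; names are illustrative only.
import pandas as pd


class DataFrameColumnConverter:
    """Cohesive component split out of DataManipulator: column-type conversions."""

    @staticmethod
    def datetime_columns_to_string(data_frame, columns_list, columns_format):
        for current_column in columns_list:
            data_frame[current_column] = \
                data_frame[current_column].map(lambda x: x.strftime(columns_format))
        return data_frame

    @staticmethod
    def string_columns_to_datetime(data_frame, columns_list, columns_format):
        for current_column in columns_list:
            data_frame[current_column] = pd.to_datetime(data_frame[current_column],
                                                        format=columns_format)
        return data_frame

DataManipulator would then delegate to (or simply hand its callers) the extracted class, shrinking its own WMC by the complexity of the moved methods.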

"""
Data Manipulation class
"""
# package to handle date and times
from datetime import timedelta
# package to add support for multi-language (i18n)
import gettext
# package to handle files/folders and related metadata/operations
import os
# package facilitating Data Frames manipulation
import pandas as pd


class DataManipulator:
    lcl = None

    def __init__(self, default_language='en_US'):
        current_script = os.path.basename(__file__).replace('.py', '')
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
        self.lcl = gettext.translation(current_script, lang_folder, languages=[default_language])

    @staticmethod
    def fn_add_days_within_column_to_data_frame(input_data_frame, dict_expression):
        input_data_frame['Days Within'] = input_data_frame[dict_expression['End Date']] \
                                          - input_data_frame[dict_expression['Start Date']] \
                                          + timedelta(days=1)
        # .dt.days is sturdier than stripping ' days 00:00:00' out of the string form
        input_data_frame['Days Within'] = input_data_frame['Days Within'].dt.days
        return input_data_frame

    @staticmethod
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
        grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
            .agg({dict_expression['calculation']: ['min', 'max']})
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
        grouped_df = grouped_df.reset_index()
        if 'map' in dict_expression:
            grouped_df.rename(columns=dict_expression['map'], inplace=True)
        return grouped_df

    def fn_add_timeline_evaluation_column_to_data_frame(self, in_df, dict_expression):
        # shorten last method parameter
        de = dict_expression
        # add helpful column to use on "Timeline Evaluation" column determination
        in_df['Reference Date'] = de['Reference Date']
        # actual "Timeline Evaluation" column determination
        cols = ['Reference Date', de['Start Date'], de['End Date']]
        in_df['Timeline Evaluation'] = in_df[cols].apply(
            lambda r: 'Current' if r[de['Start Date']] <= r['Reference Date'] <= r[de['End Date']]
            else 'Past' if r[de['Start Date']] < r['Reference Date']
            else 'Future', axis=1)
        # decide if the helpful column is to be retained or not
        removal_needed = self.fn_decide_by_omission_or_specific_false(de, 'Keep Reference Date')
        if removal_needed:
            in_df.drop(columns=['Reference Date'], inplace=True)
        return in_df

    def add_value_to_dictionary(self, in_list, adding_value, adding_type, reference_column):
        add_type = adding_type.lower()
        total_columns = len(in_list)
        if reference_column is None:
            reference_indexes = {
                'add': {
                    'after': 0,
                    'before': 0,
                },
                'cycle_down_to': {
                    'after': 0,
                    'before': 0,
                },
            }
        else:
            # list.index() does not mutate the list, so no defensive copy is needed
            reference_indexes = {
                'add': {
                    'after': in_list.index(reference_column) + 1,
                    'before': in_list.index(reference_column),
                },
                'cycle_down_to': {
                    'after': in_list.index(reference_column),
                    'before': in_list.index(reference_column),
                },
            }
        positions = {
            'after': {
                'cycle_down_to': reference_indexes.get('cycle_down_to').get('after'),
                'add': reference_indexes.get('add').get('after'),
            },
            'before': {
                'cycle_down_to': reference_indexes.get('cycle_down_to').get('before'),
                'add': reference_indexes.get('add').get('before'),
            },
            'first': {
                'cycle_down_to': 0,
                'add': 0,
            },
            'last': {
                'cycle_down_to': total_columns,
                'add': total_columns,
            }
        }
        return self.add_value_to_dictionary_by_position({
            'adding_value': adding_value,
            'list': in_list,
            'position_to_add': positions.get(add_type).get('add'),
            'position_to_cycle_down_to': positions.get(add_type).get('cycle_down_to'),
            'total_columns': total_columns,
        })

    @staticmethod
    def add_value_to_dictionary_by_position(adding_dictionary):
        # equivalent to list.insert(): append a placeholder, shift values down,
        # then write the new value at the requested position
        list_with_values = adding_dictionary['list']
        list_with_values.append(adding_dictionary['total_columns'])
        for counter in range(adding_dictionary['total_columns'],
                             adding_dictionary['position_to_cycle_down_to'], -1):
            list_with_values[counter] = list_with_values[(counter - 1)]
        list_with_values[adding_dictionary['position_to_add']] = adding_dictionary['adding_value']
        return list_with_values

    @staticmethod
    def fn_add_weekday_columns_to_data_frame(input_data_frame, columns_list):
        for current_column in columns_list:
            input_data_frame['Weekday for ' + current_column] = input_data_frame[current_column] \
                .apply(lambda x: x.strftime('%A'))
        return input_data_frame

    def fn_apply_query_to_data_frame(self, local_logger, timmer, input_data_frame, extract_params):
        timmer.start()
        # note: an unrecognized filter_to_apply leaves the expression empty,
        # and .query('') will raise a ValueError
        query_expression = ''
        generic_pre_feedback = self.lcl.gettext('Will retain only values {filter_type} '
                                                + '"{filter_values}" within the field '
                                                + '"{column_to_filter}"') \
            .replace('{column_to_filter}', extract_params['column_to_filter'])
        if extract_params['filter_to_apply'] == 'equal':
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}', self.lcl.gettext('equal with'))
                               .replace('{filter_values}', extract_params['filter_values']))
            query_expression = '`' + extract_params['column_to_filter'] + '` == "' \
                               + extract_params['filter_values'] + '"'
        elif extract_params['filter_to_apply'] == 'different':
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}', self.lcl.gettext('different than'))
                               .replace('{filter_values}', extract_params['filter_values']))
            query_expression = '`' + extract_params['column_to_filter'] + '` != "' \
                               + extract_params['filter_values'] + '"'
        elif extract_params['filter_to_apply'] == 'multiple_match':
            multiple_values = '["' + '", "'.join(extract_params['filter_values'].values()) + '"]'
            local_logger.debug(generic_pre_feedback
                               .replace('{filter_type}',
                                        self.lcl.gettext('matching any of these values'))
                               .replace('{filter_values}', multiple_values))
            query_expression = '`' + extract_params['column_to_filter'] + '` in ' + multiple_values
        local_logger.debug(self.lcl.gettext('Query expression to apply is: {query_expression}')
                           .replace('{query_expression}', query_expression))
        input_data_frame.query(query_expression, inplace=True)
        timmer.stop()
        return input_data_frame

    @staticmethod
    def fn_convert_datetime_columns_to_string(input_data_frame, columns_list, columns_format):
        for current_column in columns_list:
            input_data_frame[current_column] = \
                input_data_frame[current_column].map(lambda x: x.strftime(columns_format))
        return input_data_frame

    @staticmethod
    def fn_convert_string_columns_to_datetime(input_data_frame, columns_list, columns_format):
        for current_column in columns_list:
            input_data_frame[current_column] = pd.to_datetime(input_data_frame[current_column],
                                                              format=columns_format)
        return input_data_frame

    @staticmethod
    def fn_decide_by_omission_or_specific_false(in_dictionary, key_decision_factor):
        # removal is needed when the key is absent or explicitly falsy
        removal_needed = False
        if key_decision_factor not in in_dictionary:
            removal_needed = True
        elif not in_dictionary[key_decision_factor]:
            removal_needed = True
        return removal_needed

    def fn_filter_data_frame_by_index(self, local_logger, in_data_frame, filter_rule):
        reference_expression = filter_rule['Query Expression for Reference Index']
        index_current = in_data_frame.query(reference_expression, inplace=False)
        local_logger.info(self.lcl.gettext(
            'Current index has been determined to be {index_current_value}')
                          .replace('{index_current_value}', str(index_current.index)))
        # an emptiness check is sturdier than comparing against the string
        # representation of an empty Int64Index
        if not index_current.empty and 'Deviation' in filter_rule:
            # only 'Lower' and 'Upper' deviation types are recognized
            for deviation_type in filter_rule['Deviation']:
                deviation_number = filter_rule['Deviation'][deviation_type]
                if deviation_type == 'Lower':
                    index_to_apply = index_current.index - deviation_number
                    in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
                elif deviation_type == 'Upper':
                    index_to_apply = index_current.index + deviation_number
                    in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
                local_logger.info(self.lcl.gettext(
                    '{deviation_type} Deviation Number is {deviation_number} '
                    + 'to be applied to Current index, became {index_to_apply}')
                                  .replace('{deviation_type}', deviation_type)
                                  .replace('{deviation_number}', str(deviation_number))
                                  .replace('{index_to_apply}', str(index_to_apply)))
        return in_data_frame

    @staticmethod
    def get_column_index_from_dataframe(data_frame_columns, column_name_to_identify):
        # returns the index of the last matching column, or 0 when absent
        column_index_to_return = 0
        for ndx, column_name in enumerate(data_frame_columns):
            if column_name == column_name_to_identify:
                column_index_to_return = ndx
        return column_index_to_return

    def fn_load_file_list_to_data_frame(self, local_logger, timmer, file_list, csv_delimiter):
        timmer.start()
        combined_csv = pd.concat([pd.read_csv(filepath_or_buffer=current_file,
                                              delimiter=csv_delimiter,
                                              cache_dates=True,
                                              index_col=None,
                                              memory_map=True,
                                              low_memory=False,
                                              encoding='utf-8',
                                              ) for current_file in file_list])
        local_logger.info(self.lcl.gettext(
            'All relevant files ({files_counted}) were merged into a Pandas Data Frame')
                          .replace('{files_counted}', str(len(file_list))))
        timmer.stop()
        return combined_csv

    def fn_store_data_frame_to_file(self, local_logger, timmer, input_data_frame,
                                    input_file_details):
        timmer.start()
        # only CSV output is currently implemented
        if input_file_details['format'] == 'csv':
            input_data_frame.to_csv(path_or_buf=input_file_details['name'],
                                    sep=input_file_details['field_delimiter'],
                                    header=True,
                                    index=False,
                                    encoding='utf-8')
        local_logger.info(self.lcl.gettext(
            'Pandas Data Frame has just been saved to file "{file_name}", '
            + 'considering {file_type} as file type')
                          .replace('{file_name}', input_file_details['name'])
                          .replace('{file_type}', input_file_details['format']))
        timmer.stop()
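
For reference, a minimal usage sketch of the static helpers, which need no translation catalog or logger (the sample frame below is invented for illustration, not taken from the project):

# Illustrative only: sample data made up for this sketch.
import pandas as pd

df = pd.DataFrame({
    'Start Date': pd.to_datetime(['2021-01-01', '2021-01-10']),
    'End Date': pd.to_datetime(['2021-01-05', '2021-01-12']),
})
df = DataManipulator.fn_add_days_within_column_to_data_frame(
    df, {'Start Date': 'Start Date', 'End Date': 'End Date'})
df = DataManipulator.fn_add_weekday_columns_to_data_frame(df, ['Start Date'])
print(df[['Days Within', 'Weekday for Start Date']])
# Days Within comes out as 5 and 3: end minus start, inclusive of both endpoints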