Passed
Push — development/test (c39db9...ec5267)
by Daniel
01:13
created

sources.common.DataManipulator — Rating: B

Complexity

Total Complexity 47

Size/Duplication

Total Lines 207
Duplicated Lines 0%

Importance

Changes 0
Metric                              Value
eloc (executable lines of code)     180
dl (duplicated lines)               0
loc (lines of code)                 207
rs                                  8.64
c                                   0
b                                   0
f                                   0
wmc (weighted methods per class)    47

15 Methods

Rating   Name   Duplication   Size   Complexity  
A DataManipulator.fn_add_days_within_column_to_data_frame() 0 8 2
A DataManipulator.fn_add_minimum_and_maximum_columns_to_data_frame() 0 9 2
A DataManipulator.add_value_to_dictionary_by_position() 0 9 2
A DataManipulator.fn_add_weekday_columns_to_data_frame() 0 6 3
A DataManipulator.fn_convert_datetime_columns_to_string() 0 6 3
A DataManipulator.fn_filter_data_frame_by_index() 0 17 5
A DataManipulator.fn_decide_by_omission_or_specific_false() 0 8 3
A DataManipulator.fn_apply_query_to_data_frame() 0 27 4
A DataManipulator.fn_convert_string_columns_to_datetime() 0 6 2
A DataManipulator.fn_store_data_frame_to_file() 0 12 2
A DataManipulator.fn_add_timeline_evaluation_column_to_data_frame() 0 17 5
A DataManipulator.get_column_index_from_dataframe() 0 7 3
A DataManipulator.fn_load_file_list_to_data_frame() 0 14 1
A DataManipulator.fn_drop_certain_columns() 0 13 5
B DataManipulator.add_value_to_dictionary() 0 21 5

How to fix: Complexity

Complex classes like sources.common.DataManipulator often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes. In this class, for example, fn_convert_datetime_columns_to_string() and fn_convert_string_columns_to_datetime() share the fn_convert_ prefix and both handle column type conversion, which marks them as a cohesive component.

Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
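A minimal sketch of that refactoring, assuming the two fn_convert_* methods are pulled out of DataManipulator; the class name ColumnTypeConverter is a hypothetical choice for illustration, not part of the reviewed code:

# Hypothetical Extract Class sketch: the two fn_convert_* methods become a
# small, cohesive class of their own (the class name is an assumption).
import pandas as pd


class ColumnTypeConverter:

    @staticmethod
    def fn_convert_datetime_columns_to_string(input_data_frame, columns_list, columns_format):
        for current_column in columns_list:
            input_data_frame[current_column] = \
                input_data_frame[current_column].map(lambda x: x.strftime(columns_format))
        return input_data_frame

    @staticmethod
    def fn_convert_string_columns_to_datetime(input_data_frame, columns_list, columns_format):
        for current_column in columns_list:
            input_data_frame[current_column] = pd.to_datetime(
                input_data_frame[current_column], format=columns_format)
        return input_data_frame

DataManipulator could then delegate to this class, or drop the two methods outright once callers are updated, which alone would lower the class's weighted method count by 5.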

"""
Data Manipulation class
"""
# package to handle date and times
from datetime import timedelta
# package facilitating Data Frames manipulation
import pandas as pd


class DataManipulator:

    @staticmethod
    def fn_add_days_within_column_to_data_frame(input_data_frame, dict_expression):
        input_data_frame['Days Within'] = input_data_frame[dict_expression['End Date']] - \
                                          input_data_frame[dict_expression['Start Date']] + \
                                          timedelta(days=1)
        input_data_frame['Days Within'] = input_data_frame['Days Within'] \
            .apply(lambda x: int(str(x).replace(' days 00:00:00', '')))
        return input_data_frame

    @staticmethod
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
        grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
            .agg({dict_expression['calculation']: ['min', 'max']})
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
        grouped_df = grouped_df.reset_index()
        if 'map' in dict_expression:
            grouped_df.rename(columns=dict_expression['map'], inplace=True)
        return grouped_df

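    # Example (hypothetical data) for the method above: grouping a frame with
    # Team = A, A, B and Score = 1, 5, 3 by 'Team' on 'Score' yields one row
    # per team with the flattened columns 'Score_min' and 'Score_max', which
    # the optional 'map' entry can then rename.
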
    def fn_add_timeline_evaluation_column_to_data_frame(self, in_df, dict_expression):
        # shorten last method parameter
        de = dict_expression
        # add helpful column to use on "Timeline Evaluation" column determination
        in_df['Reference Date'] = de['Reference Date']
        # actual "Timeline Evaluation" column determination
        cols = ['Reference Date', de['Start Date'], de['End Date']]
        in_df['Timeline Evaluation'] = in_df[cols] \
            .apply(lambda r: 'Current' if r[de['Start Date']]
                                          <= r['Reference Date']
                                          <= r[de['End Date']]
                   else 'Past' if r[de['Start Date']] < r['Reference Date']
                   else 'Future', axis=1)
        # decide if the helpful column is to be retained or not
        removal_needed = self.fn_decide_by_omission_or_specific_false(de, 'Keep Reference Date')
        if removal_needed:
            in_df.drop(columns=['Reference Date'], inplace=True)
        return in_df

    def add_value_to_dictionary(self, in_list, adding_value, adding_type, reference_column):
        add_type = adding_type.lower()
        total_columns = len(in_list)
        if add_type == 'last':
            position_to_cycle_down_to = total_columns
            position_to_add = total_columns
        elif add_type == 'first':
            position_to_cycle_down_to = 0
            position_to_add = position_to_cycle_down_to
        elif add_type == 'after':
            position_to_cycle_down_to = in_list.index(reference_column)
            position_to_add = position_to_cycle_down_to + 1
        elif add_type == 'before':
            position_to_cycle_down_to = in_list.index(reference_column) - 1
            position_to_add = position_to_cycle_down_to + 1
        else:
            # guard against unknown adding_type values, which would otherwise
            # leave position_to_add and position_to_cycle_down_to undefined
            raise ValueError('Unknown adding_type: ' + adding_type)
        return self.add_value_to_dictionary_by_position({
            'adding_value': adding_value,
            'list': in_list,
            'position_to_add': position_to_add,
            'position_to_cycle_down_to': position_to_cycle_down_to,
            'total_columns': total_columns,
        })

    @staticmethod
    def add_value_to_dictionary_by_position(adding_dictionary):
        list_with_values = adding_dictionary['list']
        list_with_values.append(adding_dictionary['total_columns'])
        for counter in range(adding_dictionary['total_columns'],
                             adding_dictionary['position_to_cycle_down_to'], -1):
            list_with_values[counter] = list_with_values[(counter - 1)]
        list_with_values[adding_dictionary['position_to_add']] = adding_dictionary['adding_value']
        return list_with_values

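    # Example trace (hypothetical values) for the two insertion helpers above:
    # add_value_to_dictionary(['Alpha', 'Beta', 'Gamma'], 'New', 'after', 'Alpha')
    # computes position_to_add == 1, appends a placeholder slot, shifts the
    # tail down one position, and splices the value in, returning
    # ['Alpha', 'New', 'Beta', 'Gamma'].
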
    @staticmethod
    def fn_add_weekday_columns_to_data_frame(input_data_frame, columns_list):
        for current_column in columns_list:
            input_data_frame['Weekday for ' + current_column] = input_data_frame[current_column] \
                .apply(lambda x: x.strftime('%A'))
        return input_data_frame

    @staticmethod
    def fn_apply_query_to_data_frame(local_logger, timmer, input_data_frame, extract_params):
        timmer.start()
        query_expression = ''
        if extract_params['filter_to_apply'] == 'equal':
            local_logger.debug('Will retain only values equal with "'
                               + extract_params['filter_values'] + '" within the field "'
                               + extract_params['column_to_filter'] + '"')
            query_expression = '`' + extract_params['column_to_filter'] + '` == "' \
                               + extract_params['filter_values'] + '"'
        elif extract_params['filter_to_apply'] == 'different':
            local_logger.debug('Will retain only values different than "'
                               + extract_params['filter_values'] + '" within the field "'
                               + extract_params['column_to_filter'] + '"')
            query_expression = '`' + extract_params['column_to_filter'] + '` != "' \
                               + extract_params['filter_values'] + '"'
        elif extract_params['filter_to_apply'] == 'multiple_match':
            # filter_values is a dictionary in this branch, so it must be cast
            # to a string before being concatenated into the log message
            local_logger.debug('Will retain only values equal with "'
                               + str(extract_params['filter_values']) + '" within the field "'
                               + extract_params['column_to_filter'] + '"')
            query_expression = '`' + extract_params['column_to_filter'] + '` in ["' \
                               + '", "'.join(extract_params['filter_values'].values()) \
                               + '"]'
        local_logger.debug('Query expression to apply is: ' + query_expression)
        input_data_frame.query(query_expression, inplace=True)
        timmer.stop()
        return input_data_frame

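    # The three supported filters build pandas query strings such as the
    # following (column name and values are hypothetical):
    #   equal          ->  `Status` == "Active"
    #   different      ->  `Status` != "Active"
    #   multiple_match ->  `Status` in ["Active", "Pending"]
    # Note that for 'multiple_match' the filter_values entry is expected to
    # be a dictionary whose values() feed the in-list.
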
    @staticmethod
    def fn_convert_datetime_columns_to_string(input_data_frame, columns_list, columns_format):
        for current_column in columns_list:
            input_data_frame[current_column] = \
                input_data_frame[current_column].map(lambda x: x.strftime(columns_format))
        return input_data_frame

    @staticmethod
    def fn_convert_string_columns_to_datetime(input_data_frame, columns_list, columns_format):
        for current_column in columns_list:
            input_data_frame[current_column] = pd.to_datetime(input_data_frame[current_column],
                                                              format=columns_format)
        return input_data_frame

    @staticmethod
    def fn_decide_by_omission_or_specific_false(in_dictionary, key_decision_factor):
        removal_needed = False
        if key_decision_factor not in in_dictionary:
            removal_needed = True
        elif not in_dictionary[key_decision_factor]:
            removal_needed = True
        return removal_needed

    def fn_drop_certain_columns(self, local_logger, timmer, working_dictionary):
        for current_file in working_dictionary['files']:
            # load all relevant files into a single data frame
            df = self.fn_load_file_list_to_data_frame(local_logger, timmer, [current_file],
                                                      working_dictionary['csv_field_separator'])
            save_necessary = False
            for column_to_eliminate in working_dictionary['columns_to_eliminate']:
                if column_to_eliminate in df:
                    df.drop(columns=column_to_eliminate, inplace=True)
                    save_necessary = True
            if save_necessary:
                # fn_store_data_frame_to_file() expects a single details
                # dictionary, so build it from the values at hand
                self.fn_store_data_frame_to_file(local_logger, timmer, df, {
                    'format': 'csv',
                    'name': current_file,
                    'field-delimiter': working_dictionary['csv_field_separator'],
                })

    @staticmethod
    def fn_filter_data_frame_by_index(local_logger, in_data_frame, filter_rule):
        index_current = in_data_frame.query('`Timeline Evaluation` == "Current"', inplace=False)
        local_logger.info('Current index has been determined to be ' + str(index_current.index))
        if 'Deviation' in filter_rule:
            for deviation_type in filter_rule['Deviation']:
                deviation_number = filter_rule['Deviation'][deviation_type]
                if deviation_type == 'Lower':
                    index_to_apply = index_current.index - deviation_number
                    in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
                elif deviation_type == 'Upper':
                    index_to_apply = index_current.index + deviation_number
                    in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
                else:
                    # skip unknown deviation types, so index_to_apply is
                    # always defined when the log line below runs
                    continue
                local_logger.info(deviation_type + ' Deviation Number is ' + str(deviation_number)
                                  + ' to be applied to Current index, became '
                                  + str(index_to_apply))
        return in_data_frame

    @staticmethod
    def get_column_index_from_dataframe(data_frame_columns, column_name_to_identify):
        column_index_to_return = 0
        for ndx, column_name in enumerate(data_frame_columns):
            if column_name == column_name_to_identify:
                column_index_to_return = ndx
        return column_index_to_return

    @staticmethod
    def fn_load_file_list_to_data_frame(local_logger, timmer, file_list, csv_delimiter):
        timmer.start()
        combined_csv = pd.concat([pd.read_csv(filepath_or_buffer=current_file,
                                              delimiter=csv_delimiter,
                                              cache_dates=True,
                                              index_col=None,
                                              memory_map=True,
                                              low_memory=False,
                                              encoding='utf-8',
                                              ) for current_file in file_list])
        local_logger.info('All relevant files were merged into a Pandas Data Frame')
        timmer.stop()
        return combined_csv

    @staticmethod
    def fn_store_data_frame_to_file(local_logger, timmer, input_data_frame, input_file_details):
        timmer.start()
        if input_file_details['format'] == 'csv':
            input_data_frame.to_csv(path_or_buf=input_file_details['name'],
                                    sep=input_file_details['field-delimiter'],
                                    header=True,
                                    index=False,
                                    encoding='utf-8')
        local_logger.info('Data frame has just been saved to file "'
                          + input_file_details['name'] + '"')
        timmer.stop()
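
A short usage sketch of the timeline helpers above; the import path mirrors the reported module name, and the sample data is invented for illustration:

# Hypothetical usage sketch; import path and sample data are assumptions.
import pandas as pd

from sources.common.DataManipulator import DataManipulator

df = pd.DataFrame({'Start': pd.to_datetime(['2021-01-01', '2021-01-11']),
                   'End': pd.to_datetime(['2021-01-10', '2021-01-20'])})
manipulator = DataManipulator()
df = manipulator.fn_add_days_within_column_to_data_frame(
    df, {'Start Date': 'Start', 'End Date': 'End'})
# 'Days Within' becomes 10 for both rows (End - Start + 1 day)
df = manipulator.fn_add_timeline_evaluation_column_to_data_frame(
    df, {'Start Date': 'Start', 'End Date': 'End',
         'Reference Date': pd.Timestamp('2021-01-05')})
# the first row is marked 'Current' (the reference date falls inside it),
# the second 'Future'; the helper 'Reference Date' column is dropped because
# 'Keep Reference Date' was omitted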