Passed
Push — development/test ( ec5267...7ab62a )
by Daniel
01:06
created

sources.common.DataManipulator   A

Complexity

Total Complexity 39

Size/Duplication

Total Lines 221
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 184
dl 0
loc 221
rs 9.28
c 0
b 0
f 0
wmc 39

14 Methods

Rating   Name   Duplication   Size   Complexity  
A DataManipulator.fn_add_days_within_column_to_data_frame() 0 8 2
A DataManipulator.fn_add_minimum_and_maximum_columns_to_data_frame() 0 9 2
A DataManipulator.fn_add_timeline_evaluation_column_to_data_frame() 0 17 5
A DataManipulator.add_value_to_dictionary_by_position() 0 9 2
A DataManipulator.fn_add_weekday_columns_to_data_frame() 0 6 3
A DataManipulator.fn_convert_datetime_columns_to_string() 0 6 3
A DataManipulator.fn_filter_data_frame_by_index() 0 17 5
A DataManipulator.fn_decide_by_omission_or_specific_false() 0 8 3
A DataManipulator.fn_apply_query_to_data_frame() 0 27 4
A DataManipulator.fn_convert_string_columns_to_datetime() 0 6 2
A DataManipulator.fn_store_data_frame_to_file() 0 12 2
A DataManipulator.get_column_index_from_dataframe() 0 7 3
A DataManipulator.fn_load_file_list_to_data_frame() 0 14 1
B DataManipulator.add_value_to_dictionary() 0 49 2
1
"""
2
Data Manipulation class
3
"""
4
# package to handle date and times
5
from datetime import timedelta
6
# package facilitating Data Frames manipulation
7
import pandas as pd
8
9
10
class DataManipulator:
11
12
    @staticmethod
13
    def fn_add_days_within_column_to_data_frame(input_data_frame, dict_expression):
14
        input_data_frame['Days Within'] = input_data_frame[dict_expression['End Date']] - \
15
                                          input_data_frame[dict_expression['Start Date']] + \
16
                                          timedelta(days=1)
17
        input_data_frame['Days Within'] = input_data_frame['Days Within'] \
18
            .apply(lambda x: int(str(x).replace(' days 00:00:00', '')))
19
        return input_data_frame
20
21
    @staticmethod
22
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
23
        grouped_df = input_data_frame.groupby(dict_expression['group_by']) \
24
            .agg({dict_expression['calculation']: ['min', 'max']})
25
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
26
        grouped_df = grouped_df.reset_index()
27
        if 'map' in dict_expression:
28
            grouped_df.rename(columns=dict_expression['map'], inplace=True)
29
        return grouped_df
30
31
    def fn_add_timeline_evaluation_column_to_data_frame(self, in_df, dict_expression):
32
        # shorten last method parameter
33
        de = dict_expression
34
        # add helpful column to use on "Timeline Evaluation" column determination
35
        in_df['Reference Date'] = de['Reference Date']
36
        # actual "Timeline Evaluation" column determination
37
        cols = ['Reference Date', de['Start Date'], de['End Date']]
38
        in_df['Timeline Evaluation'] = in_df[cols] \
39
            .apply(lambda r: 'Current' if r[de['Start Date']]
40
                                          <= r['Reference Date']
41
                                          <= r[de['End Date']] else\
42
                   'Past' if r[de['Start Date']] < r['Reference Date'] else 'Future', axis=1)
43
        # decide if the helpful column is to be retained or not
44
        removal_needed = self.fn_decide_by_omission_or_specific_false(de, 'Keep Reference Date')
45
        if removal_needed:
46
            in_df.drop(columns=['Reference Date'], inplace=True)
47
        return in_df
48
49
    def add_value_to_dictionary(self, in_list, adding_value, adding_type, reference_column):
50
        add_type = adding_type.lower()
51
        total_columns = len(in_list)
52
        if reference_column is None:
53
            reference_indexes = {
54
                'add': {
55
                    'after': 0,
56
                    'before': 0,
57
                },
58
                'cycle_down_to': {
59
                    'after': 0,
60
                    'before': 0,
61
                },
62
            }
63
        else:
64
            reference_indexes = {
65
                'add': {
66
                    'after': in_list.copy().index(reference_column) + 1,
67
                    'before': in_list.copy().index(reference_column),
68
                },
69
                'cycle_down_to': {
70
                    'after': in_list.copy().index(reference_column),
71
                    'before': in_list.copy().index(reference_column),
72
                },
73
            }
74
        positions = {
75
            'after': {
76
                'cycle_down_to': reference_indexes.get('cycle_down_to').get('after'),
77
                'add': reference_indexes.get('add').get('after'),
78
            },
79
            'before': {
80
                'cycle_down_to': reference_indexes.get('cycle_down_to').get('before'),
81
                'add': reference_indexes.get('add').get('before'),
82
            },
83
            'first': {
84
                'cycle_down_to': 0,
85
                'add': 0,
86
            },
87
            'last': {
88
                'cycle_down_to': total_columns,
89
                'add': total_columns,
90
            }
91
        }
92
        return self.add_value_to_dictionary_by_position({
93
            'adding_value': adding_value,
94
            'list': in_list,
95
            'position_to_add': positions.get(add_type).get('add'),
96
            'position_to_cycle_down_to': positions.get(add_type).get('cycle_down_to'),
97
            'total_columns': total_columns,
98
        })
99
100
    @staticmethod
101
    def add_value_to_dictionary_by_position(adding_dictionary):
102
        list_with_values = adding_dictionary['list']
103
        list_with_values.append(adding_dictionary['total_columns'])
104
        for counter in range(adding_dictionary['total_columns'],
105
                             adding_dictionary['position_to_cycle_down_to'], -1):
106
            list_with_values[counter] = list_with_values[(counter - 1)]
107
        list_with_values[adding_dictionary['position_to_add']] = adding_dictionary['adding_value']
108
        return list_with_values
109
110
    @staticmethod
111
    def fn_add_weekday_columns_to_data_frame(input_data_frame, columns_list):
112
        for current_column in columns_list:
113
            input_data_frame['Weekday for ' + current_column] = input_data_frame[current_column] \
114
                .apply(lambda x: x.strftime('%A'))
115
        return input_data_frame
116
117
    @staticmethod
118
    def fn_apply_query_to_data_frame(local_logger, timmer, input_data_frame, extract_params):
119
        timmer.start()
120
        query_expression = ''
121
        if extract_params['filter_to_apply'] == 'equal':
122
            local_logger.debug('Will retain only values equal with "'
123
                               + extract_params['filter_values'] + '" within the field "'
124
                               + extract_params['column_to_filter'] + '"')
125
            query_expression = '`' + extract_params['column_to_filter'] + '` == "' \
126
                               + extract_params['filter_values'] + '"'
127
        elif extract_params['filter_to_apply'] == 'different':
128
            local_logger.debug('Will retain only values different than "'
129
                               + extract_params['filter_values'] + '" within the field "'
130
                               + extract_params['column_to_filter'] + '"')
131
            query_expression = '`' + extract_params['column_to_filter'] + '` != "' \
132
                               + extract_params['filter_values'] + '"'
133
        elif extract_params['filter_to_apply'] == 'multiple_match':
134
            local_logger.debug('Will retain only values equal with "'
135
                               + extract_params['filter_values'] + '" within the field "'
136
                               + extract_params['column_to_filter'] + '"')
137
            query_expression = '`' + extract_params['column_to_filter'] + '` in ["' \
138
                               + '", "'.join(extract_params['filter_values'].values()) \
139
                               + '"]'
140
        local_logger.debug('Query expression to apply is: ' + query_expression)
141
        input_data_frame.query(query_expression, inplace=True)
142
        timmer.stop()
143
        return input_data_frame
144
145
    @staticmethod
146
    def fn_convert_datetime_columns_to_string(input_data_frame, columns_list, columns_format):
147
        for current_column in columns_list:
148
            input_data_frame[current_column] = \
149
                input_data_frame[current_column].map(lambda x: x.strftime(columns_format))
150
        return input_data_frame
151
152
    @staticmethod
153
    def fn_convert_string_columns_to_datetime(input_data_frame, columns_list, columns_format):
154
        for current_column in columns_list:
155
            input_data_frame[current_column] = pd.to_datetime(input_data_frame[current_column],
156
                                                              format=columns_format)
157
        return input_data_frame
158
159
    @staticmethod
160
    def fn_decide_by_omission_or_specific_false(in_dictionary, key_decision_factor):
161
        removal_needed = False
162
        if key_decision_factor not in in_dictionary:
163
            removal_needed = True
164
        elif not in_dictionary[key_decision_factor]:
165
            removal_needed = True
166
        return removal_needed
167
168
    @staticmethod
169
    def fn_filter_data_frame_by_index(local_logger, in_data_frame, filter_rule):
170
        index_current = in_data_frame.query('`Timeline Evaluation` == "Current"', inplace=False)
171
        local_logger.info('Current index has been determined to be ' + str(index_current.index))
172
        if 'Deviation' in filter_rule:
173
            for deviation_type in filter_rule['Deviation']:
174
                deviation_number = filter_rule['Deviation'][deviation_type]
175
                if deviation_type == 'Lower':
176
                    index_to_apply = index_current.index - deviation_number
177
                    in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
178
                elif deviation_type == 'Upper':
179
                    index_to_apply = index_current.index + deviation_number
180
                    in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
181
                local_logger.info(deviation_type + ' Deviation Number is ' + str(deviation_number)
182
                                  + ' to be applied to Current index, became '
183
                                  + str(index_to_apply))
0 ignored issues
show
introduced by
The variable index_to_apply does not seem to be defined for all execution paths.
Loading history...
184
        return in_data_frame
185
186
    @staticmethod
187
    def get_column_index_from_dataframe(data_frame_columns, column_name_to_identify):
188
        column_index_to_return = 0
189
        for ndx, column_name in enumerate(data_frame_columns):
190
            if column_name == column_name_to_identify:
191
                column_index_to_return = ndx
192
        return column_index_to_return
193
194
    @staticmethod
195
    def fn_load_file_list_to_data_frame(local_logger, timmer, file_list, csv_delimiter):
196
        timmer.start()
197
        combined_csv = pd.concat([pd.read_csv(filepath_or_buffer=current_file,
198
                                              delimiter=csv_delimiter,
199
                                              cache_dates=True,
200
                                              index_col=None,
201
                                              memory_map=True,
202
                                              low_memory=False,
203
                                              encoding='utf-8',
204
                                              ) for current_file in file_list])
205
        local_logger.info('All relevant files were merged into a Pandas Data Frame')
206
        timmer.stop()
207
        return combined_csv
208
209
    @staticmethod
210
    def fn_store_data_frame_to_file(local_logger, timmer, input_data_frame, input_file_details):
211
        timmer.start()
212
        if input_file_details['format'] == 'csv':
213
            input_data_frame.to_csv(path_or_buf=input_file_details['name'],
214
                                    sep=input_file_details['field_delimiter'],
215
                                    header=True,
216
                                    index=False,
217
                                    encoding='utf-8')
218
        local_logger.info('Data frame has just been saved to file "'
219
                          + input_file_details['name'] + '"')
220
        timmer.stop()
221