Passed
Push — development/test ( 24302c...f5ca2a )
by Daniel
01:16
created

sources.common.DataManipulator   A

Complexity

Total Complexity 30

Size/Duplication

Total Lines 143
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 125
dl 0
loc 143
rs 10
c 0
b 0
f 0
wmc 30

10 Methods

Rating   Name   Duplication   Size   Complexity  
A DataManipulator.fn_convert_columns_to_datetime() 0 6 2
A DataManipulator.fn_filter_data_frame_by_index() 0 17 5
A DataManipulator.fn_add_weekday_columns_to_data_frame() 0 6 3
A DataManipulator.fn_apply_query_to_data_frame() 0 27 4
A DataManipulator.fn_add_days_within_column_to_data_frame() 0 8 2
A DataManipulator.fn_store_data_frame_to_file() 0 12 2
A DataManipulator.fn_add_timeline_evaluation_column_to_data_frame() 0 10 4
A DataManipulator.fn_load_file_list_to_data_frame() 0 14 1
A DataManipulator.fn_drop_certain_columns() 0 13 5
A DataManipulator.fn_add_minimum_and_maximum_columns_to_data_frame() 0 9 2
1
"""
2
Data Manipulation class
3
"""
4
# package to handle date and times
5
from datetime import timedelta
6
# package facilitating Data Frames manipulation
7
import pandas as pd
8
9
10
class DataManipulator:
    """
    Helpers that enrich, filter, load and store Pandas Data Frames.

    Most helpers are static and modify the data frame they receive in place,
    returning that same object so calls can be chained.
    Wherever a `timmer` argument is accepted, it only needs to expose `start()`
    and `stop()`; `local_logger` needs `debug()`, `info()` and `warning()`.
    """

    @staticmethod
    def fn_add_days_within_column_to_data_frame(input_data_frame, dict_expression):
        """
        Add a 'Days Within' column holding the inclusive day count of an interval.

        :param input_data_frame: data frame to enrich (modified in place)
        :param dict_expression: dictionary whose 'Start Date' and 'End Date' entries
            name the columns delimiting the interval
        :return: the same data frame, with the 'Days Within' column set
        """
        # both boundaries are counted, hence the extra day
        inclusive_span = input_data_frame[dict_expression['End Date']] \
            - input_data_frame[dict_expression['Start Date']] \
            + timedelta(days=1)
        # read the day component directly instead of parsing the textual representation,
        # which failed for negative spans and for spans carrying a time-of-day part
        input_data_frame['Days Within'] = inclusive_span.dt.days
        return input_data_frame

    @staticmethod
    def fn_add_minimum_and_maximum_columns_to_data_frame(input_data_frame, dict_expression):
        """
        Build a grouped data frame holding the minimum and maximum of one column.

        :param input_data_frame: data frame to aggregate (left untouched)
        :param dict_expression: dictionary with 'group_by' (grouping column(s)),
            'calculation' (column to aggregate) and optionally 'map'
            (column renaming applied to the result)
        :return: new data frame with the grouping column(s) plus
            '<calculation>_min' and '<calculation>_max' (renamed if 'map' is given)
        """
        aggregation_rules = {dict_expression['calculation']: ['min', 'max']}
        grouped_df = input_data_frame.groupby(dict_expression['group_by']).agg(aggregation_rules)
        # flatten the two-level (column, aggregation) header into single names
        grouped_df.columns = ['_'.join(col).strip() for col in grouped_df.columns.values]
        grouped_df = grouped_df.reset_index()
        if 'map' in dict_expression:
            grouped_df = grouped_df.rename(columns=dict_expression['map'])
        return grouped_df

    @staticmethod
    def fn_add_timeline_evaluation_column_to_data_frame(in_df, dict_expression):
        """
        Add a 'Timeline Evaluation' column classifying rows against a reference date.

        A row is 'Current' when Start Date <= reference <= End Date, 'Past' when it
        started on or before the reference but does not reach it, 'Future' otherwise.

        :param in_df: data frame to enrich (modified in place)
        :param dict_expression: dictionary with 'Start Date' and 'End Date' (column names)
            and 'Reference Date' (value the columns are compared against)
        :return: the same data frame, with the 'Timeline Evaluation' column set
        """
        ref_date = dict_expression['Reference Date']
        # flags are kept outside the frame so existing columns are never overwritten
        already_started = in_df[dict_expression['Start Date']].le(ref_date)
        not_yet_ended = in_df[dict_expression['End Date']].ge(ref_date)
        in_df['Timeline Evaluation'] = [
            'Current' if started and ongoing else 'Past' if started else 'Future'
            for started, ongoing in zip(already_started, not_yet_ended)
        ]
        return in_df

    @staticmethod
    def fn_add_weekday_columns_to_data_frame(input_data_frame, columns_list):
        """
        Add a 'Weekday for <column>' column for every listed column.

        :param input_data_frame: data frame to enrich (modified in place)
        :param columns_list: names of columns whose values expose `strftime`
        :return: the same data frame, with the weekday columns set
        """
        for current_column in columns_list:
            weekday_names = input_data_frame[current_column] \
                .apply(lambda moment: moment.strftime('%A'))
            input_data_frame['Weekday for ' + current_column] = weekday_names
        return input_data_frame

    @staticmethod
    def fn_apply_query_to_data_frame(local_logger, timmer, input_data_frame, extract_params):
        """
        Filter a data frame in place according to a simple rule.

        :param local_logger: logger receiving debug messages
        :param timmer: object exposing `start()` / `stop()`
        :param input_data_frame: data frame to filter (modified in place)
        :param extract_params: dictionary with
            'filter_to_apply' - one of 'equal', 'different', 'multiple_match',
            'column_to_filter' - name of the column the rule applies to,
            'filter_values' - a single value for 'equal' / 'different';
            for 'multiple_match' a dictionary whose values are the accepted ones
            (a plain list/tuple, or a single string, is accepted as well)
        :return: the same data frame, filtered
        :raises ValueError: when 'filter_to_apply' is not a supported rule
        """
        timmer.start()
        try:
            filter_kind = extract_params['filter_to_apply']
            # NOTE(review): values are embedded between double quotes without escaping,
            # so a value containing a double quote yields an invalid expression
            if filter_kind == 'equal':
                filter_value = str(extract_params['filter_values'])
                column_name = extract_params['column_to_filter']
                local_logger.debug('Will retain only values equal with "'
                                   + filter_value + '" within the field "'
                                   + column_name + '"')
                query_expression = '`' + column_name + '` == "' + filter_value + '"'
            elif filter_kind == 'different':
                filter_value = str(extract_params['filter_values'])
                column_name = extract_params['column_to_filter']
                local_logger.debug('Will retain only values different than "'
                                   + filter_value + '" within the field "'
                                   + column_name + '"')
                query_expression = '`' + column_name + '` != "' + filter_value + '"'
            elif filter_kind == 'multiple_match':
                accepted_values = extract_params['filter_values']
                if isinstance(accepted_values, dict):
                    accepted_values = accepted_values.values()
                elif isinstance(accepted_values, str):
                    accepted_values = [accepted_values]
                # joined once and reused for both the log message and the expression
                # (the message used to concatenate the raw dictionary and crashed)
                quoted_values = '", "'.join(str(value) for value in accepted_values)
                column_name = extract_params['column_to_filter']
                local_logger.debug('Will retain only values equal with "'
                                   + quoted_values + '" within the field "'
                                   + column_name + '"')
                query_expression = '`' + column_name + '` in ["' + quoted_values + '"]'
            else:
                # an empty expression would be rejected by Pandas with a cryptic message
                raise ValueError('Unsupported value for "filter_to_apply": '
                                 + repr(filter_kind))
            local_logger.debug('Query expression to apply is: ' + query_expression)
            input_data_frame.query(query_expression, inplace=True)
        finally:
            # the timer is stopped even when the filtering fails
            timmer.stop()
        return input_data_frame

    @staticmethod
    def fn_convert_columns_to_datetime(input_data_frame, columns_list, columns_format):
        """
        Convert the listed columns to datetime using a single format.

        :param input_data_frame: data frame to convert (modified in place)
        :param columns_list: names of the columns to convert
        :param columns_format: format string handed to `pandas.to_datetime`
        :return: the same data frame, with the listed columns converted
        """
        for current_column in columns_list:
            converted_values = pd.to_datetime(input_data_frame[current_column],
                                              format=columns_format)
            input_data_frame[current_column] = converted_values
        return input_data_frame

    def fn_drop_certain_columns(self, local_logger, timmer, working_dictionary):
        """
        Remove unwanted columns from CSV files, rewriting only the files that change.

        :param local_logger: logger handed to the load / store helpers
        :param timmer: object exposing `start()` / `stop()`
        :param working_dictionary: dictionary with 'files' (paths of the CSV files),
            'csv_field_separator' and 'columns_to_eliminate' (column names)
        """
        for current_file in working_dictionary['files']:
            field_separator = working_dictionary['csv_field_separator']
            # each file is processed on its own so it can be written back in place
            df = self.fn_load_file_list_to_data_frame(local_logger, timmer, [current_file],
                                                      field_separator)
            columns_present = [column_to_eliminate for column_to_eliminate
                               in dict.fromkeys(working_dictionary['columns_to_eliminate'])
                               if column_to_eliminate in df.columns]
            if columns_present:
                df.drop(columns=columns_present, inplace=True)
                # the store helper expects a details dictionary, not separate arguments
                self.fn_store_data_frame_to_file(local_logger, timmer, df, {
                    'format': 'csv',
                    'name': current_file,
                    'field-delimiter': field_separator,
                })

    @staticmethod
    def fn_filter_data_frame_by_index(local_logger, in_data_frame, filter_rule):
        """
        Keep only rows whose index lies within a deviation around the 'Current' row.

        :param local_logger: logger receiving progress messages
        :param in_data_frame: data frame holding a 'Timeline Evaluation' column
        :param filter_rule: dictionary; when it has a 'Deviation' entry, that entry maps
            'Lower' and/or 'Upper' to the number of index positions to retain
            below / above the first 'Current' row
        :return: the filtered data frame (the input itself when no deviation is given)
        :raises IndexError: when a deviation is requested but no row is 'Current'
        """
        index_current = in_data_frame.query('`Timeline Evaluation` == "Current"', inplace=False)
        local_logger.info('Current index has been determined to be ' + str(index_current.index))
        if 'Deviation' not in filter_rule:
            return in_data_frame
        for deviation_type, deviation_number in filter_rule['Deviation'].items():
            if deviation_type not in ('Lower', 'Upper'):
                local_logger.warning('Deviation type "' + str(deviation_type)
                                     + '" is not supported and has been ignored')
                continue
            if index_current.empty:
                raise IndexError('No row is evaluated as "Current", '
                                 + 'therefore no deviation can be applied')
            if deviation_type == 'Lower':
                index_to_apply = index_current.index - deviation_number
                in_data_frame = in_data_frame[in_data_frame.index >= index_to_apply[0]]
            else:
                index_to_apply = index_current.index + deviation_number
                in_data_frame = in_data_frame[in_data_frame.index <= index_to_apply[0]]
            local_logger.info(deviation_type + ' Deviation Number is ' + str(deviation_number)
                              + ' to be applied to Current index, became '
                              + str(index_to_apply))
        return in_data_frame

    @staticmethod
    def fn_load_file_list_to_data_frame(local_logger, timmer, file_list, csv_delimiter):
        """
        Read several CSV files and concatenate them into a single data frame.

        :param local_logger: logger receiving the completion message
        :param timmer: object exposing `start()` / `stop()`
        :param file_list: paths of the CSV files to read
        :param csv_delimiter: field delimiter shared by all files
        :return: data frame with the rows of all files, each keeping its own index
        """
        timmer.start()
        try:
            loaded_frames = [pd.read_csv(filepath_or_buffer=current_file,
                                         delimiter=csv_delimiter,
                                         cache_dates=True,
                                         index_col=None,
                                         memory_map=True,
                                         low_memory=False,
                                         encoding='utf-8',
                                         ) for current_file in file_list]
            combined_csv = pd.concat(loaded_frames)
            local_logger.info('All relevant files were merged into a Pandas Data Frame')
        finally:
            # the timer is stopped even when reading fails
            timmer.stop()
        return combined_csv

    @staticmethod
    def fn_store_data_frame_to_file(local_logger, timmer, input_data_frame, input_file_details):
        """
        Write a data frame to a file; only the 'csv' format is handled.

        :param local_logger: logger receiving the outcome message
        :param timmer: object exposing `start()` / `stop()`
        :param input_data_frame: data frame to write
        :param input_file_details: dictionary with 'format', 'name' (target path)
            and, for 'csv', 'field-delimiter'
        """
        timmer.start()
        try:
            if input_file_details['format'] == 'csv':
                input_data_frame.to_csv(path_or_buf=input_file_details['name'],
                                        sep=input_file_details['field-delimiter'],
                                        header=True,
                                        index=False,
                                        encoding='utf-8')
                local_logger.info('Data frame has just been saved to file "'
                                  + input_file_details['name'] + '"')
            else:
                # nothing was written, so do not claim otherwise
                local_logger.warning('Data frame has NOT been saved to file "'
                                     + input_file_details['name'] + '" as format "'
                                     + str(input_file_details['format'])
                                     + '" is not supported')
        finally:
            # the timer is stopped even when writing fails
            timmer.stop()
143