Passed
Push — development/test ( 02e03c...b2a2db )
by Daniel
01:09
created

DataManipulator.fn_move_files()   A

Complexity

Conditions 3

Size

Total Lines 17
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 16
nop 5
dl 0
loc 17
rs 9.6
c 0
b 0
f 0
1
"""
2
Data Manipulation class
3
"""
4
# package to facilitate operating system operations
5
import os
6
# package to facilitate os path manipulations
7
import pathlib
8
# package facilitating Data Frames manipulation
9
import pandas as pd
10
# package regular expressions
11
import re
12
13
14
class DataManipulator:
    """
    Helpers to filter, load, store and move CSV files handled as Pandas Data Frames.

    Every method receives a ``local_logger`` (only its ``debug`` and ``info`` methods
    are called here) and a ``timmer`` object whose ``start()`` and ``stop()`` methods
    are called around the work being done.
    """

    @staticmethod
    def fn_apply_query_to_data_frame(local_logger, timmer, data_frame, extract_params):
        """
        Filter ``data_frame`` in place according to ``extract_params`` and return it.

        ``extract_params`` keys read here:
          * ``filter_to_apply``: one of ``'equal'``, ``'different'``, ``'multiple_match'``
          * ``column_to_filter``: name of the column the filter is applied to
          * ``filter_values``: a string for ``'equal'`` / ``'different'``; for
            ``'multiple_match'`` an object whose ``.values()`` yields strings

        Raises ``ValueError`` when ``filter_to_apply`` is not one of the supported kinds
        (an empty query expression would otherwise be rejected by Pandas with the same
        exception type but a far less helpful message).
        """
        filter_to_apply = extract_params['filter_to_apply']
        if filter_to_apply not in ('equal', 'different', 'multiple_match'):
            raise ValueError('Unsupported filter_to_apply value: ' + repr(filter_to_apply))
        timmer.start()
        column_to_filter = extract_params['column_to_filter']
        filter_values = extract_params['filter_values']
        if filter_to_apply == 'equal':
            local_logger.debug('Will retain only values equal with "'
                               + filter_values + '" within the field "'
                               + column_to_filter + '"')
            query_expression = '`' + column_to_filter + '` == "' + filter_values + '"'
        elif filter_to_apply == 'different':
            local_logger.debug('Will retain only values different than "'
                               + filter_values + '" within the field "'
                               + column_to_filter + '"')
            query_expression = '`' + column_to_filter + '` != "' + filter_values + '"'
        else:
            # filter_values is a mapping in this branch, so it has to be flattened
            # before being concatenated into either the log message or the query
            joined_values = '", "'.join(filter_values.values())
            local_logger.debug('Will retain only values equal with "'
                               + joined_values + '" within the field "'
                               + column_to_filter + '"')
            query_expression = '`' + column_to_filter + '` in ["' + joined_values + '"]'
        local_logger.debug('Query expression to apply is: ' + query_expression)
        data_frame.query(query_expression, inplace = True)
        timmer.stop()
        return data_frame

    def build_file_list(self, local_logger, timmer, given_input_file):
        """
        Return the list of files designated by ``given_input_file``.

        When the value contains a wildcard character (``*`` or ``?``) it is treated
        as a pattern and resolved against the files of its parent folder; otherwise
        it is returned as the only element of the list, without checking it exists.
        """
        # the character class requires at least one wildcard to be present;
        # a pattern that can match the empty string would succeed for any input
        if re.search(r'[*?]', given_input_file):
            local_logger.debug('Files pattern has been provided')
            # a bare pattern (no folder component) refers to the current folder
            parent_directory = os.path.dirname(given_input_file) or '.'
            # loading from a specific folder all files matching a given pattern into a file list
            relevant_files_list = self.fn_build_relevant_file_list(local_logger, timmer,
                                                                   parent_directory,
                                                                   given_input_file)
        else:
            local_logger.debug('Specific file has been provided')
            relevant_files_list = [given_input_file]
        return relevant_files_list

    @staticmethod
    def fn_build_relevant_file_list(local_logger, timmer, in_folder, matching_pattern):
        """
        Return absolute paths (as strings) of the files directly inside ``in_folder``
        whose path matches ``matching_pattern`` (evaluated with ``pathlib`` ``match``).

        An empty list is returned when ``in_folder`` is not an existing folder.
        Sub-folders are not descended into.
        """
        timmer.start()
        local_logger.info('Will list all files within ' + in_folder
                          + ' folder looking for ' + matching_pattern + ' as matching pattern')
        list_files = []
        if os.path.isdir(in_folder):
            list_files = [str(current_file.absolute())
                          for current_file in pathlib.Path(in_folder).iterdir()
                          if current_file.is_file() and current_file.match(matching_pattern)]
        local_logger.info('Relevant CSV files from ' + in_folder + ' folder were identified!')
        local_logger.info(list_files)
        timmer.stop()
        return list_files

    def fn_drop_certain_columns(self, local_logger, timmer, working_dictionary):
        """
        Remove unwanted columns from each CSV file, rewriting only the files that changed.

        ``working_dictionary`` keys read here:
          * ``files``: iterable of CSV file names to process, one at a time
          * ``csv_field_separator``: delimiter used both for reading and writing
          * ``columns_to_eliminate``: iterable of column names to drop when present
        """
        for current_file in working_dictionary['files']:
            # each file is loaded on its own so it can be written back under the same name
            df = self.fn_load_file_list_to_data_frame(local_logger, timmer, [current_file],
                                                      working_dictionary['csv_field_separator'])
            save_necessary = False
            for column_to_eliminate in working_dictionary['columns_to_eliminate']:
                if column_to_eliminate in df:
                    df.drop(columns = column_to_eliminate, inplace = True)
                    save_necessary = True
            if save_necessary:
                self.fn_store_data_frame_to_file(local_logger, timmer, df, current_file,
                                                 working_dictionary['csv_field_separator'])

    @staticmethod
    def fn_load_file_list_to_data_frame(local_logger, timmer, file_list, csv_delimiter):
        """
        Read every file of ``file_list`` as UTF-8 CSV using ``csv_delimiter`` and
        return their concatenation as a single Pandas Data Frame.

        The original row index of each file is kept as read (no re-indexing is done).
        """
        timmer.start()
        combined_csv = pd.concat([pd.read_csv(filepath_or_buffer = current_file,
                                              delimiter = csv_delimiter,
                                              cache_dates = True,
                                              index_col = None,
                                              memory_map = True,
                                              low_memory = False,
                                              encoding = 'utf-8',
                                              ) for current_file in file_list])
        local_logger.info('All relevant files were merged into a Pandas Data Frame')
        timmer.stop()
        return combined_csv

    @staticmethod
    def fn_move_files(local_logger, timmer, source_folder, file_names, destination_folder):
        """
        Move each file of ``file_names`` from ``source_folder`` to ``destination_folder``.

        The destination name is obtained by substituting ``source_folder`` with
        ``destination_folder`` inside the file name, therefore ``file_names`` are
        expected to be strings containing ``source_folder``. An already existing
        destination file is overwritten. Returns the list of destination file names.
        """
        timmer.start()
        resulted_files = []
        for current_file in file_names:
            new_file_name = current_file.replace(source_folder, destination_folder)
            # new_file_name is a plain string, so the check goes through os.path
            if os.path.isfile(new_file_name):
                os.replace(current_file, new_file_name)
                local_logger.info('File ' + current_file
                                  + ' has just been been overwritten  as ' + new_file_name)
            else:
                os.rename(current_file, new_file_name)
                local_logger.info('File ' + current_file
                                  + ' has just been renamed as ' + new_file_name)
            resulted_files.append(new_file_name)
        timmer.stop()
        return resulted_files

    @staticmethod
    def fn_store_data_frame_to_file(local_logger, timmer, input_data_frame,
                                    destination_file_name, csv_delimiter):
        """
        Write ``input_data_frame`` to ``destination_file_name`` as UTF-8 CSV using
        ``csv_delimiter``, with a header row and without the index column.
        """
        timmer.start()
        input_data_frame.to_csv(path_or_buf = destination_file_name,
                                sep = csv_delimiter,
                                header = True,
                                index = False,
                                encoding = 'utf-8')
        # str() keeps the message working when a path-like object is supplied
        local_logger.info('Data frame has just been saved to file "'
                          + str(destination_file_name) + '"')
        timmer.stop()
134