Test Failed
Branch development/test (a98bf9)
by Daniel
04:33
created

sources.common.FileOperations   A

Complexity

Total Complexity 30

Size/Duplication

Total Lines 224
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 30
eloc 191
dl 0
loc 224
rs 10
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A FileOperations.__init__() 0 4 1
A FileOperations.fn_build_file_list_internal() 0 11 4
A FileOperations.fn_get_file_datetime_verdict() 0 32 4
A FileOperations.fn_build_file_list() 0 14 2
A FileOperations.fn_store_file_statistics() 0 18 3
A FileOperations.fn_open_file_and_get_content() 0 11 3
A FileOperations.fn_get_file_simple_statistics() 0 8 1
A FileOperations.fn_get_file_statistics() 0 15 2
A FileOperations.fn_build_relevant_file_list() 0 21 2
A FileOperations.fn_move_files() 0 27 3
A FileOperations.fn_get_file_dates() 0 5 1
A FileOperations.fn_get_file_content() 0 21 4
1
"""
2
facilitates File Operations
3
"""
4
# package to handle date and times
5
from datetime import datetime
6
# package to add support for multi-language (i18n)
7
import gettext
8
# package to use for checksum calculations (in this file)
9
import hashlib
10
# package to handle json files
11
import json
12
# package to facilitate operating system operations
13
import os
14
# package to facilitate os path manipulations
15
import pathlib
16
# package to handle regular expressions
17
import re
18
19
20
class FileOperations:
    """
    Groups helpers to list, read, profile (dates, size, checksums) and move files.

    Every message goes through the gettext catalog stored in `self.locale`,
    so that attribute must be set (normally by `__init__`) before use.
    """
    # strftime pattern shared by every timestamp this class renders
    timestamp_format = '%Y-%m-%d %H:%M:%S.%f %Z'
    # gettext translation object, assigned in __init__
    locale = None

    def __init__(self, default_language='en_US'):
        """
        Load the translation catalog named after this script.

        The catalog is looked up in the "<script name>_Locale" folder next to this file;
        gettext.translation raises if no catalog exists for `default_language`.
        """
        # splitext only drops the extension ('.replace' would strip '.py' anywhere in the name)
        current_script = os.path.splitext(os.path.basename(__file__))[0]
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
        self.locale = gettext.translation(current_script, lang_folder, languages=[default_language])

    def _fn_print_with_timestamp(self, message):
        """Print `message` prefixed with the current UTC time and '- '."""
        # local import: the module only imports `datetime` from the datetime package
        from datetime import timezone
        # same naive UTC value datetime.utcnow() produced, without the deprecated call
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
        print(utc_now.strftime(self.timestamp_format) + '- ' + message)

    def fn_build_file_list(self, local_logger, timer, given_input_file):
        """
        Return the list of files designated by `given_input_file`.

        When the input contains a wildcard (* or ?) the parent folder is scanned
        for matching files, otherwise the input is returned as a one-element list.
        """
        timer.start()
        # the wildcard must really be present (a '(\*|\?)*' pattern matches any string)
        if re.search(r'[*?]', given_input_file):
            local_logger.debug(self.locale.gettext('File matching pattern identified'))
            # a bare pattern such as "*.csv" has no folder part: search the current folder
            parent_directory = os.path.dirname(given_input_file) or '.'
            # loading from a specific folder all files matching a given pattern into a file list
            relevant_files_list = self.fn_build_relevant_file_list(local_logger,
                                                                   parent_directory,
                                                                   given_input_file)
        else:
            local_logger.debug(self.locale.gettext('Specific file name provided'))
            relevant_files_list = [given_input_file]
        timer.stop()
        return relevant_files_list

    def fn_build_file_list_internal(self, local_logger, working_path, matching_pattern):
        """
        Return absolute names of the regular files directly inside `working_path`
        (a pathlib.Path) that match `matching_pattern`, logging each one found.
        """
        resulted_file_list = []
        for current_file in working_path.iterdir():
            if current_file.is_file() and current_file.match(matching_pattern):
                absolute_file_name = str(current_file.absolute())
                resulted_file_list.append(absolute_file_name)
                local_logger.info(self.locale.gettext('{file_name} identified')
                                  .replace('{file_name}', absolute_file_name))
        return resulted_file_list

    def fn_build_relevant_file_list(self, local_logger, in_folder, matching_pattern):
        """
        List files from `in_folder` matching `matching_pattern`.

        Returns an empty list (and logs an error) when the folder does not exist.
        """
        folder_name = str(in_folder)
        local_logger.info(
                self.locale.gettext('Listing all files within {in_folder} folder '
                                    + 'looking for {matching_pattern} as matching pattern')
                    .replace('{in_folder}', folder_name)
                    .replace('{matching_pattern}', str(matching_pattern)))
        if not os.path.isdir(in_folder):
            local_logger.error(self.locale.gettext('Folder {folder_name} does not exist')
                               .replace('{folder_name}', folder_name))
            return []
        working_path = pathlib.Path(in_folder)
        list_files = self.fn_build_file_list_internal(local_logger, working_path,
                                                      matching_pattern)
        file_counter = len(list_files)
        local_logger.info(self.locale.ngettext(
            '{files_counted} file from {in_folder} folder identified',
            '{files_counted} files from {in_folder} folder identified', file_counter)
                          .replace('{files_counted}', str(file_counter))
                          .replace('{in_folder}', folder_name))
        return list_files

    def fn_get_file_content(self, in_file_handler, in_file_type):
        """
        Read an already opened file either as 'json' (parsed) or 'raw' (text as is).

        Returns the content, or None when parsing fails or the type is unknown;
        in both failure cases a message is printed instead of raising.
        """
        if in_file_type == 'json':
            try:
                json_interpreted_details = json.load(in_file_handler)
            except Exception as e:
                # deliberate best-effort: report the problem and let the caller get None
                self._fn_print_with_timestamp(
                    self.locale.gettext('Error encountered when trying to interpret JSON'))
                print(e)
                return None
            self._fn_print_with_timestamp(self.locale.gettext('JSON structure interpreted'))
            return json_interpreted_details
        if in_file_type == 'raw':
            raw_interpreted_file = in_file_handler.read()
            self._fn_print_with_timestamp(self.locale.gettext('Entire file content read'))
            return raw_interpreted_file
        self._fn_print_with_timestamp(
            self.locale.gettext('Unknown file type provided, '
                                + 'expected either "json" or "raw" but got {in_file_type}')
            .replace('{in_file_type}', str(in_file_type)))
        return None

    @staticmethod
    def fn_get_file_dates(file_to_evaluate):
        """
        Return the 'created' and 'last modified' datetimes of a file (local time, naive).

        NOTE: 'created' relies on os.path.getctime, which per the os.path documentation
        is the creation time on Windows but the last metadata change on Unix.
        """
        return {
            'created': datetime.fromtimestamp(os.path.getctime(file_to_evaluate)),
            'last modified': datetime.fromtimestamp(os.path.getmtime(file_to_evaluate)),
        }

    def fn_get_file_simple_statistics(self, file_to_evaluate):
        """Return formatted creation/modification dates and the size in bytes of a file."""
        file_date_time = self.fn_get_file_dates(file_to_evaluate)
        return {
            'date when created': datetime.strftime(file_date_time['created'],
                                                   self.timestamp_format).strip(),
            'date when last modified': datetime.strftime(file_date_time['last modified'],
                                                         self.timestamp_format).strip(),
            'size [bytes]': os.path.getsize(file_to_evaluate),
        }

    def fn_get_file_statistics(self, file_to_evaluate):
        """
        Return the simple statistics of a file plus its MD5/SHA checksums.

        Checksums are computed over the raw bytes of the file, so they are independent
        of platform text encodings and newline translation.
        """
        file_statistics = self.fn_get_file_simple_statistics(file_to_evaluate)
        # binary mode: no decoding step that could fail or alter the hashed content
        with open(file_to_evaluate, mode='rb') as file_handler:
            file_content = file_handler.read()
        for algorithm in ('md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512'):
            checksum = getattr(hashlib, algorithm)(file_content).hexdigest()
            file_statistics[algorithm.upper() + ' Checksum'] = checksum
        return file_statistics

    def fn_get_file_datetime_verdict(self, local_logger, file_to_evaluate,
                                     created_or_modified, reference_datetime):
        """
        Compare a file date against `reference_datetime`.

        `created_or_modified` selects which date is used ('created' or 'last modified').
        Returns the translated word for 'newer', 'same' or 'older', or for 'unknown'
        when the choice is not supported (an error is logged in that case).
        """
        implemented_choices = ['created', 'last modified']
        file_date_time = self.fn_get_file_dates(file_to_evaluate)
        if created_or_modified not in implemented_choices:
            local_logger.error(self.locale.gettext(
                    'Unknown file datetime choice, '
                    + 'expected is one of the following choices "{implemented_choices}" '
                    + 'but provided was "{created_or_modified}"...')
                               .replace('{implemented_choices}', '", "'.join(implemented_choices))
                               .replace('{created_or_modified}', str(created_or_modified)))
            return self.locale.gettext('unknown')
        which_datetime = file_date_time.get(created_or_modified)
        if which_datetime > reference_datetime:
            verdict = self.locale.gettext('newer')
        elif which_datetime == reference_datetime:
            verdict = self.locale.gettext('same')
        else:
            verdict = self.locale.gettext('older')
        str_file_datetime = datetime.strftime(which_datetime, self.timestamp_format).strip()
        str_reference = datetime.strftime(reference_datetime, self.timestamp_format).strip()
        local_logger.debug(self.locale.gettext(
                'File "{file_name}" which has the {created_or_modified} datetime '
                + 'as "{file_datetime}" vs. "{reference_datetime}" '
                + 'has a verdict = {verdict}')
                          .replace('{file_name}', str(file_to_evaluate))
                          .replace('{created_or_modified}',
                                   self.locale.gettext(created_or_modified))
                          .replace('{reference_datetime}', str_reference)
                          .replace('{file_datetime}', str_file_datetime)
                          .replace('{verdict}', verdict))
        return verdict

    def fn_move_files(self, local_logger, timer, file_names, destination_folder):
        """
        Move every file from `file_names` into `destination_folder`, keeping its base name.

        An existing file with the same name in the destination is overwritten (with a
        warning). Returns the list of new file names.
        """
        timer.start()
        resulted_files = []
        for current_file in file_names:
            old_file_name = str(current_file)
            # build the target from the base name; a textual replace of the source folder
            # breaks when that text also occurs in the file name or when it is empty
            new_file_name = os.path.join(destination_folder, os.path.basename(current_file))
            if os.path.isfile(new_file_name):
                local_logger.warning(self.locale.gettext('File {file_name} will be overwritten')
                                     .replace('{file_name}', old_file_name))
                os.replace(current_file, new_file_name)
                # message text (including "been been") is kept as is: it is the catalog msgid
                local_logger.info(self.locale.gettext(
                    'File {file_name} has just been been overwritten as {new_file_name}')
                                  .replace('{file_name}', old_file_name)
                                  .replace('{new_file_name}', str(new_file_name)))
            else:
                local_logger.info(self.locale.gettext(
                    'File {file_name} will be renamed as {new_file_name}')
                                  .replace('{file_name}', old_file_name)
                                  .replace('{new_file_name}', str(new_file_name)))
                os.rename(current_file, new_file_name)
                local_logger.info(self.locale.gettext(
                    'File {file_name} has just been renamed as {new_file_name}')
                                  .replace('{file_name}', old_file_name)
                                  .replace('{new_file_name}', str(new_file_name)))
            resulted_files.append(str(new_file_name))
        timer.stop()
        return resulted_files

    def fn_open_file_and_get_content(self, input_file, content_type='json'):
        """
        Open `input_file` and return its content interpreted as `content_type`
        ('json' or 'raw'); returns None (after printing a message) if the file is missing.
        """
        if not os.path.isfile(input_file):
            self._fn_print_with_timestamp(
                self.locale.gettext('File {file_name} does not exist')
                .replace('{file_name}', str(input_file)))
            return None
        with open(input_file, 'r') as file_handler:
            self._fn_print_with_timestamp(
                self.locale.gettext('File {file_name} has just been opened')
                .replace('{file_name}', str(input_file)))
            return self.fn_get_file_content(file_handler, content_type)

    def fn_store_file_statistics(self, local_logger, timer, file_name, file_meaning):
        """
        Log the statistics (dates, size, checksums) of one file or of a list of files.

        `file_name` may be a single name or a list of names.
        """
        timer.start()
        list_file_names = file_name if isinstance(file_name, list) else [file_name]
        dt_when_last_modified = 'date when last modified'
        for current_file_name in list_file_names:
            # the statistics dictionary is rendered as text, then its labels are translated
            file_statistics = str(self.fn_get_file_statistics(current_file_name))\
                .replace('date when created', self.locale.gettext('date when created')) \
                .replace(dt_when_last_modified, self.locale.gettext(dt_when_last_modified)) \
                .replace('size [bytes]', self.locale.gettext('size [bytes]')) \
                .replace('Checksum', self.locale.gettext('Checksum'))
            # NOTE: the message below has no {file_meaning} placeholder, so that
            # replacement only takes effect if a translation introduces one
            local_logger.info(self.locale.gettext(
                'File "{file_name}" has the following characteristics: {file_statistics}')
                              .replace('{file_meaning}', str(file_meaning))
                              .replace('{file_name}', str(current_file_name))
                              .replace('{file_statistics}', file_statistics))
        timer.stop()
224