Passed
Push — development/test (7ab62a...6c4337)
by Daniel
01:22

FileOperations.__init__()   A

Complexity
    Conditions: 1

Size
    Total Lines: 4
    Code Lines: 4

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0
Metric    Value
cc        1
eloc      4
nop       2
dl        0
loc       4
rs        10
c         0
b         0
f         0
"""
Facilitates file operations
"""
# package to handle dates and times
from datetime import datetime
# package to add support for multiple languages (i18n)
import gettext
# package used for checksum calculations (in this file)
import hashlib
# package to handle json files
import json
# package to facilitate operating system operations
import os
# package to facilitate os path manipulations
import pathlib
# package for regular expressions
import re


class FileOperations:
    timestamp_format = '%Y-%m-%d %H:%M:%S.%f %Z'
    lcl = None

    def __init__(self, default_language='en_US'):
        current_script = os.path.basename(__file__).replace('.py', '')
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
        self.lcl = gettext.translation(current_script, lang_folder, languages=[default_language])
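        # gettext.translation() expects a compiled catalog at
        # <lang_folder>/<default_language>/LC_MESSAGES/<current_script>.mo and raises
        # FileNotFoundError when none is found, unless fallback=True is passed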

    def build_file_list(self, local_logger, timmer, given_input_file):
        # a * or ? in the given name marks it as a matching pattern rather than a specific file
        if re.search(r'[*?]', given_input_file):
            local_logger.debug(self.lcl.gettext('File matching pattern identified'))
            parent_directory = os.path.dirname(given_input_file)
            # loading from a specific folder all files matching a given pattern into a file list
            relevant_files_list = self.fn_build_relevant_file_list(local_logger, timmer,
                                                                   parent_directory,
                                                                   given_input_file)
        else:
            local_logger.debug(self.lcl.gettext('Specific file name provided'))
            relevant_files_list = [given_input_file]
        return relevant_files_list

    def fn_build_relevant_file_list(self, local_logger, timmer, in_folder, matching_pattern):
        timmer.start()
        local_logger.info(self.lcl.gettext('Listing all files within {in_folder} folder '
                                           + 'looking for {matching_pattern} as matching pattern')
                          .replace('{in_folder}', in_folder)
                          .replace('{matching_pattern}', matching_pattern))
        list_files = []
        file_counter = 0
        if os.path.isdir(in_folder):
            working_path = pathlib.Path(in_folder)
            for current_file in working_path.iterdir():
                if current_file.is_file() and current_file.match(matching_pattern):
                    # store the absolute path of every file matching the pattern
                    list_files.append(str(current_file.absolute()))
                    local_logger.info(self.lcl.gettext('{file_name} identified')
                                      .replace('{file_name}', str(current_file.absolute())))
                    file_counter += 1
        local_logger.info(self.lcl.ngettext(
            '{files_counted} file from {in_folder} folder identified',
            '{files_counted} files from {in_folder} folder identified', file_counter)
                          .replace('{files_counted}', str(file_counter))
                          .replace('{in_folder}', in_folder))
        timmer.stop()
        return list_files

    def fn_get_file_content(self, in_file_handler, in_file_type):
        if in_file_type == 'json':
            try:
                json_interpreted_details = json.load(in_file_handler)
                print(datetime.utcnow().strftime(self.timestamp_format) +
                      self.lcl.gettext('JSON structure interpreted'))
                return json_interpreted_details
            except Exception as e:
                print(datetime.utcnow().strftime(self.timestamp_format) +
                      self.lcl.gettext('Error encountered when trying to interpret JSON'))
                print(e)
        elif in_file_type == 'raw':
            raw_interpreted_file = in_file_handler.read()
            print(datetime.utcnow().strftime(self.timestamp_format) +
                  self.lcl.gettext('Entire file content read'))
            return raw_interpreted_file
        else:
            print(datetime.utcnow().strftime(self.timestamp_format) +
                  self.lcl.gettext('Unknown file type provided, '
                                   + 'expected either "json" or "raw" but got {in_file_type}')
                  .replace('{in_file_type}', in_file_type))

    def fn_get_file_statistics(self, file_to_evaluate):
        try:
            with open(file=file_to_evaluate, mode='r', encoding='mbcs') as file_handler:
                file_content = file_handler.read().encode()
        except (LookupError, UnicodeDecodeError):
            # 'mbcs' is Windows-only and may fail to decode; fall back to utf-8
            with open(file=file_to_evaluate, mode='r', encoding='utf-8') as file_handler:
                file_content = file_handler.read().encode()
        file_checksums = {
            'md5': hashlib.md5(file_content).hexdigest(),
            'sha1': hashlib.sha1(file_content).hexdigest(),
            'sha224': hashlib.sha224(file_content).hexdigest(),
            'sha256': hashlib.sha256(file_content).hexdigest(),
            'sha384': hashlib.sha384(file_content).hexdigest(),
            'sha512': hashlib.sha512(file_content).hexdigest(),
        }
        f_dts = {
            'created': datetime.fromtimestamp(os.path.getctime(file_to_evaluate)),
            'modified': datetime.fromtimestamp(os.path.getmtime(file_to_evaluate)),
        }
        return {
            'date when created': datetime.strftime(f_dts['created'], self.timestamp_format),
            'date when last modified': datetime.strftime(f_dts['modified'], self.timestamp_format),
            'size [bytes]': os.path.getsize(file_to_evaluate),
            'MD5 Checksum': file_checksums['md5'],
            'SHA1 Checksum': file_checksums['sha1'],
            'SHA224 Checksum': file_checksums['sha224'],
            'SHA256 Checksum': file_checksums['sha256'],
            'SHA384 Checksum': file_checksums['sha384'],
            'SHA512 Checksum': file_checksums['sha512'],
        }

    def fn_move_files(self, local_logger, timmer, file_names, destination_folder):
        timmer.start()
        resulted_files = []
        for current_file in file_names:
            source_folder = os.path.dirname(current_file)
            new_file_name = current_file.replace(source_folder, destination_folder)
            if os.path.isfile(new_file_name):
                local_logger.warning(self.lcl.gettext('File {file_name} will be overwritten')
                                     .replace('{file_name}', current_file))
                os.replace(current_file, new_file_name)
                local_logger.info(self.lcl.gettext(
                    'File {file_name} has just been overwritten as {new_file_name}')
                                  .replace('{file_name}', current_file)
                                  .replace('{new_file_name}', new_file_name))
            else:
                local_logger.info(self.lcl.gettext(
                    'File {file_name} will be renamed as {new_file_name}')
                                  .replace('{file_name}', current_file)
                                  .replace('{new_file_name}', new_file_name))
                os.rename(current_file, new_file_name)
                local_logger.info(self.lcl.gettext(
                    'File {file_name} has just been renamed as {new_file_name}')
                                  .replace('{file_name}', current_file)
                                  .replace('{new_file_name}', new_file_name))
            resulted_files.append(str(new_file_name))
        timmer.stop()
        return resulted_files

    def fn_open_file_and_get_content(self, input_file, content_type='json'):
        if os.path.isfile(input_file):
            with open(input_file, 'r') as file_handler:
                print(datetime.utcnow().strftime(self.timestamp_format) +
                      self.lcl.gettext('File {file_name} has just been opened')
                      .replace('{file_name}', input_file))
                return self.fn_get_file_content(file_handler, content_type)
        else:
            print(datetime.utcnow().strftime(self.timestamp_format) +
                  self.lcl.gettext('File {file_name} does not exist')
                  .replace('{file_name}', input_file))

    def fn_store_file_statistics(self, local_logger, timmer, file_name, file_meaning):
        timmer.start()
        list_file_names = [file_name]
        if isinstance(file_name, list):
            list_file_names = file_name
        for current_file_name in list_file_names:
            file_statistics = str(self.fn_get_file_statistics(current_file_name)) \
                .replace('date when created', self.lcl.gettext('date when created')) \
                .replace('date when last modified', self.lcl.gettext('date when last modified')) \
                .replace('size [bytes]', self.lcl.gettext('size [bytes]')) \
                .replace('Checksum', self.lcl.gettext('Checksum'))
            # the message carries a {file_meaning} placeholder so the replacement below takes effect
            local_logger.info(self.lcl.gettext(
                'File "{file_name}" ({file_meaning}) has the following characteristics: '
                + '{file_statistics}')
                              .replace('{file_meaning}', file_meaning)
                              .replace('{file_name}', current_file_name)
                              .replace('{file_statistics}', file_statistics))
        timmer.stop()
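
For orientation, a minimal usage sketch of the class above follows. It is an illustration built on assumptions rather than part of the reviewed commit: it supposes the module is saved as file_operations.py with a file_operations_Locale folder of compiled gettext catalogs next to it, that a standard logging logger is available, and that the timmer argument may be any object exposing start() and stop().

# hypothetical usage sketch; module name, paths and SimpleTimer are assumptions
import logging

from file_operations import FileOperations


class SimpleTimer:
    # stand-in for the 'timmer' objects the methods expect: anything with start()/stop()
    def start(self):
        pass

    def stop(self):
        pass


logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
timer = SimpleTimer()

file_ops = FileOperations(default_language='en_US')
# expand a wildcard pattern into a concrete list of matching files
found_files = file_ops.build_file_list(logger, timer, '/tmp/reports/*.json')
# log size, timestamps and checksums for every file that was found
file_ops.fn_store_file_statistics(logger, timer, found_files, 'input')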
177