1
|
|
|
""" |
2
|
|
|
main class to support Extract script |
3
|
|
|
""" |
4
|
|
|
# useful methods to measure time performance by small pieces of code |
5
|
|
|
from codetiming import Timer |
6
|
|
|
# package to facilitate operating system operations |
7
|
|
|
import os |
8
|
|
|
# package to add support for multi-language (i18n) |
9
|
|
|
import gettext |
10
|
|
|
# package to facilitate working with directories and files |
11
|
|
|
from pathlib import Path |
12
|
|
|
# custom classes specific to this project |
13
|
|
|
from db_extractor.BasicNeeds import BasicNeeds |
14
|
|
|
from db_extractor.CommandLineArgumentsManagement import CommandLineArgumentsManagement |
15
|
|
|
from db_extractor.DataInputOutput import DataInputOutput |
16
|
|
|
from db_extractor.DataManipulator import DataManipulator |
17
|
|
|
from db_extractor.FileOperations import FileOperations |
18
|
|
|
from db_extractor.LoggingNeeds import LoggingNeeds |
19
|
|
|
from db_extractor.ParameterHandling import ParameterHandling |
20
|
|
|
from db_extractor.BasicNeedsForExtractor import BasicNeedsForExtractor |
21
|
|
|
from db_extractor.DatabaseTalker import DatabaseTalker |
22
|
|
|
|
23
|
|
|
|
24
|
|
|
class ExtractNeeds:
    """Main class to support the Extract script.

    Wires together the project's helper classes (logging, file operations,
    parameter handling, database talking, data input/output, data
    manipulation) and exposes the shared steps of an extraction run:
    configuration loading, extraction-necessity evaluation, query execution
    and result-set persistence.
    """

    # helper-class instances, populated in __init__
    class_bn = None      # BasicNeeds
    class_bnfe = None    # BasicNeedsForExtractor
    class_clam = None    # CommandLineArgumentsManagement
    class_dbt = None     # DatabaseTalker
    class_dio = None     # DataInputOutput
    class_dm = None      # DataManipulator
    class_fo = None      # FileOperations
    class_ln = None      # LoggingNeeds
    class_ph = None      # ParameterHandling
    # state populated by the load_*/initiate_* methods
    config = None                 # parsed application configuration
    file_extract_sequence = None  # content of the extracting-sequence JSON file
    locale = None                 # gettext translation object for i18n messages
    parameters = None             # parsed command-line arguments
    script = None                 # destination script name (timer label, config key)
    source_systems = None         # 'Systems' section of the source-system JSON file
    timer = None                  # codetiming.Timer shared across operations
    user_credentials = None       # 'Credentials' section of the credentials JSON file

    def __init__(self, destination_script, in_language='en_US'):
        """Instantiate all helper classes and load the locale catalogue.

        :param destination_script: script name used as timer label and to
            select the relevant input options from the configuration
        :param in_language: locale code for translated messages (default en_US)
        """
        self.script = destination_script
        # derive the gettext domain from this file's name and locate the
        # 'project_locale' folder two levels above this file
        file_parts = os.path.normpath(os.path.abspath(__file__)).replace('\\', os.path.altsep)\
            .split(os.path.altsep)
        locale_domain = file_parts[(len(file_parts)-1)].replace('.py', '')
        locale_folder = os.path.normpath(os.path.join(
            os.path.join(os.path.altsep.join(file_parts[:-2]), 'project_locale'), locale_domain))
        # fallback=True keeps the script usable when no catalogue exists
        self.locale = gettext.translation(locale_domain, localedir=locale_folder,
                                          languages=[in_language], fallback=True)
        # instantiate Basic Needs class
        self.class_bn = BasicNeeds(in_language)
        # instantiate Extractor Specific Needs class
        self.class_bnfe = BasicNeedsForExtractor(in_language)
        # instantiate File Operations class
        self.class_fo = FileOperations(in_language)
        # instantiate Database Talker class
        self.class_dbt = DatabaseTalker(in_language)
        # instantiate Data Input/Output class, useful to store data frames to files
        self.class_dio = DataInputOutput(in_language)
        # instantiate Data Manipulator class, useful to manipulate data frames
        self.class_dm = DataManipulator(in_language)
        # instantiate Command Line Arguments class
        self.class_clam = CommandLineArgumentsManagement(in_language)
        # instantiate Logger class
        self.class_ln = LoggingNeeds()
        # instantiate Parameter Handling class
        self.class_ph = ParameterHandling(in_language)
70
|
|
|
|
71
|
|
|
def build_dict_for_storage_file(self, crt_output): |
72
|
|
|
fn_dict = { |
73
|
|
|
'file list': crt_output['name'], |
74
|
|
|
'name': crt_output['name'], |
75
|
|
|
'format': crt_output['format'], |
76
|
|
|
} |
77
|
|
|
if 'compression' in crt_output: |
78
|
|
|
fn_dict['compression'] = crt_output['compression'] |
79
|
|
|
if 'field delimiter' in crt_output: |
80
|
|
|
fn_dict['field delimiter'] = crt_output['field delimiter'] |
81
|
|
|
return fn_dict |
82
|
|
|
|
83
|
|
|
    def close_connection(self, local_logger):
        """Close the current database connection, logging and timing the step.

        :param local_logger: logger used for progress messages
        """
        self.timer.start()
        local_logger.info(self.locale.gettext('Closing DB connection'))
        self.class_dbt.connection.close()
        local_logger.info(self.locale.gettext('Closing DB completed'))
        self.timer.stop()
89
|
|
|
|
90
|
|
|
    def close_cursor(self, local_logger, in_cursor):
        """Close (free) a database result-set cursor, logging and timing the step.

        :param local_logger: logger used for progress messages
        :param in_cursor: database cursor to be closed
        """
        self.timer.start()
        local_logger.info(self.locale.gettext('Free DB result-set started'))
        in_cursor.close()
        local_logger.info(self.locale.gettext('Free DB result-set completed'))
        self.timer.stop()
96
|
|
|
|
97
|
|
|
    def evaluate_extraction_overwrite_condition(self, extraction_required, in_dict):
        """Re-evaluate the extraction verdict for the overwrite scenario.

        Only applies when the behaviour is 'overwrite-if-output-file-exists',
        an 'extract-overwrite-condition' is configured and the output file
        already exists; otherwise the incoming verdict is returned unchanged.

        :param extraction_required: verdict established so far
        :param in_dict: dict with 'session' settings and 'file' details
        :return: possibly revised verdict
        """
        if in_dict['session']['extract-behaviour'] == 'overwrite-if-output-file-exists' \
                and 'extract-overwrite-condition' in in_dict['session'] \
                and Path(in_dict['file']['name']).is_file():
            fv = self.class_bnfe.fn_is_extraction_necessary_additional(
                self.class_ln.logger, self.class_ph, self.class_fo, in_dict)
            extraction_required = False
            new_verdict = self.locale.gettext('not required')
            # extraction becomes required only when the helper reports the
            # (localized) 'older' marker for the existing file
            if fv == self.class_fo.locale.gettext('older'):
                extraction_required = True
                new_verdict = self.locale.gettext('required')
            self.class_ln.logger.debug(self.locale.gettext(
                'Additional evaluation took place and new verdict is: {new_verdict}')
                                       .replace('{new_verdict}', new_verdict))
        return extraction_required
112
|
|
|
|
113
|
|
|
def evaluate_if_extraction_is_required(self, in_dict): |
114
|
|
|
extraction_required = False |
115
|
|
|
if type(in_dict['session']['output-file']) == dict: |
116
|
|
|
extraction_required = self.evaluate_if_extraction_is_required_for_single_file({ |
117
|
|
|
'session': in_dict['session'], |
118
|
|
|
'query': in_dict['query'], |
119
|
|
|
'sequence': in_dict['sequence'], |
120
|
|
|
'file': in_dict['session']['output-file'], |
121
|
|
|
}) |
122
|
|
|
elif type(in_dict['session']['output-file']) == list: |
123
|
|
|
extraction_required = self.evaluate_if_extraction_is_required_list(in_dict) |
124
|
|
|
return extraction_required |
125
|
|
|
|
126
|
|
|
    def evaluate_if_extraction_is_required_list(self, in_dict):
        """Evaluate extraction necessity when several output files are configured.

        Each file gets its own single-file evaluation; the per-file verdicts
        are then combined via fn_evaluate_dict_values into one overall verdict.

        :param in_dict: dict with 'session', 'query' and 'sequence' levels
        :return: combined verdict across all configured output files
        """
        evaluated_extraction = {}
        for crt_file in in_dict['session']['output-file']:
            crt_eval = self.evaluate_if_extraction_is_required_for_single_file({
                'session': in_dict['session'],
                'query': in_dict['query'],
                'sequence': in_dict['sequence'],
                'file': crt_file,
            })
            evaluated_extraction.update({str(crt_file['name']): crt_eval})
        # combine the per-file verdicts into a single overall verdict
        extraction_required = self.class_bn.fn_evaluate_dict_values(evaluated_extraction)
        self.class_ln.logger.debug(evaluated_extraction)
        overall_verdict = self.locale.gettext('not required')
        if extraction_required:
            overall_verdict = self.locale.gettext('required')
        self.class_ln.logger.debug(self.locale.gettext(
            'Overall new verdict after considering multiple files is: {overall_verdict}')
                                   .replace('{overall_verdict}', overall_verdict))
        return extraction_required
145
|
|
|
|
146
|
|
|
    def evaluate_if_extraction_is_required_for_single_file(self, in_dict):
        """Evaluate extraction necessity for one output file.

        NOTE: the file name may contain expressions which are resolved
        first, and in_dict['file']['name'] is updated in place with the
        resolved value. The base necessity check runs next, followed by
        the overwrite-condition re-evaluation.

        :param in_dict: dict with 'session', 'query', 'sequence' and 'file'
        :return: final verdict for this file
        """
        in_dict['file']['name'] = self.class_ph.eval_expression(
            self.class_ln.logger, in_dict['file']['name'], in_dict['session']['start-iso-weekday'])
        e_dict = {
            'extract-behaviour': in_dict['session']['extract-behaviour'],
            'output-csv-file': in_dict['file']['name'],
        }
        extraction_required = self.class_bnfe.fn_is_extraction_necessary(
            self.class_ln.logger, e_dict)
        extraction_required = self.evaluate_extraction_overwrite_condition(
            extraction_required, in_dict)
        return extraction_required
158
|
|
|
|
159
|
|
|
    def extract_query_to_result_set(self, local_logger, in_cursor, in_dictionary):
        """Execute the session query and fetch its result set.

        :param local_logger: logger used for progress messages
        :param in_cursor: database cursor to execute the query on
        :param in_dictionary: dict with 'session' (parameter settings) and
            'query' (query text)
        :return: dict with 'rows_counted' only (0) when execution yielded no
            cursor, otherwise 'columns', 'result_set' and 'rows_counted'
        """
        this_session = in_dictionary['session']
        this_query = in_dictionary['query']
        # get query parameters into a tuple
        tuple_parameters = self.class_ph.handle_query_parameters(
            local_logger, this_session, this_session['start-iso-weekday'])
        # measure expected number of parameters (count of %s placeholders)
        expected_no_of_parameters = str(this_query).count('%s')
        # simulate final query to log (useful for debugging purposes)
        simulated_query = self.class_ph.simulate_final_query(
            local_logger, self.timer, this_query, expected_no_of_parameters, tuple_parameters)
        simulated_query_single_line = self.class_bn.fn_multi_line_string_to_single(simulated_query)
        local_logger.info(self.locale.gettext('Query with parameters interpreted is: %s')
                          .replace('%s', simulated_query_single_line))
        # actual execution of the query
        in_cursor = self.class_dbt.execute_query(
            local_logger, self.timer, in_cursor, this_query,
            expected_no_of_parameters, tuple_parameters)
        # bringing the information from server (data transfer)
        dict_to_return = {
            'rows_counted': 0
        }
        if in_cursor is not None:
            dict_to_return = {
                'columns': self.class_dbt.get_column_names(local_logger, self.timer, in_cursor),
                'result_set': self.class_dbt.fetch_executed_query(
                    local_logger, self.timer, in_cursor),
                'rows_counted': in_cursor.rowcount,
            }
        return dict_to_return
189
|
|
|
|
190
|
|
|
    def initiate_logger_and_timer(self):
        """Initiate the logger and the shared timer.

        Requires `self.parameters` to be populated (see load_configuration);
        the timer reports through the debug channel of the freshly
        initiated logger.
        """
        # initiate logger
        self.class_ln.initiate_logger(self.parameters.output_log_file, self.script)
        # define global timer to use
        self.timer = Timer(self.script,
                           text=self.locale.gettext('Time spent is {seconds}'),
                           logger=self.class_ln.logger.debug)
198
|
|
|
|
199
|
|
|
    def load_configuration(self):
        """Load the application configuration and parse command-line arguments.

        Reads config/db-extractor.json (path derived from this package's
        location), parses the command-line arguments declared there for the
        current script, then validates them; invalid inputs lead to exit(1)
        inside the checker helpers.
        """
        # load application configuration (inputs are defined into a json file)
        ref_folder = os.path.dirname(__file__).replace('db_extractor', 'config')
        config_file = os.path.join(ref_folder, 'db-extractor.json').replace('\\', '/')
        self.config = self.class_fo.fn_open_file_and_get_content(config_file)
        # get command line parameter values
        self.parameters = self.class_clam.parse_arguments(self.config['input_options'][self.script])
        # checking inputs, if anything is invalid an exit(1) will take place
        self.class_bn.fn_check_inputs(self.parameters)
        # extractor-specific input checks (same exit(1) behaviour)
        self.class_bnfe.fn_check_inputs_specific(self.parameters)
210
|
|
|
|
211
|
|
|
    def load_extraction_sequence_and_dependencies(self):
        """Load the extracting-sequence, source-systems and credentials files.

        Each of the three JSON files named by the command-line parameters is
        loaded, logged and timed, and its file statistics are recorded.
        Requires `self.parameters`, `self.timer` and the logger to be ready.
        """
        self.timer.start()
        self.file_extract_sequence = self.class_fo.fn_open_file_and_get_content(
            self.parameters.input_extracting_sequence_file, 'json')
        self.class_ln.logger.info(self.locale.gettext(
            'Configuration file name with extracting sequence(es) has been loaded'))
        self.timer.stop()
        # store file statistics
        self.class_fo.fn_store_file_statistics({
            'checksum included': self.parameters.include_checksum_in_files_statistics,
            'file list': self.parameters.input_extracting_sequence_file,
            'file meaning': self.locale.gettext(
                'Configuration file name with extracting sequence(es)'),
            'logger': self.class_ln.logger,
            'timer': self.timer,
        })
        # get the source system details from provided file
        self.timer.start()
        self.source_systems = self.class_fo.fn_open_file_and_get_content(
            self.parameters.input_source_system_file, 'json')['Systems']
        self.class_ln.logger.info(self.locale.gettext('Source Systems file name has been loaded'))
        self.timer.stop()
        self.class_fo.fn_store_file_statistics({
            'checksum included': self.parameters.include_checksum_in_files_statistics,
            'file list': self.parameters.input_source_system_file,
            'file meaning': self.locale.gettext('Source Systems file name'),
            'logger': self.class_ln.logger,
            'timer': self.timer,
        })
        # get the user credentials from provided file
        self.timer.start()
        self.user_credentials = self.class_fo.fn_open_file_and_get_content(
            self.parameters.input_credentials_file, 'json')['Credentials']
        self.class_ln.logger.info(self.locale.gettext(
            'Configuration file name with credentials has been loaded'))
        self.timer.stop()
        self.class_fo.fn_store_file_statistics({
            'checksum included': self.parameters.include_checksum_in_files_statistics,
            'file list': self.parameters.input_credentials_file,
            'file meaning': self.locale.gettext('Configuration file name with credentials'),
            'logger': self.class_ln.logger,
            'timer': self.timer,
        })
254
|
|
|
|
255
|
|
|
    def load_query(self, crt_query):
        """Load the generic query text from the configured input file.

        :param crt_query: dict holding 'input-query-file' (path to raw query)
        :return: raw query text as read from the file
        """
        self.timer.start()
        query = self.class_fo.fn_open_file_and_get_content(crt_query['input-query-file'], 'raw')
        feedback = self.locale.gettext('Generic query is: %s') \
            .replace('%s', self.class_bn.fn_multi_line_string_to_single(query))
        self.class_ln.logger.info(feedback)
        self.timer.stop()
        return query
263
|
|
|
|
264
|
|
|
@staticmethod |
265
|
|
|
def pack_three_levels(in_session, in_query, in_sequence): |
266
|
|
|
return { |
267
|
|
|
'session': in_session, |
268
|
|
|
'query': in_query, |
269
|
|
|
'sequence': in_sequence, |
270
|
|
|
} |
271
|
|
|
|
272
|
|
|
    def result_set_to_disk_file(self, local_logger, stats, in_dict):
        """Convert a fetched result set to a data frame and store it to disk.

        Optional 'additional-columns' from the session are appended first,
        with inheritance markers resolved in place: 'inherit-from-parent'
        takes the query's columns, 'inherit-from-grand-parent' the sequence's.

        :param local_logger: logger used for progress messages
        :param stats: dict with 'columns' and 'result_set' from the extraction
        :param in_dict: dict with 'session', 'query' and 'sequence' levels
        """
        result_df = self.class_dbt.result_set_to_data_frame(
            local_logger, self.timer, stats['columns'], stats['result_set'])
        if 'additional-columns' in in_dict['session']:
            # resolve inheritance markers before appending the columns
            if in_dict['session']['additional-columns'] == 'inherit-from-parent':
                in_dict['session']['additional-columns'] = in_dict['query']['additional-columns']
            elif in_dict['session']['additional-columns'] == 'inherit-from-grand-parent':
                in_dict['session']['additional-columns'] = in_dict['sequence']['additional-columns']
            result_df = self.class_dbt.append_additional_columns_to_df(
                local_logger, self.timer, result_df, in_dict['session'])
        self.store_result_set_to_disk(self.class_ln.logger, result_df, in_dict['session'])
283
|
|
|
|
284
|
|
|
@staticmethod |
285
|
|
|
def set_default_parameter_rules(in_dict): |
286
|
|
|
# assumption is for either DICT or LIST values are numeric |
287
|
|
|
# in case text is given different rules have to be specified |
288
|
|
|
dictionary_to_return = { |
289
|
|
|
"dict-values-glue": ", ", |
290
|
|
|
"dict-values-prefix": "IN (", |
291
|
|
|
"dict-values-suffix": ")", |
292
|
|
|
"list-values-glue": ", ", |
293
|
|
|
"list-values-prefix": "", |
294
|
|
|
"list-values-suffix": "" |
295
|
|
|
} |
296
|
|
|
if 'parameters-handling-rules' in in_dict['session']: |
297
|
|
|
dictionary_to_return = in_dict['session']['parameters-handling-rules'] |
298
|
|
|
if dictionary_to_return == 'inherit-from-parent': |
299
|
|
|
dictionary_to_return = in_dict['query']['parameters-handling-rules'] |
300
|
|
|
elif dictionary_to_return == 'inherit-from-grand-parent': |
301
|
|
|
dictionary_to_return = in_dict['sequence']['parameters-handling-rules'] |
302
|
|
|
return dictionary_to_return |
303
|
|
|
|
304
|
|
|
def store_result_set_to_disk(self, local_logger, in_data_frame, crt_session): |
305
|
|
|
output_file_setting_type = type(crt_session['output-file']) |
306
|
|
|
if output_file_setting_type in (dict, list): |
307
|
|
|
output_list = crt_session['output-file'] |
308
|
|
|
if output_file_setting_type == dict: |
309
|
|
|
output_list = [crt_session['output-file']] |
310
|
|
|
for crt_output in output_list: |
311
|
|
|
fn_dict = self.build_dict_for_storage_file(crt_output) |
312
|
|
|
self.class_dio.fn_store_data_frame_to_file( |
313
|
|
|
local_logger, self.timer, in_data_frame, fn_dict) |
314
|
|
|
self.class_fo.fn_store_file_statistics({ |
315
|
|
|
'checksum included': self.parameters.include_checksum_in_files_statistics, |
316
|
|
|
'file list': crt_output['name'], |
317
|
|
|
'file meaning': self.locale.gettext('Output file name'), |
318
|
|
|
'logger': self.class_ln.logger, |
319
|
|
|
'timer': self.timer, |
320
|
|
|
}) |
321
|
|
|
|