ExtractNeeds.close_cursor()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 3
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
"""
2
main class to support Extract script
3
"""
4
# useful methods to measure time performance by small pieces of code
5
from codetiming import Timer
6
# package to facilitate operating system operations
7
import os
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to facilitate working with directories and files
11
from pathlib import Path
12
# custom classes specific to this project
13
from db_extractor.BasicNeeds import BasicNeeds
14
from db_extractor.CommandLineArgumentsManagement import CommandLineArgumentsManagement
15
from db_extractor.DataInputOutput import DataInputOutput
16
from db_extractor.DataManipulator import DataManipulator
17
from db_extractor.FileOperations import FileOperations
18
from db_extractor.LoggingNeeds import LoggingNeeds
19
from db_extractor.ParameterHandling import ParameterHandling
20
from db_extractor.BasicNeedsForExtractor import BasicNeedsForExtractor
21
from db_extractor.DatabaseTalker import DatabaseTalker
22
23
24
class ExtractNeeds:
25
    class_bn = None  # BasicNeeds instance, created in __init__
26
    class_bnfe = None  # BasicNeedsForExtractor instance, created in __init__
27
    class_clam = None  # CommandLineArgumentsManagement instance, created in __init__
28
    class_dbt = None  # DatabaseTalker instance, created in __init__
29
    class_dio = None  # DataInputOutput instance, created in __init__
30
    class_dm = None  # DataManipulator instance, created in __init__
31
    class_fo = None  # FileOperations instance, created in __init__
32
    class_ln = None  # LoggingNeeds instance, created in __init__
33
    class_ph = None  # ParameterHandling instance, created in __init__
34
    config = None  # content of 'db-extractor.json', set by load_configuration
35
    file_extract_sequence = None  # set by load_extraction_sequence_and_dependencies
36
    locale = None  # gettext translation object, created in __init__
37
    parameters = None  # parsed command line arguments, set by load_configuration
38
    script = None  # value of the destination_script constructor argument
39
    source_systems = None  # 'Systems' entry of the source system file
40
    timer = None  # codetiming Timer, set by initiate_logger_and_timer
41
    user_credentials = None  # 'Credentials' entry of the credentials file
42
43
    def __init__(self, destination_script, in_language='en_US'):
        """
        Prepare localization and instantiate every helper class used by this one.

        :param destination_script: name of the calling script, kept in ``self.script``
        :param in_language: language code given to gettext and to the helper classes
        """
        self.script = destination_script
        # os.path.altsep is None on POSIX systems, so it cannot serve as a
        # replacement or split separator there; pathlib works on every platform
        this_file = Path(os.path.abspath(__file__))
        locale_domain = this_file.name.replace('.py', '')
        # translations are expected two levels above this file:
        # <root>/project_locale/<locale_domain>
        locale_folder = os.path.normpath(os.path.join(
            str(this_file.parent.parent), 'project_locale', locale_domain))
        self.locale = gettext.translation(locale_domain, localedir=locale_folder,
                                          languages=[in_language], fallback=True)
        # instantiate Basic Needs class
        self.class_bn = BasicNeeds(in_language)
        # instantiate Extractor Specific Needs class
        self.class_bnfe = BasicNeedsForExtractor(in_language)
        # instantiate File Operations class
        self.class_fo = FileOperations(in_language)
        # instantiate Database Talker class
        self.class_dbt = DatabaseTalker(in_language)
        # instantiate Data Input Output class
        self.class_dio = DataInputOutput(in_language)
        # instantiate Data Manipulator class
        self.class_dm = DataManipulator(in_language)
        # instantiate Command Line Arguments class
        self.class_clam = CommandLineArgumentsManagement(in_language)
        # instantiate Logger class (the only helper created without a language)
        self.class_ln = LoggingNeeds()
        # instantiate Parameter Handling class
        self.class_ph = ParameterHandling(in_language)
70
71
    def build_dict_for_storage_file(self, crt_output):
72
        fn_dict = {
73
            'file list': crt_output['name'],
74
            'name': crt_output['name'],
75
            'format': crt_output['format'],
76
        }
77
        if 'compression' in crt_output:
78
            fn_dict['compression'] = crt_output['compression']
79
        if 'field delimiter' in crt_output:
80
            fn_dict['field delimiter'] = crt_output['field delimiter']
81
        return fn_dict
82
83
    def close_connection(self, local_logger):
84
        self.timer.start()
85
        local_logger.info(self.locale.gettext('Closing DB connection'))
86
        self.class_dbt.connection.close()
87
        local_logger.info(self.locale.gettext('Closing DB completed'))
88
        self.timer.stop()
89
90
    def close_cursor(self, local_logger, in_cursor):
91
        self.timer.start()
92
        local_logger.info(self.locale.gettext('Free DB result-set started'))
93
        in_cursor.close()
94
        local_logger.info(self.locale.gettext('Free DB result-set completed'))
95
        self.timer.stop()
96
97
    def evaluate_extraction_overwrite_condition(self, extraction_required, in_dict):
98
        if in_dict['session']['extract-behaviour'] == 'overwrite-if-output-file-exists' \
99
                and 'extract-overwrite-condition' in in_dict['session'] \
100
                and Path(in_dict['file']['name']).is_file():
101
            fv = self.class_bnfe.fn_is_extraction_necessary_additional(
102
                self.class_ln.logger, self.class_ph, self.class_fo, in_dict)
103
            extraction_required = False
104
            new_verdict = self.locale.gettext('not required')
105
            if fv == self.class_fo.locale.gettext('older'):
106
                extraction_required = True
107
                new_verdict = self.locale.gettext('required')
108
            self.class_ln.logger.debug(self.locale.gettext(
109
                'Additional evaluation took place and new verdict is: {new_verdict}')
110
                                       .replace('{new_verdict}', new_verdict))
111
        return extraction_required
112
113
    def evaluate_if_extraction_is_required(self, in_dict):
114
        extraction_required = False
115
        if type(in_dict['session']['output-file']) == dict:
116
            extraction_required = self.evaluate_if_extraction_is_required_for_single_file({
117
                'session': in_dict['session'],
118
                'query': in_dict['query'],
119
                'sequence': in_dict['sequence'],
120
                'file': in_dict['session']['output-file'],
121
            })
122
        elif type(in_dict['session']['output-file']) == list:
123
            extraction_required = self.evaluate_if_extraction_is_required_list(in_dict)
124
        return extraction_required
125
126
    def evaluate_if_extraction_is_required_list(self, in_dict):
127
        evaluated_extraction = {}
128
        for crt_file in in_dict['session']['output-file']:
129
            crt_eval = self.evaluate_if_extraction_is_required_for_single_file({
130
                'session': in_dict['session'],
131
                'query': in_dict['query'],
132
                'sequence': in_dict['sequence'],
133
                'file': crt_file,
134
            })
135
            evaluated_extraction.update({str(crt_file['name']): crt_eval})
136
        extraction_required = self.class_bn.fn_evaluate_dict_values(evaluated_extraction)
137
        self.class_ln.logger.debug(evaluated_extraction)
138
        overall_verdict = self.locale.gettext('not required')
139
        if extraction_required:
140
            overall_verdict = self.locale.gettext('required')
141
        self.class_ln.logger.debug(self.locale.gettext(
142
            'Overall new verdict after considering multiple files is: {overall_verdict}')
143
                                   .replace('{overall_verdict}', overall_verdict))
144
        return extraction_required
145
146
    def evaluate_if_extraction_is_required_for_single_file(self, in_dict):
147
        in_dict['file']['name'] = self.class_ph.eval_expression(
148
            self.class_ln.logger, in_dict['file']['name'], in_dict['session']['start-iso-weekday'])
149
        e_dict = {
150
            'extract-behaviour': in_dict['session']['extract-behaviour'],
151
            'output-csv-file': in_dict['file']['name'],
152
        }
153
        extraction_required = self.class_bnfe.fn_is_extraction_necessary(
154
            self.class_ln.logger, e_dict)
155
        extraction_required = self.evaluate_extraction_overwrite_condition(
156
            extraction_required, in_dict)
157
        return extraction_required
158
159
    def extract_query_to_result_set(self, local_logger, in_cursor, in_dictionary):
160
        this_session = in_dictionary['session']
161
        this_query = in_dictionary['query']
162
        # get query parameters into a tuple
163
        tuple_parameters = self.class_ph.handle_query_parameters(
164
            local_logger, this_session, this_session['start-iso-weekday'])
165
        # measure expected number of parameters
166
        expected_no_of_parameters = str(this_query).count('%s')
167
        # simulate final query to log (useful for debugging purposes)
168
        simulated_query = self.class_ph.simulate_final_query(
169
            local_logger, self.timer, this_query, expected_no_of_parameters, tuple_parameters)
170
        simulated_query_single_line = self.class_bn.fn_multi_line_string_to_single(simulated_query)
171
        local_logger.info(self.locale.gettext('Query with parameters interpreted is: %s')
172
                          .replace('%s', simulated_query_single_line))
173
        # actual execution of the query
174
        in_cursor = self.class_dbt.execute_query(
175
            local_logger, self.timer, in_cursor, this_query,
176
            expected_no_of_parameters, tuple_parameters)
177
        # bringing the information from server (data transfer)
178
        dict_to_return = {
179
            'rows_counted': 0
180
        }
181
        if in_cursor is not None:
182
            dict_to_return = {
183
                'columns': self.class_dbt.get_column_names(local_logger, self.timer, in_cursor),
184
                'result_set': self.class_dbt.fetch_executed_query(
185
                        local_logger, self.timer, in_cursor),
186
                'rows_counted': in_cursor.rowcount,
187
            }
188
        return dict_to_return
189
190
    def initiate_logger_and_timer(self):
        """
        Set up the logger for this script and create the timer shared by all methods.

        Relies on ``self.parameters`` being already populated.
        """
        # initiate logger
        log_file_name = self.parameters.output_log_file
        self.class_ln.initiate_logger(log_file_name, self.script)
        # define global timer to use, reporting through the logger at debug level
        timer_settings = {
            'text': self.locale.gettext('Time spent is {seconds}'),
            'logger': self.class_ln.logger.debug,
        }
        self.timer = Timer(self.script, **timer_settings)
198
199
    def load_configuration(self):
200
        # load application configuration (inputs are defined into a json file)
201
        ref_folder = os.path.dirname(__file__).replace('db_extractor', 'config')
202
        config_file = os.path.join(ref_folder, 'db-extractor.json').replace('\\', '/')
203
        self.config = self.class_fo.fn_open_file_and_get_content(config_file)
204
        # get command line parameter values
205
        self.parameters = self.class_clam.parse_arguments(self.config['input_options'][self.script])
206
        # checking inputs, if anything is invalid an exit(1) will take place
207
        self.class_bn.fn_check_inputs(self.parameters)
208
        # checking inputs, if anything is invalid an exit(1) will take place
209
        self.class_bnfe.fn_check_inputs_specific(self.parameters)
210
211
    def load_extraction_sequence_and_dependencies(self):
212
        self.timer.start()
213
        self.file_extract_sequence = self.class_fo.fn_open_file_and_get_content(
214
            self.parameters.input_extracting_sequence_file, 'json')
215
        self.class_ln.logger.info(self.locale.gettext(
216
            'Configuration file name with extracting sequence(es) has been loaded'))
217
        self.timer.stop()
218
        # store file statistics
219
        self.class_fo.fn_store_file_statistics({
220
            'checksum included': self.parameters.include_checksum_in_files_statistics,
221
            'file list': self.parameters.input_extracting_sequence_file,
222
            'file meaning': self.locale.gettext(
223
                'Configuration file name with extracting sequence(es)'),
224
            'logger': self.class_ln.logger,
225
            'timer': self.timer,
226
        })
227
        # get the source system details from provided file
228
        self.timer.start()
229
        self.source_systems = self.class_fo.fn_open_file_and_get_content(
230
            self.parameters.input_source_system_file, 'json')['Systems']
231
        self.class_ln.logger.info(self.locale.gettext('Source Systems file name has been loaded'))
232
        self.timer.stop()
233
        self.class_fo.fn_store_file_statistics({
234
            'checksum included': self.parameters.include_checksum_in_files_statistics,
235
            'file list': self.parameters.input_source_system_file,
236
            'file meaning': self.locale.gettext('Source Systems file name'),
237
            'logger': self.class_ln.logger,
238
            'timer': self.timer,
239
        })
240
        # get the source system details from provided file
241
        self.timer.start()
242
        self.user_credentials = self.class_fo.fn_open_file_and_get_content(
243
            self.parameters.input_credentials_file, 'json')['Credentials']
244
        self.class_ln.logger.info(self.locale.gettext(
245
            'Configuration file name with credentials has been loaded'))
246
        self.timer.stop()
247
        self.class_fo.fn_store_file_statistics({
248
            'checksum included': self.parameters.include_checksum_in_files_statistics,
249
            'file list': self.parameters.input_credentials_file,
250
            'file meaning': self.locale.gettext('Configuration file name with credentials'),
251
            'logger': self.class_ln.logger,
252
            'timer': self.timer,
253
        })
254
255
    def load_query(self, crt_query):
256
        self.timer.start()
257
        query = self.class_fo.fn_open_file_and_get_content(crt_query['input-query-file'], 'raw')
258
        feedback = self.locale.gettext('Generic query is: %s') \
259
            .replace('%s', self.class_bn.fn_multi_line_string_to_single(query))
260
        self.class_ln.logger.info(feedback)
261
        self.timer.stop()
262
        return query
263
264
    @staticmethod
265
    def pack_three_levels(in_session, in_query, in_sequence):
266
        return {
267
            'session': in_session,
268
            'query': in_query,
269
            'sequence': in_sequence,
270
        }
271
272
    def result_set_to_disk_file(self, local_logger, stats, in_dict):
273
        result_df = self.class_dbt.result_set_to_data_frame(
274
            local_logger, self.timer, stats['columns'], stats['result_set'])
275
        if 'additional-columns' in in_dict['session']:
276
            if in_dict['session']['additional-columns'] == 'inherit-from-parent':
277
                in_dict['session']['additional-columns'] = in_dict['query']['additional-columns']
278
            elif in_dict['session']['additional-columns'] == 'inherit-from-grand-parent':
279
                in_dict['session']['additional-columns'] = in_dict['sequence']['additional-columns']
280
            result_df = self.class_dbt.append_additional_columns_to_df(
281
                local_logger, self.timer, result_df, in_dict['session'])
282
        self.store_result_set_to_disk(self.class_ln.logger, result_df, in_dict['session'])
283
284
    @staticmethod
285
    def set_default_parameter_rules(in_dict):
286
        # assumption is for either DICT or LIST values are numeric
287
        # in case text is given different rules have to be specified
288
        dictionary_to_return = {
289
            "dict-values-glue": ", ",
290
            "dict-values-prefix": "IN (",
291
            "dict-values-suffix": ")",
292
            "list-values-glue": ", ",
293
            "list-values-prefix": "",
294
            "list-values-suffix": ""
295
        }
296
        if 'parameters-handling-rules' in in_dict['session']:
297
            dictionary_to_return = in_dict['session']['parameters-handling-rules']
298
            if dictionary_to_return == 'inherit-from-parent':
299
                dictionary_to_return = in_dict['query']['parameters-handling-rules']
300
            elif dictionary_to_return == 'inherit-from-grand-parent':
301
                dictionary_to_return = in_dict['sequence']['parameters-handling-rules']
302
        return dictionary_to_return
303
304
    def store_result_set_to_disk(self, local_logger, in_data_frame, crt_session):
305
        output_file_setting_type = type(crt_session['output-file'])
306
        if output_file_setting_type in (dict, list):
307
            output_list = crt_session['output-file']
308
            if output_file_setting_type == dict:
309
                output_list = [crt_session['output-file']]
310
            for crt_output in output_list:
311
                fn_dict = self.build_dict_for_storage_file(crt_output)
312
                self.class_dio.fn_store_data_frame_to_file(
313
                    local_logger, self.timer, in_data_frame, fn_dict)
314
                self.class_fo.fn_store_file_statistics({
315
                    'checksum included': self.parameters.include_checksum_in_files_statistics,
316
                    'file list': crt_output['name'],
317
                    'file meaning': self.locale.gettext('Output file name'),
318
                    'logger': self.class_ln.logger,
319
                    'timer': self.timer,
320
                })
321