Completed
Push — master ( 55df2f...e17347 )
by Daniel
16s queued 12s
created

ExtractNeeds.close_connection()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
"""
2
main class to support Extract script
3
"""
4
# useful methods to measure time performance by small pieces of code
5
from codetiming import Timer
6
# package to facilitate operating system operations
7
import os
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to facilitate working with directories and files
11
from pathlib import Path
12
# custom classes specific to this project
13
from db_extractor.BasicNeeds import BasicNeeds
14
from db_extractor.CommandLineArgumentsManagement import CommandLineArgumentsManagement
15
from db_extractor.DataInputOutput import DataInputOutput
16
from db_extractor.DataManipulator import DataManipulator
17
from db_extractor.FileOperations import FileOperations
18
from db_extractor.LoggingNeeds import LoggingNeeds
19
from db_extractor.ParameterHandling import ParameterHandling
20
from db_extractor.BasicNeedsForExtractor import BasicNeedsForExtractor
21
from db_extractor.DatabaseTalker import DatabaseTalker
22
23
24
class ExtractNeeds:
    """
    Main class supporting the Extract script.

    It instantiates the project helper classes, loads the configuration and
    the extraction sequence files, decides whether an extraction is required,
    runs queries and stores their result-sets to disk.
    """
    # helper class instances, all populated in __init__
    class_bn = None
    class_bnfe = None
    class_clam = None
    class_dbt = None
    class_dio = None
    class_dm = None
    class_fo = None
    class_ln = None
    class_ph = None
    # application configuration, set by load_configuration
    config = None
    # content of the extraction sequence file
    file_extract_sequence = None
    # gettext translation object for this module
    locale = None
    # parsed command line parameters, set by load_configuration
    parameters = None
    # name of the script using this class
    script = None
    # 'Systems' section of the source systems file
    source_systems = None
    # shared codetiming Timer, set by initiate_logger_and_timer
    timer = None
    # 'Credentials' section of the credentials file
    user_credentials = None

    def __init__(self, destination_script, in_language='en_US'):
        """
        Store the script name, load translations and build helper objects.

        :param destination_script: name of the script using this class
        :param in_language: language code passed to gettext and to helpers
        """
        self.script = destination_script
        # splitext removes only the trailing extension, whatever it is
        current_file_basename = os.path.splitext(os.path.basename(__file__))[0]
        lang_folder = os.path.join(os.path.dirname(__file__), current_file_basename + '_Locale')
        self.locale = gettext.translation(
            current_file_basename, lang_folder, languages=[in_language])
        # instantiate Basic Needs class
        self.class_bn = BasicNeeds(in_language)
        # instantiate Extractor Specific Needs class
        self.class_bnfe = BasicNeedsForExtractor(in_language)
        # instantiate File Operations class
        self.class_fo = FileOperations(in_language)
        # instantiate Database Talker class
        self.class_dbt = DatabaseTalker(in_language)
        # instantiate Data Input Output class
        self.class_dio = DataInputOutput(in_language)
        # instantiate Data Manipulator class, useful to manipulate data frames
        self.class_dm = DataManipulator(in_language)
        # instantiate Command Line Arguments class
        self.class_clam = CommandLineArgumentsManagement(in_language)
        # instantiate Logger class
        self.class_ln = LoggingNeeds()
        # instantiate Parameter Handling class
        self.class_ph = ParameterHandling(in_language)

    def close_connection(self, local_logger):
        """
        Close the connection held by the Database Talker helper.

        :param local_logger: logger receiving the progress messages
        """
        # the context manager stops the timer even if close() raises
        with self.timer:
            local_logger.info(self.locale.gettext('Closing DB connection'))
            self.class_dbt.connection.close()
            local_logger.info(self.locale.gettext('Closing DB completed'))

    def close_cursor(self, local_logger, in_cursor):
        """
        Close a cursor to free its result-set.

        :param local_logger: logger receiving the progress messages
        :param in_cursor: cursor object to close
        """
        # the context manager stops the timer even if close() raises
        with self.timer:
            local_logger.info(self.locale.gettext('Free DB result-set started'))
            in_cursor.close()
            local_logger.info(self.locale.gettext('Free DB result-set completed'))

    def evaluate_extraction_overwrite_condition(self, extraction_required, in_dict):
        """
        Re-evaluate the verdict when an overwrite condition is configured.

        The additional evaluation only happens when the session behaviour is
        'overwrite-if-output-file-exists', the session defines
        'extract-overwrite-condition' and the output file exists on disk.
        Extraction is then required only if the helper verdict equals the
        translated text 'older'.

        :param extraction_required: verdict established so far
        :param in_dict: dictionary with 'session' and 'file' details
        :return: the (possibly updated) verdict
        """
        session = in_dict['session']
        if session['extract-behaviour'] != 'overwrite-if-output-file-exists':
            return extraction_required
        if 'extract-overwrite-condition' not in session:
            return extraction_required
        if not Path(in_dict['file']['name']).is_file():
            return extraction_required
        fv = self.class_bnfe.fn_is_extraction_necessary_additional(
            self.class_ln.logger, self.class_ph, self.class_fo, in_dict)
        extraction_required = (fv == self.class_fo.locale.gettext('older'))
        if extraction_required:
            new_verdict = self.locale.gettext('required')
        else:
            new_verdict = self.locale.gettext('not required')
        self.class_ln.logger.debug(self.locale.gettext(
            'Additional evaluation took place and new verdict is: {new_verdict}')
                                   .replace('{new_verdict}', new_verdict))
        return extraction_required

    @staticmethod
    def _single_file_context(in_dict, file_settings):
        """Build the dictionary evaluated for a single output file."""
        return {
            'session': in_dict['session'],
            'query': in_dict['query'],
            'sequence': in_dict['sequence'],
            'file': file_settings,
        }

    def evaluate_if_extraction_is_required(self, in_dict):
        """
        Decide whether extraction is required for a session.

        :param in_dict: dictionary with 'session', 'query' and 'sequence'
        :return: verdict for a dict or list 'output-file', otherwise False
        """
        output_file = in_dict['session']['output-file']
        # isinstance also accepts dict/list subclasses (e.g. OrderedDict)
        if isinstance(output_file, dict):
            return self.evaluate_if_extraction_is_required_for_single_file(
                self._single_file_context(in_dict, output_file))
        if isinstance(output_file, list):
            return self.evaluate_if_extraction_is_required_list(in_dict)
        return False

    def evaluate_if_extraction_is_required_list(self, in_dict):
        """
        Decide whether extraction is required when several files are set.

        Each output file is evaluated on its own; the individual verdicts,
        keyed by file name, are combined by the Basic Needs helper.

        :param in_dict: dictionary with 'session', 'query' and 'sequence'
        :return: overall verdict returned by fn_evaluate_dict_values
        """
        evaluated_extraction = {}
        for crt_file in in_dict['session']['output-file']:
            crt_eval = self.evaluate_if_extraction_is_required_for_single_file(
                self._single_file_context(in_dict, crt_file))
            # the file name is read after the evaluation, which rewrites it
            evaluated_extraction[str(crt_file['name'])] = crt_eval
        extraction_required = self.class_bn.fn_evaluate_dict_values(evaluated_extraction)
        self.class_ln.logger.debug(evaluated_extraction)
        overall_verdict = self.locale.gettext('not required')
        if extraction_required:
            overall_verdict = self.locale.gettext('required')
        self.class_ln.logger.debug(self.locale.gettext(
            'Overall new verdict after considering multiple files is: {overall_verdict}')
                                   .replace('{overall_verdict}', overall_verdict))
        return extraction_required

    def evaluate_if_extraction_is_required_for_single_file(self, in_dict):
        """
        Decide whether extraction is required for one output file.

        The file name expression is evaluated and written back into
        in_dict['file']['name'] (in place), then the basic verdict is
        established and refined by the overwrite condition.

        :param in_dict: dictionary with 'session', 'query', 'sequence', 'file'
        :return: verdict after the overwrite condition has been considered
        """
        # in-place update is kept on purpose: the same dictionary object may
        # be read again later by the caller
        in_dict['file']['name'] = self.class_ph.eval_expression(
            self.class_ln.logger, in_dict['file']['name'], in_dict['session']['start-iso-weekday'])
        e_dict = {
            'extract-behaviour': in_dict['session']['extract-behaviour'],
            'output-csv-file': in_dict['file']['name'],
        }
        extraction_required = self.class_bnfe.fn_is_extraction_necessary(
            self.class_ln.logger, e_dict)
        return self.evaluate_extraction_overwrite_condition(extraction_required, in_dict)

    def extract_query_to_result_set(self, local_logger, in_cursor, in_dictionary):
        """
        Execute the query of a session and fetch its result-set.

        :param local_logger: logger receiving the progress messages
        :param in_cursor: cursor used to execute the query
        :param in_dictionary: dictionary with 'session' and 'query'
        :return: dictionary with 'rows_counted' and, when the execution
            returned a cursor, also 'columns' and 'result_set'
        """
        this_session = in_dictionary['session']
        this_query = in_dictionary['query']
        # get query parameters into a tuple
        tuple_parameters = self.class_ph.handle_query_parameters(
            local_logger, this_session, this_session['start-iso-weekday'])
        # measure expected number of parameters
        expected_no_of_parameters = str(this_query).count('%s')
        # simulate final query to log (useful for debugging purposes)
        simulated_query = self.class_ph.simulate_final_query(
            local_logger, self.timer, this_query, expected_no_of_parameters, tuple_parameters)
        simulated_query_single_line = self.class_bn.fn_multi_line_string_to_single(simulated_query)
        local_logger.info(self.locale.gettext('Query with parameters interpreted is: %s')
                          .replace('%s', simulated_query_single_line))
        # actual execution of the query
        in_cursor = self.class_dbt.execute_query(
            local_logger, self.timer, in_cursor, this_query,
            expected_no_of_parameters, tuple_parameters)
        if in_cursor is None:
            return {'rows_counted': 0}
        # bringing the information from server (data transfer)
        return {
            'columns': self.class_dbt.get_column_names(local_logger, self.timer, in_cursor),
            'result_set': self.class_dbt.fetch_executed_query(
                local_logger, self.timer, in_cursor),
            'rows_counted': in_cursor.rowcount,
        }

    def initiate_logger_and_timer(self):
        """Initiate the logger and create the shared timer (needs parameters loaded)."""
        # initiate logger
        self.class_ln.initiate_logger(self.parameters.output_log_file, self.script)
        # define global timer to use, reporting through the logger at debug level
        self.timer = Timer(self.script,
                           text=self.locale.gettext('Time spent is {seconds}'),
                           logger=self.class_ln.logger.debug)

    def load_configuration(self):
        """Load the application configuration, then parse and check the inputs."""
        # load application configuration (inputs are defined into a json file)
        # NOTE(review): every 'db_extractor' occurrence in the folder path is
        # replaced, not only the last folder - confirm this is intended
        ref_folder = os.path.dirname(__file__).replace('db_extractor', 'config')
        config_file = os.path.join(ref_folder, 'db-extractor.json').replace('\\', '/')
        self.config = self.class_fo.fn_open_file_and_get_content(config_file)
        # get command line parameter values
        self.parameters = self.class_clam.parse_arguments(self.config['input_options'][self.script])
        # checking generic inputs
        self.class_bn.fn_check_inputs(self.parameters)
        # checking inputs specific to the extractor
        self.class_bnfe.fn_check_inputs_specific(self.parameters)

    def _load_json_file_with_statistics(self, file_name, loaded_message, file_label,
                                        content_key=None):
        """
        Load a JSON file under the timer, log it and store its statistics.

        :param file_name: file to open
        :param loaded_message: already translated message logged after loading
        :param file_label: already translated label used for file statistics
        :param content_key: optional top level key to return instead of all
        :return: file content, or its content_key entry when a key is given
        """
        # the context manager stops the timer even if loading raises
        with self.timer:
            content = self.class_fo.fn_open_file_and_get_content(file_name, 'json')
            if content_key is not None:
                content = content[content_key]
            self.class_ln.logger.info(loaded_message)
        self.class_fo.fn_store_file_statistics(
            self.class_ln.logger, self.timer, file_name, file_label)
        return content

    def load_extraction_sequence_and_dependencies(self):
        """Load the extraction sequence, source systems and credentials files."""
        # gettext is called here with literal texts so that message
        # extraction tools can still find them
        self.file_extract_sequence = self._load_json_file_with_statistics(
            self.parameters.input_extracting_sequence_file,
            self.locale.gettext(
                'Configuration file name with extracting sequence(es) has been loaded'),
            self.locale.gettext('Configuration file name with extracting sequence(es)'))
        # get the source system details from provided file
        self.source_systems = self._load_json_file_with_statistics(
            self.parameters.input_source_system_file,
            self.locale.gettext('Source Systems file name has been loaded'),
            self.locale.gettext('Source Systems file name'),
            content_key='Systems')
        # get the credentials from provided file
        self.user_credentials = self._load_json_file_with_statistics(
            self.parameters.input_credentials_file,
            self.locale.gettext(
                'Configuration file name with credentials has been loaded'),
            self.locale.gettext('Configuration file name with credentials'),
            content_key='Credentials')

    def load_query(self, crt_query):
        """
        Read the query text from the file named by 'input-query-file'.

        :param crt_query: dictionary with the 'input-query-file' key
        :return: raw content of the query file
        """
        # the context manager stops the timer even if reading raises
        with self.timer:
            query = self.class_fo.fn_open_file_and_get_content(
                crt_query['input-query-file'], 'raw')
            feedback = self.locale.gettext('Generic query is: %s') \
                .replace('%s', self.class_bn.fn_multi_line_string_to_single(query))
            self.class_ln.logger.info(feedback)
        return query

    @staticmethod
    def pack_three_levels(in_session, in_query, in_sequence):
        """Pack session, query and sequence into a single dictionary."""
        return {
            'session': in_session,
            'query': in_query,
            'sequence': in_sequence,
        }

    def result_set_to_disk_file(self, local_logger, stats, in_dict):
        """
        Convert a result-set to a data frame and store it to disk.

        :param local_logger: logger receiving the progress messages
        :param stats: dictionary with 'columns' and 'result_set'
        :param in_dict: dictionary with 'session', 'query' and 'sequence'
        """
        result_df = self.class_dbt.result_set_to_data_frame(
            local_logger, self.timer, stats['columns'], stats['result_set'])
        session = in_dict['session']
        if 'additional-columns' in session:
            # resolve inheritance, writing the resolved value into the session
            if session['additional-columns'] == 'inherit-from-parent':
                session['additional-columns'] = in_dict['query']['additional-columns']
            elif session['additional-columns'] == 'inherit-from-grand-parent':
                session['additional-columns'] = in_dict['sequence']['additional-columns']
            result_df = self.class_dbt.append_additional_columns_to_df(
                local_logger, self.timer, result_df, session)
        # NOTE(review): the class logger is used here instead of local_logger
        self.store_result_set_to_disk(self.class_ln.logger, result_df, session)

    @staticmethod
    def set_default_starting_weekday(in_dict):
        """
        Establish the ISO weekday a week starts with (default is 1).

        An inherited setting is resolved from the query (parent) or from the
        sequence (grand-parent) and written back into the session.

        :param in_dict: dictionary with 'session', 'query' and 'sequence'
        :return: value of 'start-iso-weekday', or 1 when the session has none
        """
        session = in_dict['session']
        if 'start-iso-weekday' not in session:
            return 1
        if session['start-iso-weekday'] == 'inherit-from-parent':
            session['start-iso-weekday'] = in_dict['query']['start-iso-weekday']
        elif session['start-iso-weekday'] == 'inherit-from-grand-parent':
            session['start-iso-weekday'] = in_dict['sequence']['start-iso-weekday']
        return session['start-iso-weekday']

    @staticmethod
    def set_default_parameter_rules(in_dict):
        """
        Establish the rules used to handle query parameters.

        :param in_dict: dictionary with 'session', 'query' and 'sequence'
        :return: session rules (resolved from the query or the sequence when
            inherited), or the defaults when the session defines none
        """
        session = in_dict['session']
        if 'parameters-handling-rules' not in session:
            # assumption is for either DICT or LIST values are numeric
            # in case text is given different rules have to be specified
            return {
                "dict-values-glue": ", ",
                "dict-values-prefix": "IN (",
                "dict-values-suffix": ")",
                "list-values-glue": ", ",
                "list-values-prefix": "",
                "list-values-suffix": ""
            }
        dictionary_to_return = session['parameters-handling-rules']
        if dictionary_to_return == 'inherit-from-parent':
            dictionary_to_return = in_dict['query']['parameters-handling-rules']
        elif dictionary_to_return == 'inherit-from-grand-parent':
            dictionary_to_return = in_dict['sequence']['parameters-handling-rules']
        return dictionary_to_return

    def store_result_set_to_disk(self, local_logger, in_data_frame, crt_session):
        """
        Store a data frame to every configured output file.

        :param local_logger: logger receiving the progress messages
        :param in_data_frame: data frame to store
        :param crt_session: session whose 'output-file' is either a single
            dictionary or a list of dictionaries; anything else stores nothing
        """
        output_setting = crt_session['output-file']
        # isinstance also accepts dict/list subclasses (e.g. OrderedDict)
        if isinstance(output_setting, dict):
            outputs = [output_setting]
        elif isinstance(output_setting, list):
            outputs = output_setting
        else:
            outputs = []
        for crt_output in outputs:
            self.class_dio.fn_store_data_frame_to_file(
                local_logger, self.timer, in_data_frame, crt_output)
            self.class_fo.fn_store_file_statistics(
                local_logger, self.timer, crt_output['name'],
                self.locale.gettext('Output file name'))