Completed
Push — master ( 545613...55df2f )
by Daniel
14s queued 11s
created

ExtractNeeds.load_configuration()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 11
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
"""
2
main class to support Extract script
3
"""
4
# useful methods to measure time performance by small pieces of code
5
from codetiming import Timer
6
# package to facilitate operating system operations
7
import os
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to facilitate working with directories and files
11
from pathlib import Path
12
# custom classes specific to this project
13
from common.BasicNeeds import BasicNeeds
14
from common.CommandLineArgumentsManagement import CommandLineArgumentsManagement
15
from common.DataInputOutput import DataInputOutput
16
from common.DataManipulator import DataManipulator
17
from common.FileOperations import FileOperations
18
from common.LoggingNeeds import LoggingNeeds
19
from common.ParameterHandling import ParameterHandling
20
from db_extractor.BasicNeedsForExtractor import BasicNeedsForExtractor
21
from db_extractor.DatabaseTalker import DatabaseTalker
22
23
24
class ExtractNeeds:
    """Glue class used by the Extract script.

    It instantiates the project helper classes, loads the configuration files
    and offers the methods needed to decide if an extraction is required,
    run a query and store its result-set to disk.
    """
    # helper class instances, populated by __init__
    class_bn = None
    class_bnfe = None
    class_clam = None
    class_dbt = None
    class_dio = None
    class_dm = None
    class_fo = None
    class_ln = None
    class_ph = None
    # application configuration, populated by load_configuration
    config = None
    # content of the input files, populated by
    # load_extraction_sequence_and_dependencies
    file_extract_sequence = None
    # gettext translation object specific to this class
    locale = None
    # parsed command line arguments, populated by load_configuration
    parameters = None
    # name of the script this instance works for
    script = None
    source_systems = None
    # shared timer, populated by initiate_logger_and_timer
    timer = None
    user_credentials = None

    def __init__(self, destination_script, in_language='en_US'):
        """Load the translation for this class and instantiate all helpers.

        :param destination_script: name of the script using this class
        :param in_language: language code handed over to gettext and helpers
        """
        self.script = destination_script
        # splitext removes only the trailing extension, whereas a plain
        # replace('.py', '') would also alter names containing '.py' elsewhere
        current_file_basename = os.path.splitext(os.path.basename(__file__))[0]
        lang_folder = os.path.join(os.path.dirname(__file__), current_file_basename + '_Locale')
        self.locale = gettext.translation(
            current_file_basename, lang_folder, languages=[in_language])
        # instantiate Basic Needs class
        self.class_bn = BasicNeeds(in_language)
        # instantiate Extractor Specific Needs class
        self.class_bnfe = BasicNeedsForExtractor(in_language)
        # instantiate File Operations class
        self.class_fo = FileOperations(in_language)
        # instantiate Database Talker class
        self.class_dbt = DatabaseTalker(in_language)
        # instantiate Data Input Output class
        self.class_dio = DataInputOutput(in_language)
        # instantiate Data Manipulator class, useful to manipulate data frames
        self.class_dm = DataManipulator(in_language)
        # instantiate Command Line Arguments class
        self.class_clam = CommandLineArgumentsManagement(in_language)
        # instantiate Logger class
        self.class_ln = LoggingNeeds()
        # instantiate Parameter Handling class
        self.class_ph = ParameterHandling(in_language)

    def close_connection(self, local_logger):
        """Close the connection held by the Database Talker helper.

        :param local_logger: logger receiving the progress messages
        """
        self.timer.start()
        try:
            local_logger.info(self.locale.gettext('Closing DB connection'))
            self.class_dbt.connection.close()
            local_logger.info(self.locale.gettext('Closing DB completed'))
        finally:
            # always stop, otherwise a failed close leaves the shared timer running
            self.timer.stop()

    def close_cursor(self, local_logger, in_cursor):
        """Close the given cursor.

        :param local_logger: logger receiving the progress messages
        :param in_cursor: cursor object exposing a close() method
        """
        self.timer.start()
        try:
            local_logger.info(self.locale.gettext('Free DB result-set started'))
            in_cursor.close()
            local_logger.info(self.locale.gettext('Free DB result-set completed'))
        finally:
            # always stop, otherwise a failed close leaves the shared timer running
            self.timer.stop()

    def evaluate_extraction_overwrite_condition(self, extraction_required, in_dict):
        """Re-evaluate the extraction verdict when an overwrite condition is set.

        The additional evaluation only takes place when the session behaviour
        is 'overwrite-if-output-file-exists', the session carries an
        'extract-overwrite-condition' key and the output file already exists.

        :param extraction_required: verdict established so far
        :param in_dict: dictionary with 'session' and 'file' entries
        :return: the received verdict, or the re-evaluated one
        """
        session = in_dict['session']
        additional_evaluation_applies = (
            session['extract-behaviour'] == 'overwrite-if-output-file-exists'
            and 'extract-overwrite-condition' in session
            and Path(in_dict['file']['name']).is_file())
        if not additional_evaluation_applies:
            return extraction_required
        fv = self.class_bnfe.fn_is_extraction_necessary_additional(
            self.class_ln.logger, self.class_ph, self.class_fo, in_dict)
        # the helper verdict is compared against the File Operations translation
        extraction_required = (fv == self.class_fo.locale.gettext('older'))
        if extraction_required:
            new_verdict = self.locale.gettext('required')
        else:
            new_verdict = self.locale.gettext('not required')
        self.class_ln.logger.debug(self.locale.gettext(
            'Additional evaluation took place and new verdict is: {new_verdict}')
                                   .replace('{new_verdict}', new_verdict))
        return extraction_required

    def evaluate_if_extraction_is_required(self, in_dict):
        """Decide if extraction is required for one or many output files.

        :param in_dict: dictionary with 'session', 'query' and 'sequence'
            entries; session 'output-file' is either a dictionary (single
            file) or a list of dictionaries (multiple files)
        :return: the verdict, False when 'output-file' is neither dict nor list
        """
        output_file = in_dict['session']['output-file']
        # isinstance also accepts dict/list subclasses (e.g. OrderedDict)
        if isinstance(output_file, dict):
            return self.evaluate_if_extraction_is_required_for_single_file({
                'session': in_dict['session'],
                'query': in_dict['query'],
                'sequence': in_dict['sequence'],
                'file': output_file,
            })
        if isinstance(output_file, list):
            return self.evaluate_if_extraction_is_required_list(in_dict)
        return False

    def evaluate_if_extraction_is_required_list(self, in_dict):
        """Decide if extraction is required when multiple output files are set.

        Every file is evaluated individually and the individual verdicts are
        combined by BasicNeeds.fn_evaluate_dict_values.

        :param in_dict: dictionary with 'session', 'query' and 'sequence'
            entries, session 'output-file' being a list of dictionaries
        :return: the overall verdict
        """
        evaluated_extraction = {}
        for crt_file in in_dict['session']['output-file']:
            crt_eval = self.evaluate_if_extraction_is_required_for_single_file({
                'session': in_dict['session'],
                'query': in_dict['query'],
                'sequence': in_dict['sequence'],
                'file': crt_file,
            })
            # key is read after the evaluation, which stores the evaluated name
            evaluated_extraction[str(crt_file['name'])] = crt_eval
        extraction_required = self.class_bn.fn_evaluate_dict_values(evaluated_extraction)
        self.class_ln.logger.debug(evaluated_extraction)
        overall_verdict = self.locale.gettext('not required')
        if extraction_required:
            overall_verdict = self.locale.gettext('required')
        self.class_ln.logger.debug(self.locale.gettext(
            'Overall new verdict after considering multiple files is: {overall_verdict}')
                                   .replace('{overall_verdict}', overall_verdict))
        return extraction_required

    def evaluate_if_extraction_is_required_for_single_file(self, in_dict):
        """Decide if extraction is required for a single output file.

        Side effect: in_dict['file']['name'] is overwritten with the value
        returned by ParameterHandling.eval_expression.

        :param in_dict: dictionary with 'session' and 'file' entries
        :return: the verdict for the given file
        """
        in_dict['file']['name'] = self.class_ph.eval_expression(
            self.class_ln.logger, in_dict['file']['name'], in_dict['session']['start-iso-weekday'])
        e_dict = {
            'extract-behaviour': in_dict['session']['extract-behaviour'],
            'output-csv-file': in_dict['file']['name'],
        }
        extraction_required = self.class_bnfe.fn_is_extraction_necessary(
            self.class_ln.logger, e_dict)
        return self.evaluate_extraction_overwrite_condition(extraction_required, in_dict)

    def extract_query_to_result_set(self, local_logger, in_cursor, in_dictionary):
        """Execute the query and bring its result-set from the server.

        :param local_logger: logger receiving the progress messages
        :param in_cursor: cursor used to execute the query
        :param in_dictionary: dictionary with 'session' and 'query' entries
        :return: dictionary with 'rows_counted' and, when the execution
            returned a cursor, also 'columns' and 'result_set'
        """
        this_session = in_dictionary['session']
        this_query = in_dictionary['query']
        # get query parameters into a tuple
        tuple_parameters = self.class_ph.handle_query_parameters(
            local_logger, this_session, this_session['start-iso-weekday'])
        # measure expected number of parameters
        expected_no_of_parameters = str(this_query).count('%s')
        # simulate final query to log (useful for debugging purposes)
        simulated_query = self.class_ph.simulate_final_query(
            local_logger, self.timer, this_query, expected_no_of_parameters, tuple_parameters)
        simulated_query_single_line = self.class_bn.fn_multi_line_string_to_single(simulated_query)
        local_logger.info(self.locale.gettext('Query with parameters interpreted is: %s')
                          .replace('%s', simulated_query_single_line))
        # actual execution of the query
        in_cursor = self.class_dbt.execute_query(
            local_logger, self.timer, in_cursor, this_query,
            expected_no_of_parameters, tuple_parameters)
        if in_cursor is None:
            return {
                'rows_counted': 0
            }
        # bringing the information from server (data transfer)
        return {
            'columns': self.class_dbt.get_column_names(local_logger, self.timer, in_cursor),
            'result_set': self.class_dbt.fetch_executed_query(
                local_logger, self.timer, in_cursor),
            'rows_counted': in_cursor.rowcount,
        }

    def initiate_logger_and_timer(self):
        """Initiate the logger and the timer shared by all methods.

        Relies on self.parameters, therefore load_configuration has to be
        called beforehand.
        """
        # initiate logger
        self.class_ln.initiate_logger(self.parameters.output_log_file, self.script)
        # define global timer to use
        self.timer = Timer(self.script,
                           text=self.locale.gettext('Time spent is {seconds}'),
                           logger=self.class_ln.logger.debug)

    def load_configuration(self):
        """Load the application configuration and the command line arguments."""
        # load application configuration (inputs are defined into a json file)
        # NOTE(review): replace() touches every 'db_extractor' occurrence in the
        # path, not only the folder holding this file - confirm this is intended
        ref_folder = os.path.dirname(__file__).replace('db_extractor', 'config')
        config_file = os.path.join(ref_folder, 'db-extractor.json').replace('\\', '/')
        self.config = self.class_fo.fn_open_file_and_get_content(config_file)
        # get command line parameter values
        self.parameters = self.class_clam.parse_arguments(self.config['input_options'][self.script])
        # checking generic inputs, if anything is invalid an exit(1) will take place
        self.class_bn.fn_check_inputs(self.parameters)
        # checking extractor specific inputs, if anything is invalid an exit(1) will take place
        self.class_bnfe.fn_check_inputs_specific(self.parameters)

    def _load_json_file_and_store_statistics(self, in_file_name, loaded_message, file_meaning,
                                             root_key=None):
        """Load a JSON file under the shared timer, then store its statistics.

        :param in_file_name: name of the file to load
        :param loaded_message: already translated message logged after loading
        :param file_meaning: already translated label used for file statistics
        :param root_key: when given, only the content under this key is returned
        :return: content of the file (or of its root_key entry)
        """
        self.timer.start()
        try:
            content = self.class_fo.fn_open_file_and_get_content(in_file_name, 'json')
            if root_key is not None:
                content = content[root_key]
            self.class_ln.logger.info(loaded_message)
        finally:
            # always stop, otherwise a failed load leaves the shared timer running
            self.timer.stop()
        # store file statistics
        self.class_fo.fn_store_file_statistics(
            self.class_ln.logger, self.timer, in_file_name, file_meaning)
        return content

    def load_extraction_sequence_and_dependencies(self):
        """Load extracting sequences, source systems and user credentials.

        File names are taken from the parsed command line arguments.
        """
        # get the extracting sequence(es) from provided file
        self.file_extract_sequence = self._load_json_file_and_store_statistics(
            self.parameters.input_extracting_sequence_file,
            self.locale.gettext(
                'Configuration file name with extracting sequence(es) has been loaded'),
            self.locale.gettext('Configuration file name with extracting sequence(es)'))
        # get the source system details from provided file
        self.source_systems = self._load_json_file_and_store_statistics(
            self.parameters.input_source_system_file,
            self.locale.gettext('Source Systems file name has been loaded'),
            self.locale.gettext('Source Systems file name'),
            root_key='Systems')
        # get the user credentials from provided file
        self.user_credentials = self._load_json_file_and_store_statistics(
            self.parameters.input_credentials_file,
            self.locale.gettext(
                'Configuration file name with credentials has been loaded'),
            self.locale.gettext('Configuration file name with credentials'),
            root_key='Credentials')

    def load_query(self, crt_query):
        """Read the query text from the file referred by the given query.

        :param crt_query: dictionary with an 'input-query-file' entry
        :return: content of the query file
        """
        self.timer.start()
        try:
            query = self.class_fo.fn_open_file_and_get_content(
                crt_query['input-query-file'], 'raw')
            feedback = self.locale.gettext('Generic query is: %s') \
                .replace('%s', self.class_bn.fn_multi_line_string_to_single(query))
            self.class_ln.logger.info(feedback)
        finally:
            # always stop, otherwise a failed load leaves the shared timer running
            self.timer.stop()
        return query

    @staticmethod
    def pack_three_levels(in_session, in_query, in_sequence):
        """Pack session, query and sequence into a single dictionary.

        :return: dictionary with 'session', 'query' and 'sequence' keys
        """
        return {
            'session': in_session,
            'query': in_query,
            'sequence': in_sequence,
        }

    def result_set_to_disk_file(self, local_logger, stats, in_dict):
        """Convert a result-set to a data frame and store it to disk.

        Side effect: an inherited 'additional-columns' session value is
        replaced in place with the value of the parent or grand-parent.

        :param local_logger: logger receiving the progress messages
        :param stats: dictionary with 'columns' and 'result_set' entries
        :param in_dict: dictionary with 'session', 'query' and 'sequence' entries
        """
        session = in_dict['session']
        result_df = self.class_dbt.result_set_to_data_frame(
            local_logger, self.timer, stats['columns'], stats['result_set'])
        if 'additional-columns' in session:
            if session['additional-columns'] == 'inherit-from-parent':
                session['additional-columns'] = in_dict['query']['additional-columns']
            elif session['additional-columns'] == 'inherit-from-grand-parent':
                session['additional-columns'] = in_dict['sequence']['additional-columns']
            result_df = self.class_dbt.append_additional_columns_to_df(
                local_logger, self.timer, result_df, session)
        # use the logger received as parameter, same as the calls above
        self.store_result_set_to_disk(local_logger, result_df, session)

    @staticmethod
    def set_default_starting_weekday(in_dict):
        """Establish the ISO weekday the week starts with.

        Side effect: an inherited 'start-iso-weekday' session value is
        replaced in place with the value of the parent or grand-parent.

        :param in_dict: dictionary with 'session', 'query' and 'sequence' entries
        :return: session 'start-iso-weekday' when present, 1 otherwise
        """
        session = in_dict['session']
        if 'start-iso-weekday' not in session:
            return 1
        if session['start-iso-weekday'] == 'inherit-from-parent':
            session['start-iso-weekday'] = in_dict['query']['start-iso-weekday']
        elif session['start-iso-weekday'] == 'inherit-from-grand-parent':
            session['start-iso-weekday'] = in_dict['sequence']['start-iso-weekday']
        return session['start-iso-weekday']

    @staticmethod
    def set_default_parameter_rules(in_dict):
        """Establish the rules used to handle query parameters.

        :param in_dict: dictionary with 'session', 'query' and 'sequence' entries
        :return: session 'parameters-handling-rules' when present (inherited
            from parent or grand-parent if so requested), defaults otherwise
        """
        if 'parameters-handling-rules' not in in_dict['session']:
            # assumption is for either DICT or LIST values are numeric
            # in case text is given different rules have to be specified
            return {
                "dict-values-glue": ", ",
                "dict-values-prefix": "IN (",
                "dict-values-suffix": ")",
                "list-values-glue": ", ",
                "list-values-prefix": "",
                "list-values-suffix": ""
            }
        dictionary_to_return = in_dict['session']['parameters-handling-rules']
        if dictionary_to_return == 'inherit-from-parent':
            dictionary_to_return = in_dict['query']['parameters-handling-rules']
        elif dictionary_to_return == 'inherit-from-grand-parent':
            dictionary_to_return = in_dict['sequence']['parameters-handling-rules']
        return dictionary_to_return

    def store_result_set_to_disk(self, local_logger, in_data_frame, crt_session):
        """Store the data frame to every output file of the session.

        :param local_logger: logger receiving the progress messages
        :param in_data_frame: data frame to store
        :param crt_session: dictionary whose 'output-file' entry is either a
            dictionary (single file) or a list of dictionaries; any other
            type results in nothing being stored
        """
        output_setting = crt_session['output-file']
        # isinstance also accepts dict/list subclasses (e.g. OrderedDict)
        if isinstance(output_setting, dict):
            output_files = [output_setting]
        elif isinstance(output_setting, list):
            output_files = output_setting
        else:
            output_files = []
        for crt_output in output_files:
            self.class_dio.fn_store_data_frame_to_file(
                local_logger, self.timer, in_data_frame, crt_output)
            self.class_fo.fn_store_file_statistics(
                local_logger, self.timer, crt_output['name'],
                self.locale.gettext('Output file name'))
302