Passed
Branch master (074b4c)
by Daniel
01:12
created

db_extractor.ExtractNeeds   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 302
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 242
dl 0
loc 302
rs 9.36
c 0
b 0
f 0
wmc 38

17 Methods

Rating   Name   Duplication   Size   Complexity  
A ExtractNeeds.close_connection() 0 6 1
A ExtractNeeds.evaluate_if_extraction_is_required_for_single_file() 0 12 1
A ExtractNeeds.extract_query_to_result_set() 0 30 2
A ExtractNeeds.result_set_to_disk_file() 0 11 4
A ExtractNeeds.load_configuration() 0 11 1
A ExtractNeeds.pack_three_levels() 0 6 1
A ExtractNeeds.set_default_parameter_rules() 0 19 4
A ExtractNeeds.evaluate_if_extraction_is_required() 0 12 3
A ExtractNeeds.initiate_logger_and_timer() 0 8 1
A ExtractNeeds.close_cursor() 0 6 1
A ExtractNeeds.evaluate_if_extraction_is_required_list() 0 19 3
A ExtractNeeds.evaluate_extraction_overwrite_condition() 0 15 5
A ExtractNeeds.__init__() 0 27 1
A ExtractNeeds.load_query() 0 8 1
A ExtractNeeds.load_extraction_sequence_and_dependencies() 0 30 1
A ExtractNeeds.set_default_starting_weekday() 0 10 4
A ExtractNeeds.store_result_set_to_disk() 0 12 4
1
"""
2
main class to support Extract script
3
"""
4
# useful methods to measure time performance by small pieces of code
5
from codetiming import Timer
6
# package to facilitate operating system operations
7
import os
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to facilitate working with directories and files
11
from pathlib import Path
12
# custom classes specific to this project
13
from db_extractor.BasicNeeds import BasicNeeds
14
from db_extractor.CommandLineArgumentsManagement import CommandLineArgumentsManagement
15
from db_extractor.DataInputOutput import DataInputOutput
16
from db_extractor.DataManipulator import DataManipulator
17
from db_extractor.FileOperations import FileOperations
18
from db_extractor.LoggingNeeds import LoggingNeeds
19
from db_extractor.ParameterHandling import ParameterHandling
20
from db_extractor.BasicNeedsForExtractor import BasicNeedsForExtractor
21
from db_extractor.DatabaseTalker import DatabaseTalker
22
23
24
class ExtractNeeds:
    """Main class supporting the Extract script.

    It wires together the project helper classes (basic needs, file
    operations, database talker, data input/output, parameter handling,
    logging, command line arguments) and exposes the steps the script runs:
    loading configuration, deciding whether an extraction is required,
    executing a query and storing its result-set to disk.
    """
    # helper class instances, all populated by __init__
    class_bn = None
    class_bnfe = None
    class_clam = None
    class_dbt = None
    class_dio = None
    class_dm = None
    class_fo = None
    class_ln = None
    class_ph = None
    # content of the application configuration file (see load_configuration)
    config = None
    # content of the extracting sequence file
    file_extract_sequence = None
    # gettext translation object used for every message of this class
    locale = None
    # parsed command line arguments
    parameters = None
    # name of the script using this class (also used as timer name)
    script = None
    # 'Systems' section of the source systems file
    source_systems = None
    # shared timer, created by initiate_logger_and_timer
    timer = None
    # 'Credentials' section of the credentials file
    user_credentials = None

    def __init__(self, destination_script, in_language='en_US'):
        """Prepare localization and instantiate all helper classes.

        :param destination_script: name of the script this instance serves
        :param in_language: locale code used for translations
        """
        self.script = destination_script
        # NOTE: os.path.altsep is None on POSIX systems, so it cannot be used
        # as a separator there; pathlib handles both path flavours
        this_file = Path(os.path.abspath(__file__))
        locale_domain = this_file.name.replace('.py', '')
        # translations live in <parent of package folder>/project_locale/<domain>
        locale_folder = os.path.normpath(os.path.join(
            str(this_file.parent.parent), 'project_locale', locale_domain))
        self.locale = gettext.translation(locale_domain, localedir=locale_folder,
                                          languages=[in_language], fallback=True)
        # instantiate Basic Needs class
        self.class_bn = BasicNeeds(in_language)
        # instantiate Extractor Specific Needs class
        self.class_bnfe = BasicNeedsForExtractor(in_language)
        # instantiate File Operations class
        self.class_fo = FileOperations(in_language)
        # instantiate Database Talker class
        self.class_dbt = DatabaseTalker(in_language)
        # instantiate Data Input Output class
        self.class_dio = DataInputOutput(in_language)
        # instantiate Data Manipulator class, useful to manipulate data frames
        self.class_dm = DataManipulator(in_language)
        # instantiate Command Line Arguments class
        self.class_clam = CommandLineArgumentsManagement(in_language)
        # instantiate Logger class
        self.class_ln = LoggingNeeds()
        # instantiate Parameter Handling class
        self.class_ph = ParameterHandling(in_language)

    def close_connection(self, local_logger):
        """Close the database connection held by the Database Talker.

        :param local_logger: logger receiving the progress messages
        """
        self.timer.start()
        try:
            local_logger.info(self.locale.gettext('Closing DB connection'))
            self.class_dbt.connection.close()
            local_logger.info(self.locale.gettext('Closing DB completed'))
        finally:
            # always stop, otherwise the shared timer cannot be started again
            self.timer.stop()

    def close_cursor(self, local_logger, in_cursor):
        """Close a database cursor, freeing its result-set.

        :param local_logger: logger receiving the progress messages
        :param in_cursor: cursor object exposing a close() method
        """
        self.timer.start()
        try:
            local_logger.info(self.locale.gettext('Free DB result-set started'))
            in_cursor.close()
            local_logger.info(self.locale.gettext('Free DB result-set completed'))
        finally:
            # always stop, otherwise the shared timer cannot be started again
            self.timer.stop()

    def evaluate_extraction_overwrite_condition(self, extraction_required, in_dict):
        """Re-evaluate the extraction verdict when an overwrite condition exists.

        The additional evaluation only takes place when the session behaviour
        is 'overwrite-if-output-file-exists', an 'extract-overwrite-condition'
        is defined in the session and the output file already exists.

        :param extraction_required: verdict computed so far
        :param in_dict: dictionary with 'session' and 'file' entries
        :return: the (possibly updated) verdict
        """
        session = in_dict['session']
        additional_evaluation_applies = (
            session['extract-behaviour'] == 'overwrite-if-output-file-exists'
            and 'extract-overwrite-condition' in session
            and Path(in_dict['file']['name']).is_file()
        )
        if not additional_evaluation_applies:
            return extraction_required
        fv = self.class_bnfe.fn_is_extraction_necessary_additional(
            self.class_ln.logger, self.class_ph, self.class_fo, in_dict)
        # the helper verdict is compared against the File Operations
        # translation of 'older'
        extraction_required = (fv == self.class_fo.locale.gettext('older'))
        if extraction_required:
            new_verdict = self.locale.gettext('required')
        else:
            new_verdict = self.locale.gettext('not required')
        self.class_ln.logger.debug(self.locale.gettext(
            'Additional evaluation took place and new verdict is: {new_verdict}')
                                   .replace('{new_verdict}', new_verdict))
        return extraction_required

    def evaluate_if_extraction_is_required(self, in_dict):
        """Decide if extraction is required for one or many output files.

        :param in_dict: dictionary with 'session', 'query' and 'sequence'
            entries; session 'output-file' is either a dictionary (single
            file) or a list of dictionaries (multiple files)
        :return: verdict, False when 'output-file' is neither dict nor list
        """
        output_file = in_dict['session']['output-file']
        if isinstance(output_file, dict):
            return self.evaluate_if_extraction_is_required_for_single_file({
                'session': in_dict['session'],
                'query': in_dict['query'],
                'sequence': in_dict['sequence'],
                'file': output_file,
            })
        if isinstance(output_file, list):
            return self.evaluate_if_extraction_is_required_list(in_dict)
        return False

    def evaluate_if_extraction_is_required_list(self, in_dict):
        """Decide if extraction is required when multiple output files exist.

        Every file is evaluated individually and the individual verdicts are
        consolidated by BasicNeeds.fn_evaluate_dict_values.

        :param in_dict: dictionary with 'session', 'query' and 'sequence'
            entries, session 'output-file' being a list of dictionaries
        :return: consolidated verdict
        """
        evaluated_extraction = {}
        for crt_file in in_dict['session']['output-file']:
            crt_eval = self.evaluate_if_extraction_is_required_for_single_file({
                'session': in_dict['session'],
                'query': in_dict['query'],
                'sequence': in_dict['sequence'],
                'file': crt_file,
            })
            # key is read after the evaluation as the name gets evaluated there
            evaluated_extraction[str(crt_file['name'])] = crt_eval
        extraction_required = self.class_bn.fn_evaluate_dict_values(evaluated_extraction)
        self.class_ln.logger.debug(evaluated_extraction)
        overall_verdict = self.locale.gettext('not required')
        if extraction_required:
            overall_verdict = self.locale.gettext('required')
        self.class_ln.logger.debug(self.locale.gettext(
            'Overall new verdict after considering multiple files is: {overall_verdict}')
                                   .replace('{overall_verdict}', overall_verdict))
        return extraction_required

    def evaluate_if_extraction_is_required_for_single_file(self, in_dict):
        """Decide if extraction is required for a single output file.

        The file name expression is evaluated first and stored back into
        in_dict['file']['name'] (the dictionary is modified in place).

        :param in_dict: dictionary with 'session' and 'file' entries
        :return: verdict after basic and overwrite condition evaluation
        """
        in_dict['file']['name'] = self.class_ph.eval_expression(
            self.class_ln.logger, in_dict['file']['name'], in_dict['session']['start-iso-weekday'])
        e_dict = {
            'extract-behaviour': in_dict['session']['extract-behaviour'],
            'output-csv-file': in_dict['file']['name'],
        }
        extraction_required = self.class_bnfe.fn_is_extraction_necessary(
            self.class_ln.logger, e_dict)
        return self.evaluate_extraction_overwrite_condition(extraction_required, in_dict)

    def extract_query_to_result_set(self, local_logger, in_cursor, in_dictionary):
        """Execute a query and bring its result-set from the server.

        :param local_logger: logger receiving the progress messages
        :param in_cursor: database cursor used to execute the query
        :param in_dictionary: dictionary with 'session' and 'query' entries
        :return: dictionary with 'rows_counted' and, when the execution gave
            back a cursor, also 'columns' and 'result_set'
        """
        this_session = in_dictionary['session']
        this_query = in_dictionary['query']
        # get query parameters into a tuple
        tuple_parameters = self.class_ph.handle_query_parameters(
            local_logger, this_session, this_session['start-iso-weekday'])
        # measure expected number of parameters
        expected_no_of_parameters = str(this_query).count('%s')
        # simulate final query to log (useful for debugging purposes)
        simulated_query = self.class_ph.simulate_final_query(
            local_logger, self.timer, this_query, expected_no_of_parameters, tuple_parameters)
        simulated_query_single_line = self.class_bn.fn_multi_line_string_to_single(simulated_query)
        local_logger.info(self.locale.gettext('Query with parameters interpreted is: %s')
                          .replace('%s', simulated_query_single_line))
        # actual execution of the query
        in_cursor = self.class_dbt.execute_query(
            local_logger, self.timer, in_cursor, this_query,
            expected_no_of_parameters, tuple_parameters)
        if in_cursor is None:
            return {
                'rows_counted': 0
            }
        # bringing the information from server (data transfer);
        # row count is read only after the fetch, same as evaluation order below
        return {
            'columns': self.class_dbt.get_column_names(local_logger, self.timer, in_cursor),
            'result_set': self.class_dbt.fetch_executed_query(
                local_logger, self.timer, in_cursor),
            'rows_counted': in_cursor.rowcount,
        }

    def initiate_logger_and_timer(self):
        """Initiate the logger and the shared timer.

        Requires load_configuration to have populated self.parameters.
        """
        # initiate logger
        self.class_ln.initiate_logger(self.parameters.output_log_file, self.script)
        # define global timer to use
        self.timer = Timer(self.script,
                           text=self.locale.gettext('Time spent is {seconds}'),
                           logger=self.class_ln.logger.debug)

    def load_configuration(self):
        """Load the application configuration and parse command line arguments.

        The configuration file is 'db-extractor.json' from the 'config' folder
        located next to the folder containing this file.
        """
        # load application configuration (inputs are defined into a json file);
        # only the folder holding this file is swapped for its 'config' sibling
        # so a parent folder sharing the package name is never altered
        package_folder = os.path.dirname(os.path.abspath(__file__))
        ref_folder = os.path.join(os.path.dirname(package_folder), 'config')
        config_file = os.path.join(ref_folder, 'db-extractor.json').replace('\\', '/')
        self.config = self.class_fo.fn_open_file_and_get_content(config_file)
        # get command line parameter values
        self.parameters = self.class_clam.parse_arguments(self.config['input_options'][self.script])
        # checking generic inputs, if anything is invalid an exit(1) will take place
        self.class_bn.fn_check_inputs(self.parameters)
        # checking extractor specific inputs, if anything is invalid an exit(1) will take place
        self.class_bnfe.fn_check_inputs_specific(self.parameters)

    def _load_json_and_store_statistics(self, in_file_name, msg_loaded, file_meaning,
                                        in_key=None):
        """Load a JSON file under the timer, then store its file statistics.

        :param in_file_name: file to load
        :param msg_loaded: already translated message logged once loaded
        :param file_meaning: already translated label used for statistics
        :param in_key: optional top level key to return instead of everything
        :return: loaded content (or only the in_key part of it)
        """
        self.timer.start()
        try:
            content = self.class_fo.fn_open_file_and_get_content(in_file_name, 'json')
            if in_key is not None:
                content = content[in_key]
            self.class_ln.logger.info(msg_loaded)
        finally:
            # always stop, otherwise the shared timer cannot be started again
            self.timer.stop()
        self.class_fo.fn_store_file_statistics(
            self.class_ln.logger, self.timer, in_file_name, file_meaning)
        return content

    def load_extraction_sequence_and_dependencies(self):
        """Load extracting sequence, source systems and credentials files.

        File names are taken from the parsed command line parameters and the
        statistics of each file are stored after it has been loaded.
        """
        # get the extracting sequence(es) from provided file
        self.file_extract_sequence = self._load_json_and_store_statistics(
            self.parameters.input_extracting_sequence_file,
            self.locale.gettext(
                'Configuration file name with extracting sequence(es) has been loaded'),
            self.locale.gettext('Configuration file name with extracting sequence(es)'))
        # get the source system details from provided file
        self.source_systems = self._load_json_and_store_statistics(
            self.parameters.input_source_system_file,
            self.locale.gettext('Source Systems file name has been loaded'),
            self.locale.gettext('Source Systems file name'),
            'Systems')
        # get the user credentials from provided file
        self.user_credentials = self._load_json_and_store_statistics(
            self.parameters.input_credentials_file,
            self.locale.gettext(
                'Configuration file name with credentials has been loaded'),
            self.locale.gettext('Configuration file name with credentials'),
            'Credentials')

    def load_query(self, crt_query):
        """Read the query text from the file referenced by the query details.

        :param crt_query: dictionary having an 'input-query-file' entry
        :return: raw content of the query file
        """
        self.timer.start()
        try:
            query = self.class_fo.fn_open_file_and_get_content(
                crt_query['input-query-file'], 'raw')
            feedback = self.locale.gettext('Generic query is: %s') \
                .replace('%s', self.class_bn.fn_multi_line_string_to_single(query))
            self.class_ln.logger.info(feedback)
        finally:
            # always stop, otherwise the shared timer cannot be started again
            self.timer.stop()
        return query

    @staticmethod
    def pack_three_levels(in_session, in_query, in_sequence):
        """Pack session, query and sequence details into a single dictionary.

        :return: dictionary with 'session', 'query' and 'sequence' keys
        """
        return {
            'session': in_session,
            'query': in_query,
            'sequence': in_sequence,
        }

    def result_set_to_disk_file(self, local_logger, stats, in_dict):
        """Convert a result-set into a data frame and store it to disk.

        When the session defines 'additional-columns' these are appended to
        the data frame; inherited values are resolved from the query (parent)
        or the sequence (grand-parent) and stored back into the session.

        :param local_logger: logger receiving the progress messages
        :param stats: dictionary with 'columns' and 'result_set' entries
        :param in_dict: dictionary with 'session', 'query', 'sequence' entries
        """
        result_df = self.class_dbt.result_set_to_data_frame(
            local_logger, self.timer, stats['columns'], stats['result_set'])
        session = in_dict['session']
        if 'additional-columns' in session:
            if session['additional-columns'] == 'inherit-from-parent':
                session['additional-columns'] = in_dict['query']['additional-columns']
            elif session['additional-columns'] == 'inherit-from-grand-parent':
                session['additional-columns'] = in_dict['sequence']['additional-columns']
            result_df = self.class_dbt.append_additional_columns_to_df(
                local_logger, self.timer, result_df, session)
        # the logger given by the caller is used for every step of this method
        self.store_result_set_to_disk(local_logger, result_df, session)

    @staticmethod
    def set_default_starting_weekday(in_dict):
        """Establish the ISO weekday considered as start of the week.

        Inherited values are resolved from the query (parent) or the sequence
        (grand-parent) and stored back into the session.

        :param in_dict: dictionary with 'session', 'query', 'sequence' entries
        :return: session value when present, otherwise 1
        """
        session = in_dict['session']
        if 'start-iso-weekday' not in session:
            return 1
        if session['start-iso-weekday'] == 'inherit-from-parent':
            session['start-iso-weekday'] = in_dict['query']['start-iso-weekday']
        elif session['start-iso-weekday'] == 'inherit-from-grand-parent':
            session['start-iso-weekday'] = in_dict['sequence']['start-iso-weekday']
        return session['start-iso-weekday']

    @staticmethod
    def set_default_parameter_rules(in_dict):
        """Establish the rules used to handle query parameters.

        :param in_dict: dictionary with 'session', 'query', 'sequence' entries
        :return: session rules (resolved from parent or grand-parent when
            inherited) or the default rules when session defines none
        """
        session = in_dict['session']
        if 'parameters-handling-rules' not in session:
            # assumption is for either DICT or LIST values are numeric
            # in case text is given different rules have to be specified
            return {
                "dict-values-glue": ", ",
                "dict-values-prefix": "IN (",
                "dict-values-suffix": ")",
                "list-values-glue": ", ",
                "list-values-prefix": "",
                "list-values-suffix": ""
            }
        dictionary_to_return = session['parameters-handling-rules']
        if dictionary_to_return == 'inherit-from-parent':
            dictionary_to_return = in_dict['query']['parameters-handling-rules']
        elif dictionary_to_return == 'inherit-from-grand-parent':
            dictionary_to_return = in_dict['sequence']['parameters-handling-rules']
        return dictionary_to_return

    def store_result_set_to_disk(self, local_logger, in_data_frame, crt_session):
        """Store a data frame into every output file defined by the session.

        :param local_logger: logger receiving the progress messages
        :param in_data_frame: data frame to be stored
        :param crt_session: session details; 'output-file' is either a
            dictionary (single file) or a list of dictionaries, anything else
            results in nothing being stored
        """
        output_setting = crt_session['output-file']
        if isinstance(output_setting, dict):
            output_list = [output_setting]
        elif isinstance(output_setting, list):
            output_list = output_setting
        else:
            return
        for crt_output in output_list:
            self.class_dio.fn_store_data_frame_to_file(
                local_logger, self.timer, in_data_frame, crt_output)
            self.class_fo.fn_store_file_statistics(
                local_logger, self.timer, crt_output['name'],
                self.locale.gettext('Output file name'))
302