Passed
Push — development/test ( ac4b33...352ca9 )
by Daniel
01:12
created

ExtractNeeds.evaluate_extraction_overwrite_condition()   A

Complexity

Conditions 5

Size

Total Lines 15
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 15
rs 9.1832
c 0
b 0
f 0
cc 5
nop 3
"""
Main class to support the Extract script
"""
4
# useful methods to measure time performance by small pieces of code
5
from codetiming import Timer
6
# package to facilitate operating system operations
7
import os
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to facilitate working with directories and files
11
from pathlib import Path
12
# custom classes specific to this project
13
from common.BasicNeeds import BasicNeeds
14
from common.CommandLineArgumentsManagement import CommandLineArgumentsManagement
15
from common.DataInputOutput import DataInputOutput
16
from common.DataManipulator import DataManipulator
17
from common.FileOperations import FileOperations
18
from common.LoggingNeeds import LoggingNeeds
19
from db_extractor.BasicNeedsForExtractor import BasicNeedsForExtractor
20
from db_extractor.DatabaseTalker import DatabaseTalker
21
from db_extractor.ParameterHandling import ParameterHandling
22
23
24
class ExtractNeeds:
    """Glue class that wires together the helpers needed by the Extract script.

    An instance holds one object of every helper class (basic needs, file
    operations, database talker, data input/output, data manipulator, command
    line arguments, logging, parameter handling) plus the state loaded along
    the way: configuration, parsed command line parameters, extraction
    sequence, source systems and user credentials.

    `initiate_logger_and_timer` must be called before any method that uses
    `self.timer`, and `load_configuration` before anything that reads
    `self.parameters`.
    """

    # helper class instances, populated by __init__
    class_bn = None
    class_bnfe = None
    class_clam = None
    class_dbt = None
    class_dio = None
    class_dm = None
    class_fo = None
    class_ln = None
    class_ph = None
    # application configuration, populated by load_configuration
    config = None
    # content of the extracting sequence file
    file_extract_sequence = None
    # gettext translation object used for every user facing message
    locale = None
    # parsed command line arguments, populated by load_configuration
    parameters = None
    # name of the script this instance serves
    script = None
    # 'Systems' section of the source systems file
    source_systems = None
    # global timer, populated by initiate_logger_and_timer
    timer = None
    # 'Credentials' section of the credentials file
    user_credentials = None

    def __init__(self, destination_script, default_language='en_US'):
        """Load the translations of this module and instantiate all helper classes.

        :param destination_script: name of the script being served (stored in `self.script`)
        :param default_language: language code handed over to gettext and to every helper
        """
        self.script = destination_script
        current_file_basename = os.path.basename(__file__).replace('.py', '')
        lang_folder = os.path.join(os.path.dirname(__file__), current_file_basename + '_Locale')
        self.locale = gettext.translation(current_file_basename, lang_folder,
                                          languages=[default_language])
        # instantiate Basic Needs class
        self.class_bn = BasicNeeds(default_language)
        # instantiate Extractor Specific Needs class
        self.class_bnfe = BasicNeedsForExtractor(default_language)
        # instantiate File Operations class
        self.class_fo = FileOperations(default_language)
        # instantiate Database Talker class
        self.class_dbt = DatabaseTalker(default_language)
        # instantiate Data Input Output class
        self.class_dio = DataInputOutput(default_language)
        # instantiate Data Manipulator class, useful to manipulate data frames
        self.class_dm = DataManipulator(default_language)
        # instantiate Command Line Arguments class
        self.class_clam = CommandLineArgumentsManagement(default_language)
        # instantiate Logger class
        self.class_ln = LoggingNeeds()
        # instantiate Parameter Handling class
        self.class_ph = ParameterHandling(default_language)

    def close_connection(self, local_logger):
        """Close the connection held by the database talker, timing the operation.

        :param local_logger: logger receiving the start/completion messages
        """
        self.timer.start()
        try:
            local_logger.info(self.locale.gettext('Closing DB connection'))
            self.class_dbt.conn.close()
            local_logger.info(self.locale.gettext('Closing DB completed'))
        finally:
            # stop the timer even on failure so the next start() is not refused
            self.timer.stop()

    def close_cursor(self, local_logger, in_cursor):
        """Close a database cursor (free the result-set), timing the operation.

        :param local_logger: logger receiving the start/completion messages
        :param in_cursor: cursor object to be closed
        """
        self.timer.start()
        try:
            local_logger.info(self.locale.gettext('Free DB result-set started'))
            in_cursor.close()
            local_logger.info(self.locale.gettext('Free DB result-set completed'))
        finally:
            # stop the timer even on failure so the next start() is not refused
            self.timer.stop()

    def evaluate_extraction_overwrite_condition(self, extraction_required, in_dict):
        """Re-evaluate the extraction verdict when an overwrite condition is configured.

        The additional evaluation only takes place when the session behaviour is
        'overwrite-if-output-file-exists', the session defines an
        'extract-overwrite-condition' and the output file already exists on disk.
        Otherwise the incoming verdict is returned untouched.

        :param extraction_required: verdict established so far
        :param in_dict: dictionary with (at least) 'session' and 'file' keys
        :return: True when extraction is required, False otherwise
        """
        session = in_dict['session']
        additional_evaluation_applies = (
            session['extract-behaviour'] == 'overwrite-if-output-file-exists'
            and 'extract-overwrite-condition' in session
            and Path(in_dict['file']['name']).is_file()
        )
        if not additional_evaluation_applies:
            return extraction_required
        # (sic) the helper name is spelled "neccesary" in BasicNeedsForExtractor
        fv = self.class_bnfe.fn_is_extraction_neccesary_additional(
            self.class_ln.logger, self.class_ph, self.class_fo, in_dict)
        # the helper verdict is compared against the localized word for "older"
        if fv == self.class_fo.lcl.gettext('older'):
            extraction_required = True
            new_verdict = self.locale.gettext('required')
        else:
            extraction_required = False
            new_verdict = self.locale.gettext('not required')
        self.class_ln.logger.debug(self.locale.gettext(
            'Additional evaluation took place and new verdict is: {new_verdict}')
                                   .replace('{new_verdict}', new_verdict))
        return extraction_required

    def evaluate_if_extraction_is_required(self, in_dict):
        """Decide if extraction is required for a session with one or many output files.

        :param in_dict: dictionary with 'session', 'query' and 'sequence' keys;
            session 'output-file' is either a dictionary (single file) or a list of them
        :return: the verdict, or False when 'output-file' is neither a dict nor a list
        """
        output_file = in_dict['session']['output-file']
        if isinstance(output_file, dict):
            return self.evaluate_if_extraction_is_required_for_single_file({
                'session': in_dict['session'],
                'query': in_dict['query'],
                'sequence': in_dict['sequence'],
                'file': output_file,
            })
        if isinstance(output_file, list):
            return self.evaluate_if_extraction_is_required_list(in_dict)
        return False

    def evaluate_if_extraction_is_required_list(self, in_dict):
        """Evaluate every output file of the session and combine the individual verdicts.

        The combination itself is delegated to `BasicNeeds.fn_evaluate_dict_values`.

        :param in_dict: dictionary with 'session', 'query' and 'sequence' keys,
            where session 'output-file' is a list of file dictionaries
        :return: overall verdict as returned by `fn_evaluate_dict_values`
        """
        evaluated_extraction = {}
        for crt_file in in_dict['session']['output-file']:
            crt_eval = self.evaluate_if_extraction_is_required_for_single_file({
                'session': in_dict['session'],
                'query': in_dict['query'],
                'sequence': in_dict['sequence'],
                'file': crt_file,
            })
            # the file name is read after evaluation, which stores the evaluated name in place
            evaluated_extraction[str(crt_file['name'])] = crt_eval
        extraction_required = self.class_bn.fn_evaluate_dict_values(evaluated_extraction)
        self.class_ln.logger.debug(evaluated_extraction)
        if extraction_required:
            overall_verdict = self.locale.gettext('required')
        else:
            overall_verdict = self.locale.gettext('not required')
        self.class_ln.logger.debug(self.locale.gettext(
            'Overall new verdict after considering multiple files is: {overall_verdict}')
                                   .replace('{overall_verdict}', overall_verdict))
        return extraction_required

    def evaluate_if_extraction_is_required_for_single_file(self, in_dict):
        """Decide if extraction is required for a single output file.

        Side effect: `in_dict['file']['name']` is overwritten with the result of
        evaluating the file name expression.

        :param in_dict: dictionary with 'session' and 'file' keys
        :return: verdict after basic and overwrite-condition evaluation
        """
        in_dict['file']['name'] = self.class_ph.eval_expression(
            self.class_ln.logger, in_dict['file']['name'], in_dict['session']['start-isoweekday'])
        e_dict = {
            'extract-behaviour': in_dict['session']['extract-behaviour'],
            'output-csv-file': in_dict['file']['name'],
        }
        extraction_required = self.class_bnfe.fn_is_extraction_necessary(
            self.class_ln.logger, e_dict)
        return self.evaluate_extraction_overwrite_condition(extraction_required, in_dict)

    def extract_query_to_result_set(self, local_logger, in_cursor, in_dictionary):
        """Execute the query of the current session and fetch its result-set.

        :param local_logger: logger used for feedback
        :param in_cursor: database cursor used to execute the query
        :param in_dictionary: dictionary with 'session' and 'query' keys
        :return: dictionary with 'columns', 'result_set' and 'rows_counted';
            only 'rows_counted' (= 0) when query execution returned no cursor
        """
        this_session = in_dictionary['session']
        this_query = in_dictionary['query']
        # get query parameters into a tuple
        tuple_parameters = self.class_ph.handle_query_parameters(local_logger, this_session,
                                                                 this_session['start-isoweekday'])
        # measure expected number of parameters
        expected_number_of_parameters = str(this_query).count('%s')
        # simulate final query to log (useful for debugging purposes)
        simulated_query = self.class_ph.simulate_final_query(local_logger, self.timer, this_query,
                                                             expected_number_of_parameters,
                                                             tuple_parameters)
        local_logger.info(self.locale.gettext('Query with parameters interpreted is: %s')
                          .replace('%s',
                                   self.class_bn.fn_multi_line_string_to_single(simulated_query)))
        # actual execution of the query
        in_cursor = self.class_dbt.execute_query(local_logger, self.timer, in_cursor, this_query,
                                                 expected_number_of_parameters, tuple_parameters)
        if in_cursor is None:
            return {
                'rows_counted': 0
            }
        # bringing the information from server (data transfer)
        return {
            'columns': self.class_dbt.get_column_names(local_logger, self.timer, in_cursor),
            'result_set': self.class_dbt.fetch_executed_query(local_logger, self.timer,
                                                              in_cursor),
            'rows_counted': in_cursor.rowcount,
        }

    def initiate_logger_and_timer(self):
        """Initiate the logger and create the global timer that reports through it."""
        # initiate logger
        self.class_ln.initiate_logger(self.parameters.output_log_file, self.script)
        # define global timer to use
        self.timer = Timer(self.script,
                           text=self.locale.gettext('Time spent is {seconds}'),
                           logger=self.class_ln.logger.debug)

    def load_configuration(self):
        """Load the application configuration and parse + validate command line arguments."""
        # load application configuration (inputs are defined into a json file)
        # NOTE(review): every 'db_extractor' occurrence in the path gets replaced,
        # not only the last folder - confirm no parent folder carries that name
        ref_folder = os.path.dirname(__file__).replace('db_extractor', 'config')
        config_file = os.path.join(ref_folder, 'db-extractor.json').replace('\\', '/')
        self.config = self.class_fo.fn_open_file_and_get_content(config_file)
        # get command line parameter values
        self.parameters = self.class_clam.parse_arguments(self.config['input_options'][self.script])
        # checking inputs, if anything is invalid an exit(1) will take place
        self.class_bn.fn_check_inputs(self.parameters)
        # checking inputs, if anything is invalid an exit(1) will take place
        self.class_bnfe.fn_check_inputs_specific(self.parameters)

    def _load_json_file_with_feedback(self, file_name, loaded_message, top_level_key=None):
        """Load a JSON file under timer supervision and log a confirmation message.

        :param file_name: file to open through `FileOperations.fn_open_file_and_get_content`
        :param loaded_message: (already localized) message logged once content is available
        :param top_level_key: when given, only that key of the content is returned
        :return: file content, or its `top_level_key` part
        """
        self.timer.start()
        try:
            content = self.class_fo.fn_open_file_and_get_content(file_name, 'json')
            if top_level_key is not None:
                content = content[top_level_key]
            self.class_ln.logger.info(loaded_message)
        finally:
            # stop the timer even on failure so the next start() is not refused
            self.timer.stop()
        return content

    def load_extraction_sequence_and_dependencies(self):
        """Load extracting sequence, source systems and credentials, storing file statistics."""
        # get the extracting sequence(es) from provided file
        self.file_extract_sequence = self._load_json_file_with_feedback(
            self.parameters.input_extracting_sequence_file,
            self.locale.gettext(
                'Configuration file name with extracting sequence(es) has been loaded'))
        # store file statistics
        self.class_fo.fn_store_file_statistics(self.class_ln.logger, self.timer,
                                               self.parameters.input_extracting_sequence_file,
                                               self.locale.gettext('Configuration file name with '
                                                                   + 'extracting sequence(es)'))
        # get the source system details from provided file
        self.source_systems = self._load_json_file_with_feedback(
            self.parameters.input_source_system_file,
            self.locale.gettext('Source Systems file name has been loaded'),
            'Systems')
        self.class_fo.fn_store_file_statistics(self.class_ln.logger, self.timer,
                                               self.parameters.input_source_system_file,
                                               self.locale.gettext('Source Systems file name'))
        # get the user credentials from provided file
        self.user_credentials = self._load_json_file_with_feedback(
            self.parameters.input_credentials_file,
            self.locale.gettext('Configuration file name with credentials has been loaded'),
            'Credentials')
        self.class_fo.fn_store_file_statistics(self.class_ln.logger, self.timer,
                                               self.parameters.input_credentials_file,
                                               self.locale.gettext(
                                                   'Configuration file name with credentials'))

    def load_query(self, crt_query):
        """Read the raw query text from the file referenced by the query definition.

        :param crt_query: dictionary holding the 'input-query-file' key
        :return: query text as read from the file
        """
        self.timer.start()
        try:
            query = self.class_fo.fn_open_file_and_get_content(crt_query['input-query-file'], 'raw')
            feedback = self.locale.gettext('Generic query is: %s') \
                .replace('%s', self.class_bn.fn_multi_line_string_to_single(query))
            self.class_ln.logger.info(feedback)
        finally:
            # stop the timer even on failure so the next start() is not refused
            self.timer.stop()
        return query

    @staticmethod
    def pack_three_levels(in_session, in_query, in_sequence):
        """Bundle session, query and sequence into the dictionary shape used across this class.

        :return: dictionary with 'session', 'query' and 'sequence' keys
        """
        return {
            'session': in_session,
            'query': in_query,
            'sequence': in_sequence,
        }

    def result_set_into_data_frame(self, local_logger, stats, in_dict):
        """Convert a fetched result-set into a data frame, appending additional columns if set.

        Side effect: when the session 'additional-columns' is an inheritance marker,
        it is replaced in place with the value taken from the query or the sequence.

        :param local_logger: logger used for feedback
        :param stats: dictionary with 'columns' and 'result_set' keys
        :param in_dict: dictionary with 'session', 'query' and 'sequence' keys
        :return: resulting data frame
        """
        result_df = self.class_dbt.result_set_to_data_frame(
            local_logger, self.timer, stats['columns'], stats['result_set'])
        session = in_dict['session']
        if 'additional-columns' not in session:
            return result_df
        if session['additional-columns'] == 'inherit-from-parent':
            session['additional-columns'] = in_dict['query']['additional-columns']
        elif session['additional-columns'] == 'inherit-from-grand-parent':
            session['additional-columns'] = in_dict['sequence']['additional-columns']
        return self.class_dbt.append_additional_columns_to_df(
            local_logger, self.timer, result_df, session)

    def set_default_starting_weekday(self, crt_session):
        """Return the ISO weekday the week starts with for given session.

        :param crt_session: session dictionary, optionally holding 'start-isoweekday'
        :return: the session value of 'start-isoweekday', 1 when not specified
        """
        # the same hyphenated key is tested and read; the original tested
        # 'start_isoweekday' (underscore) so a configured value was never picked up
        return crt_session.get('start-isoweekday', 1)

    def set_default_parameter_rules(self, in_dict):
        """Establish the parameter handling rules applicable to given session.

        :param in_dict: dictionary with 'session', 'query' and 'sequence' keys
        :return: the session rules (resolved through inheritance markers when used),
            or the built-in defaults when the session does not specify any
        """
        if 'parameters-handling-rules' not in in_dict['session']:
            # assumption is for either DICT or LIST values are numeric
            # in case text is given different rules have to be specified
            return {
                "dict-values-glue": ", ",
                "dict-values-prefix": "IN (",
                "dict-values-suffix": ")",
                "list-values-glue": ", ",
                "list-values-prefix": "",
                "list-values-suffix": ""
            }
        dictionary_to_return = in_dict['session']['parameters-handling-rules']
        if dictionary_to_return == 'inherit-from-parent':
            dictionary_to_return = in_dict['query']['parameters-handling-rules']
        elif dictionary_to_return == 'inherit-from-grand-parent':
            dictionary_to_return = in_dict['sequence']['parameters-handling-rules']
        return dictionary_to_return

    def store_result_set_to_disk(self, local_logger, in_data_frame, crt_session):
        """Write the data frame to every output file of the session and store file statistics.

        :param local_logger: logger used for feedback
        :param in_data_frame: data frame to be stored
        :param crt_session: session dictionary; 'output-file' is either a single
            file dictionary or a list of them (anything else stores nothing)
        """
        output_file = crt_session['output-file']
        if isinstance(output_file, dict):
            output_files = [output_file]
        elif isinstance(output_file, list):
            output_files = output_file
        else:
            output_files = []
        for crt_output in output_files:
            self.class_dio.fn_store_data_frame_to_file(local_logger, self.timer,
                                                       in_data_frame, crt_output)
            self.class_fo.fn_store_file_statistics(local_logger, self.timer, crt_output['name'],
                                                   self.locale.gettext('Output file name'))
297