Passed
Push — development/test ( bb7713...53b357 )
by Daniel
01:16
created

ExtractNeeds.pack_three_levels()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
"""
2
main class to support Extract script
3
"""
4
# useful methods to measure time performance by small pieces of code
5
from codetiming import Timer
6
# package to facilitate operating system operations
7
import os
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to facilitate working with directories and files
11
from pathlib import Path
12
# custom classes specific to this project
13
from common.BasicNeeds import BasicNeeds
14
from common.CommandLineArgumentsManagement import CommandLineArgumentsManagement
15
from common.DataInputOutput import DataInputOutput
16
from common.DataManipulator import DataManipulator
17
from common.FileOperations import FileOperations
18
from common.LoggingNeeds import LoggingNeeds
19
from db_extractor.BasicNeedsForExtractor import BasicNeedsForExtractor
20
from db_extractor.DatabaseTalker import DatabaseTalker
21
from db_extractor.ParameterHandling import ParameterHandling
22
23
24
class ExtractNeeds:
25
    class_bn = None
26
    class_bnfe = None
27
    class_clam = None
28
    class_dbt = None
29
    class_dio = None
30
    class_dm = None
31
    class_fo = None
32
    class_ln = None
33
    class_ph = None
34
    config = None
35
    file_extract_sequence = None
36
    locale = None
37
    parameters = None
38
    script = None
39
    source_systems = None
40
    timer = None
41
    user_credentials = None
42
43
    def __init__(self, destination_script, default_language='en_US'):
        """
        Remember the calling script, load the translations of this module
        and create one instance of every helper class.

        :param destination_script: name of the script relying on this class
        :param default_language: language code given to gettext and to every
            helper class that accepts a language
        """
        self.script = destination_script
        # translations are expected in a "<module name>_Locale" folder
        # located next to this very file
        module_name = os.path.basename(__file__).replace('.py', '')
        translations_folder = os.path.join(os.path.dirname(__file__),
                                           module_name + '_Locale')
        self.locale = gettext.translation(
            module_name, translations_folder, languages=[default_language])
        # Basic Needs helper
        self.class_bn = BasicNeeds(default_language)
        # Basic Needs helper specific to the extractor
        self.class_bnfe = BasicNeedsForExtractor(default_language)
        # File Operations helper
        self.class_fo = FileOperations(default_language)
        # Database Talker helper
        self.class_dbt = DatabaseTalker(default_language)
        # Data Input/Output helper
        self.class_dio = DataInputOutput(default_language)
        # Data Manipulator helper
        self.class_dm = DataManipulator(default_language)
        # Command Line Arguments Management helper
        self.class_clam = CommandLineArgumentsManagement(default_language)
        # Logging helper (the only one created without a language)
        self.class_ln = LoggingNeeds()
        # Parameter Handling helper
        self.class_ph = ParameterHandling(default_language)
67
68
    def close_connection(self, local_logger):
69
        self.timer.start()
70
        local_logger.info(self.locale.gettext('Closing DB connection'))
71
        self.class_dbt.conn.close()
72
        local_logger.info(self.locale.gettext('Closing DB completed'))
73
        self.timer.stop()
74
75
    def close_cursor(self, local_logger, in_cursor):
76
        self.timer.start()
77
        local_logger.info(self.locale.gettext('Free DB result-set started'))
78
        in_cursor.close()
79
        local_logger.info(self.locale.gettext('Free DB result-set completed'))
80
        self.timer.stop()
81
82
    def evaluate_if_extraction_is_required(self, in_dict):
83
        extraction_required = False
84
        if type(in_dict['session']['output-file']) == dict:
85
            extraction_required = self.evaluate_if_extraction_is_required_for_single_file({
86
                'session': in_dict['session'],
87
                'query': in_dict['query'],
88
                'sequence': in_dict['sequence'],
89
                'file': in_dict['session']['output-file'],
90
            })
91
        elif type(in_dict['session']['output-file']) == list:
92
            evaluated_extraction = {}
93
            for crt_file in in_dict['session']['output-file']:
94
                crt_eval = self.evaluate_if_extraction_is_required_for_single_file({
95
                    'session': in_dict['session'],
96
                    'query': in_dict['query'],
97
                    'sequence': in_dict['sequence'],
98
                    'file': crt_file,
99
                })
100
                evaluated_extraction.update({str(crt_file['name']): crt_eval})
101
            extraction_required = self.class_bn.fn_evaluate_dict_values(evaluated_extraction)
102
            self.class_ln.logger.debug(evaluated_extraction)
103
            overall_verdict = self.locale.gettext('not required')
104
            if extraction_required:
105
                overall_verdict = self.locale.gettext('required')
106
            self.class_ln.logger.debug(self.locale.gettext( \
107
                'Overall new verdict after considering multiple files is: {overall_verdict}') \
108
                                       .replace('{overall_verdict}', overall_verdict))
109
        return extraction_required
110
111
    def evaluate_if_extraction_is_required_for_single_file(self, in_dict):
112
        in_dict['file']['name'] = self.class_ph.eval_expression(
113
            self.class_ln.logger, in_dict['file']['name'], in_dict['session']['start-isoweekday'])
114
        e_dict = {
115
            'extract-behaviour': in_dict['session']['extract-behaviour'],
116
            'output-csv-file': in_dict['file']['name'],
117
        }
118
        extraction_required = \
119
            self.class_bnfe.fn_is_extraction_necessary(self.class_ln.logger, e_dict)
120
        if in_dict['session']['extract-behaviour'] == 'overwrite-if-output-file-exists' \
121
                and 'extract-overwrite-condition' in in_dict['session'] \
122
                and Path(in_dict['file']['name']).is_file():
123
            fv = self.class_bnfe.fn_is_extraction_neccesary_additional(
124
                self.class_ln.logger, self.class_ph, self.class_fo, in_dict)
125
            extraction_required = False
126
            new_verdict = self.locale.gettext('not required')
127
            if fv == self.class_fo.lcl.gettext('older'):
128
                extraction_required = True
129
                new_verdict = self.locale.gettext('required')
130
            self.class_ln.logger.debug(self.locale.gettext(
131
                'Additional evaluation took place and new verdict is: {new_verdict}')
132
                                       .replace('{new_verdict}', new_verdict))
133
        return extraction_required
134
135
    def extract_query_to_result_set(self, local_logger, in_cursor, in_dictionary):
136
        this_session = in_dictionary['session']
137
        this_query = in_dictionary['query']
138
        # get query parameters into a tuple
139
        tuple_parameters = self.class_ph.handle_query_parameters(local_logger, this_session,
140
                                                                 this_session['start-isoweekday'])
141
        # measure expected number of parameters
142
        expected_number_of_parameters = str(this_query).count('%s')
143
        # simulate final query to log (useful for debugging purposes)
144
        simulated_query = self.class_ph.simulate_final_query(local_logger, self.timer, this_query,
145
                                                             expected_number_of_parameters,
146
                                                             tuple_parameters)
147
        local_logger.info(self.locale.gettext('Query with parameters interpreted is: %s') \
148
                          .replace('%s',
149
                                   self.class_bn.fn_multi_line_string_to_single(simulated_query)))
150
        # actual execution of the query
151
        in_cursor = self.class_dbt.execute_query(local_logger, self.timer, in_cursor, this_query,
152
                                                 expected_number_of_parameters, tuple_parameters)
153
        # bringing the information from server (data transfer)
154
        dict_to_return = {
155
            'rows_counted': 0
156
        }
157
        if in_cursor is not None:
158
            dict_to_return = {
159
                'columns': self.class_dbt.get_column_names(local_logger, self.timer, in_cursor),
160
                'result_set': self.class_dbt.fetch_executed_query(local_logger, self.timer,
161
                                                                  in_cursor),
162
                'rows_counted': in_cursor.rowcount,
163
            }
164
        return dict_to_return
165
166
    def initiate_logger_and_timer(self):
        """
        Initiate the logger using the log file taken from the parameters and
        create the timer shared by the methods of this class.

        Expects ``self.parameters`` to be already populated.
        """
        self.class_ln.initiate_logger(self.parameters.output_log_file, self.script)
        # one timer named after the script, reporting through the debug
        # level of the logger initiated above
        timer_settings = {
            'text': self.locale.gettext('Time spent is {seconds}'),
            'logger': self.class_ln.logger.debug,
        }
        self.timer = Timer(self.script, **timer_settings)
174
175
    def load_configuration(self):
176
        # load application configuration (inputs are defined into a json file)
177
        ref_folder = os.path.dirname(__file__).replace('db_extractor', 'config')
178
        config_file = os.path.join(ref_folder, 'db-extractor.json').replace('\\', '/')
179
        self.config = self.class_fo.fn_open_file_and_get_content(config_file)
180
        # get command line parameter values
181
        self.parameters = self.class_clam.parse_arguments(self.config['input_options'][self.script])
182
        # checking inputs, if anything is invalid an exit(1) will take place
183
        self.class_bn.fn_check_inputs(self.parameters)
184
        # checking inputs, if anything is invalid an exit(1) will take place
185
        self.class_bnfe.fn_check_inputs_specific(self.parameters)
186
187
    def load_extraction_sequence_and_dependencies(self):
188
        self.timer.start()
189
        self.file_extract_sequence = self.class_fo.fn_open_file_and_get_content(
190
            self.parameters.input_extracting_sequence_file, 'json')
191
        self.class_ln.logger.info(self.locale.gettext(
192
            'Configuration file name with extracting sequence(es) has been loaded'))
193
        self.timer.stop()
194
        # store file statistics
195
        self.class_fo.fn_store_file_statistics(self.class_ln.logger, self.timer,
196
                                               self.parameters.input_extracting_sequence_file,
197
                                               self.locale.gettext('Configuration file name with '
198
                                                                   + 'extracting sequence(es)'))
199
        # get the source system details from provided file
200
        self.timer.start()
201
        self.source_systems = self.class_fo.fn_open_file_and_get_content( \
202
            self.parameters.input_source_system_file, 'json')['Systems']
203
        self.class_ln.logger.info(self.locale.gettext('Source Systems file name has been loaded'))
204
        self.timer.stop()
205
        self.class_fo.fn_store_file_statistics(self.class_ln.logger, self.timer,
206
                                               self.parameters.input_source_system_file,
207
                                               self.locale.gettext('Source Systems file name'))
208
        # get the source system details from provided file
209
        self.timer.start()
210
        self.user_credentials = self.class_fo.fn_open_file_and_get_content(
211
            self.parameters.input_credentials_file, 'json')['Credentials']
212
        self.class_ln.logger.info(self.locale.gettext( \
213
            'Configuration file name with credentials has been loaded'))
214
        self.timer.stop()
215
        self.class_fo.fn_store_file_statistics(self.class_ln.logger, self.timer,
216
                                               self.parameters.input_credentials_file,
217
                                               self.locale.gettext( \
218
                                                   'Configuration file name with credentials'))
219
220
    def load_query(self, crt_query):
221
        self.timer.start()
222
        query = self.class_fo.fn_open_file_and_get_content(crt_query['input-query-file'], 'raw')
223
        feedback = self.locale.gettext('Generic query is: %s') \
224
            .replace('%s', self.class_bn.fn_multi_line_string_to_single(query))
225
        self.class_ln.logger.info(feedback)
226
        self.timer.stop()
227
        return query
228
229
    @staticmethod
230
    def pack_three_levels(in_session, in_query, in_sequence):
231
        return {
232
            'session': in_session,
233
            'query': in_query,
234
            'sequence': in_sequence,
235
        }
236
237
    def result_set_into_data_frame(self, local_logger, stats, in_dict):
238
        result_df = self.class_dbt.result_set_to_data_frame(
239
            local_logger, self.timer, stats['columns'], stats['result_set'])
240
        if 'additional-columns' in in_dict['session']:
241
            if in_dict['session']['additional-columns'] == 'inherit-from-parent':
242
                in_dict['session']['additional-columns'] = in_dict['query']['additional-columns']
243
            elif in_dict['session']['additional-columns'] == 'inherit-from-grand-parent':
244
                in_dict['session']['additional-columns'] = in_dict['sequence']['additional-columns']
245
            rdf = self.class_dbt.append_additional_columns_to_df(
246
                local_logger, self.timer, result_df, in_dict['session'])
247
        return rdf
0 ignored issues
show
introduced by
The variable rdf does not seem to be defined in case 'additional-columns' in SubscriptNode on line 240 is False. Are you sure this can never be the case?
Loading history...
248
249
    def set_default_starting_weekday(self, crt_session):
        """
        Give the ISO weekday the week starts with for the given session.

        The key looked up is 'start-isoweekday', the same one read from the
        session by the other methods of this class; the former check used
        'start_isoweekday', hence a configured value was never returned.

        :param crt_session: dictionary holding the session definition
        :return: value of 'start-isoweekday' when present, 1 otherwise
        """
        return crt_session.get('start-isoweekday', 1)
254
255
    def set_default_parameter_rules(self, in_dict):
        """
        Give the parameters handling rules applicable to the session.

        :param in_dict: dictionary having a 'session' key, plus 'query' /
            'sequence' keys when the rules are inherited
        :return: 'parameters-handling-rules' of the session, of the query
            ('inherit-from-parent') or of the sequence
            ('inherit-from-grand-parent'); a default set of rules when the
            session defines none
        """
        session = in_dict['session']
        if 'parameters-handling-rules' not in session:
            # defaults assume values of either DICT or LIST are numeric,
            # text values require different rules to be specified
            return {
                "dict-values-glue": ", ",
                "dict-values-prefix": "IN (",
                "dict-values-suffix": ")",
                "list-values-glue": ", ",
                "list-values-prefix": "",
                "list-values-suffix": ""
            }
        session_rules = session['parameters-handling-rules']
        if session_rules == 'inherit-from-parent':
            return in_dict['query']['parameters-handling-rules']
        if session_rules == 'inherit-from-grand-parent':
            return in_dict['sequence']['parameters-handling-rules']
        return session_rules
273
274
    def store_result_set_to_disk(self, local_logger, in_data_frame, crt_session):
275
        output_file_setting_type = type(crt_session['output-file'])
276
        if output_file_setting_type == dict:
277
            self.class_dio.fn_store_data_frame_to_file(local_logger, self.timer, in_data_frame,
278
                                                       crt_session['output-file'])
279
            self.class_fo.fn_store_file_statistics(local_logger, self.timer,
280
                                                   crt_session['output-file']['name'],
281
                                                   self.locale.gettext('Output file name'))
282
        elif output_file_setting_type == list:
283
            for crt_output in crt_session['output-file']:
284
                self.class_dio.fn_store_data_frame_to_file(local_logger, self.timer,
285
                                                           in_data_frame, crt_output)
286
                self.class_fo.fn_store_file_statistics(local_logger, self.timer, crt_output['name'],
287
                                                       self.locale.gettext('Output file name'))
288