Passed
Push — development/test ( da436f...337cf1 )
by Daniel
01:22
created

ExtractNeeds.close_connection()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
"""
2
main class to support Extract script
3
"""
4
# useful methods to measure time performance by small pieces of code
5
from codetiming import Timer
6
# package to facilitate operating system operations
7
import os
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to facilitate working with directories and files
11
from pathlib import Path
12
# custom classes specific to this project
13
from common.BasicNeeds import BasicNeeds
14
from common.CommandLineArgumentsManagement import CommandLineArgumentsManagement
15
from common.DataManipulator import DataManipulator
16
from common.FileOperations import FileOperations
17
from common.LoggingNeeds import LoggingNeeds
18
from db_extractor.BasicNeedsForExtractor import BasicNeedsForExtractor
19
from db_extractor.DatabaseTalker import DatabaseTalker
20
from db_extractor.ParameterHandling import ParameterHandling
21
22
23
class ExtractNeeds:
    # Main support class for the Extract script. Every attribute below is a
    # placeholder (None) that gets populated by __init__ or by one of the
    # initiate_* / load_* methods of this class.
24
    # helper objects created in __init__, in declaration order: BasicNeeds,
    # BasicNeedsForExtractor, CommandLineArgumentsManagement, DatabaseTalker,
    # DataManipulator, FileOperations, LoggingNeeds and ParameterHandling
    class_bn = None
25
    class_bnfe = None
26
    class_clam = None
27
    class_dbt = None
28
    class_dm = None
29
    class_fo = None
30
    class_ln = None
31
    class_ph = None
32
    # content of 'db-extractor.json', set by load_configuration()
    config = None
33
    # extracting sequence file content, set by
    # load_extraction_sequence_and_dependencies()
    file_extract_sequence = None
34
    # gettext translation object for the messages of this module, set in __init__
    locale = None
35
    # parsed command line parameter values, set by load_configuration()
    parameters = None
36
    # value of the destination_script argument given to __init__
    script = None
37
    # 'Systems' entry of the source systems file, set by
    # load_extraction_sequence_and_dependencies()
    source_systems = None
38
    # codetiming Timer created by initiate_logger_and_timer()
    timer = None
39
    # 'Credentials' entry of the credentials file, set by
    # load_extraction_sequence_and_dependencies()
    user_credentials = None
40
41
    def __init__(self, destination_script, default_language='en_US'):
        """
        Prepare the translations and every helper object this class relies on.

        :param destination_script: stored as self.script; other methods use it
            to pick the input options and to name the logger and the timer
        :param default_language: language code handed to gettext and to every
            helper class that accepts one
        """
        self.script = destination_script
        # translation domain is this file's name without '.py' and the
        # catalogues are looked up in the sibling folder '<domain>_Locale'
        module_folder, module_file = os.path.split(__file__)
        translation_domain = module_file.replace('.py', '')
        self.locale = gettext.translation(
            translation_domain,
            os.path.join(module_folder, translation_domain + '_Locale'),
            languages=[default_language])
        # Basic Needs helper
        self.class_bn = BasicNeeds(default_language)
        # Extractor specific needs helper
        self.class_bnfe = BasicNeedsForExtractor(default_language)
        # File Operations helper
        self.class_fo = FileOperations(default_language)
        # Database Talker helper
        self.class_dbt = DatabaseTalker(default_language)
        # Data Manipulator helper, useful to manipulate data frames
        self.class_dm = DataManipulator(default_language)
        # Command Line Arguments helper
        self.class_clam = CommandLineArgumentsManagement(default_language)
        # Logger helper (the only one created without a language)
        self.class_ln = LoggingNeeds()
        # Parameter Handling helper
        self.class_ph = ParameterHandling(default_language)
63
64
    def close_connection(self, local_logger):
65
        self.timer.start()
66
        local_logger.info(self.locale.gettext('Closing DB connection'))
67
        self.class_dbt.conn.close()
68
        local_logger.info(self.locale.gettext('Closing DB completed'))
69
        self.timer.stop()
70
71
    def close_cursor(self, local_logger, in_cursor):
72
        self.timer.start()
73
        local_logger.info(self.locale.gettext('Free DB result-set started'))
74
        in_cursor.close()
75
        local_logger.info(self.locale.gettext('Free DB result-set completed'))
76
        self.timer.stop()
77
78
    def evaluate_if_extraction_is_required(self, crt_session):
79
        extraction_required = False
80
        if type(crt_session['output-file']) == dict:
81
            extraction_required = \
82
                self.evaluate_if_extraction_is_required_for_single_file(crt_session,
83
                                                                        crt_session['output-file'])
84
        elif type(crt_session['output-file']) == list:
85
            evaluated_extraction = {}
86
            for crt_file in crt_session['output-file']:
87
                crt_eval = self.evaluate_if_extraction_is_required_for_single_file(crt_session,
88
                                                                                   crt_file)
89
                evaluated_extraction.update({str(crt_file['name']): crt_eval})
90
            extraction_required = self.class_bn.fn_evaluate_dict_values(evaluated_extraction)
91
            self.class_ln.logger.debug(evaluated_extraction)
92
            overall_verdict = self.locale.gettext('not required')
93
            if extraction_required:
94
                overall_verdict = self.locale.gettext('required')
95
            self.class_ln.logger.debug(self.locale.gettext( \
96
                    'Overall new verdict after considering multiple files is: {overall_verdict}') \
97
                                       .replace('{overall_verdict}', overall_verdict))
98
        return extraction_required
99
100
    def evaluate_if_extraction_is_required_for_single_file(self, crt_session, crt_file):
101
        crt_file['name'] = self.class_ph.eval_expression(self.class_ln.logger,
102
                                                         crt_file['name'],
103
                                                         crt_session['start-isoweekday'])
104
        e_dict = {
105
            'extract-behaviour': crt_session['extract-behaviour'],
106
            'output-csv-file'  : crt_file['name'],
107
        }
108
        extraction_required = \
109
            self.class_bnfe.fn_is_extraction_necessary(self.class_ln.logger, e_dict)
110
        if crt_session['extract-behaviour'] == 'overwrite-if-output-file-exists' \
111
                and 'extract-overwrite-condition' in crt_session \
112
                and Path(crt_file['name']).is_file():
113
            fv = self.class_bnfe.fn_is_extraction_neccesary_additional(self.class_ln.logger,
114
                                                                       self.class_ph,
115
                                                                       self.class_fo,
116
                                                                       crt_session, crt_file)
117
            extraction_required = False
118
            new_verdict = self.locale.gettext('not required')
119
            if fv == self.class_fo.lcl.gettext('older'):
120
                extraction_required = True
121
                new_verdict = self.locale.gettext('required')
122
            self.class_ln.logger.debug(self.locale.gettext( \
123
                    'Additional evaluation took place and new verdict is: {new_verdict}') \
124
                                       .replace('{new_verdict}', new_verdict))
125
        return extraction_required
126
127
    def extract_query_to_result_set(self, local_logger, in_cursor, in_dictionary):
128
        this_session = in_dictionary['session']
129
        this_query = in_dictionary['query']
130
        # get query parameters into a tuple
131
        tuple_parameters = self.class_ph.handle_query_parameters(local_logger, this_session,
132
                                                                 this_session['start-isoweekday'])
133
        # measure expected number of parameters
134
        expected_number_of_parameters = str(this_query).count('%s')
135
        # simulate final query to log (useful for debugging purposes)
136
        simulated_query = self.class_ph.simulate_final_query(local_logger, self.timer, this_query,
137
                                                             expected_number_of_parameters,
138
                                                             tuple_parameters)
139
        local_logger.info(self.locale.gettext('Query with parameters interpreted is: %s') \
140
                          .replace('%s',
141
                                   self.class_bn.fn_multi_line_string_to_single(simulated_query)))
142
        # actual execution of the query
143
        in_cursor = self.class_dbt.execute_query(local_logger, self.timer, in_cursor, this_query,
144
                                                 expected_number_of_parameters, tuple_parameters)
145
        # bringing the information from server (data transfer)
146
        return {
147
            'columns'   : self.class_dbt.get_column_names(local_logger, self.timer, in_cursor),
148
            'result_set': self.class_dbt.fetch_executed_query(local_logger, self.timer, in_cursor),
149
            'rows_counted': in_cursor.rowcount,
150
        }
151
152
    def initiate_logger_and_timer(self):
        """
        Start the logger and create the timer shared by the other methods.

        Relies on self.parameters, so load_configuration() has to run first.
        """
        # logger writes to the file given through the command line parameters
        self.class_ln.initiate_logger(self.parameters.output_log_file, self.script)
        # global timer, reporting through the logger at debug level
        time_spent_text = self.locale.gettext('Time spent is {seconds}')
        report_time_spent = self.class_ln.logger.debug
        self.timer = Timer(self.script, text=time_spent_text, logger=report_time_spent)
160
161
    def load_configuration(self):
162
        # load application configuration (inputs are defined into a json file)
163
        ref_folder = os.path.dirname(__file__).replace('db_extractor', 'config')
164
        config_file = os.path.join(ref_folder, 'db-extractor.json').replace('\\', '/')
165
        self.config = self.class_fo.fn_open_file_and_get_content(config_file)
166
        # get command line parameter values
167
        self.parameters = self.class_clam.parse_arguments(self.config['input_options'][self.script])
168
        # checking inputs, if anything is invalid an exit(1) will take place
169
        self.class_bn.fn_check_inputs(self.parameters)
170
        # checking inputs, if anything is invalid an exit(1) will take place
171
        self.class_bnfe.fn_check_inputs_specific(self.parameters)
172
173
    def load_extraction_sequence_and_dependencies(self):
174
        self.timer.start()
175
        self.file_extract_sequence = self.class_fo.fn_open_file_and_get_content(
176
                self.parameters.input_extracting_sequence_file, 'json')
177
        self.class_ln.logger.info(self.locale.gettext( \
178
                'Configuration file name with extracting sequence(es) has been loaded'))
179
        self.timer.stop()
180
        # store file statistics
181
        self.class_fo.fn_store_file_statistics(self.class_ln.logger, self.timer,
182
                                               self.parameters.input_extracting_sequence_file,
183
                                               self.locale.gettext('Configuration file name with '
184
                                                                   + 'extracting sequence(es)'))
185
        # get the source system details from provided file
186
        self.timer.start()
187
        self.source_systems = self.class_fo.fn_open_file_and_get_content( \
188
                self.parameters.input_source_system_file, 'json')['Systems']
189
        self.class_ln.logger.info(self.locale.gettext('Source Systems file name has been loaded'))
190
        self.timer.stop()
191
        self.class_fo.fn_store_file_statistics(self.class_ln.logger, self.timer,
192
                                               self.parameters.input_source_system_file,
193
                                               self.locale.gettext('Source Systems file name'))
194
        # get the source system details from provided file
195
        self.timer.start()
196
        self.user_credentials = self.class_fo.fn_open_file_and_get_content(
197
                self.parameters.input_credentials_file, 'json')['Credentials']
198
        self.class_ln.logger.info(self.locale.gettext( \
199
                'Configuration file name with credentials has been loaded'))
200
        self.timer.stop()
201
        self.class_fo.fn_store_file_statistics(self.class_ln.logger, self.timer,
202
                                               self.parameters.input_credentials_file,
203
                                               self.locale.gettext( \
204
                                                    'Configuration file name with credentials'))
205
206
    def load_query(self, crt_query):
207
        self.timer.start()
208
        query = self.class_fo.fn_open_file_and_get_content(crt_query['input-query-file'], 'raw')
209
        feedback = self.locale.gettext('Generic query is: %s') \
210
            .replace('%s', self.class_bn.fn_multi_line_string_to_single(query))
211
        self.class_ln.logger.info(feedback)
212
        self.timer.stop()
213
        return query
214
215
    def result_set_into_data_frame(self, local_logger, stats, crt_session):
216
        result_df = self.class_dbt.result_set_to_data_frame(local_logger, self.timer,
217
                                                            stats['columns'], stats['result_set'])
218
        rdf = self.class_dbt.append_additional_columns_to_df(local_logger, self.timer,
219
                                                             result_df, crt_session)
220
        return rdf
221
222
    def set_default_starting_weekday(self, crt_session):
223
        if 'start-isoweekday' not in crt_session:
224
            week_starts_with_isoweekday = 1
225
            if 'start_isoweekday' in crt_session:
226
                week_starts_with_isoweekday = crt_session['start_isoweekday']
227
        return week_starts_with_isoweekday
0 ignored issues
show
introduced by
The variable week_starts_with_isoweekday does not seem to be defined when the condition 'start-isoweekday' not in crt_session on line 223 is False. Are you sure this can never be the case?
Loading history...
228
229
    def set_default_parameter_rules(self, crt_session):
230
        if 'parameters-handling-rules' in crt_session:
231
            dictionary_to_return = crt_session['parameters-handling-rules']
232
        else:
233
            # assumption is for either DICT or LIST values are numeric
234
            # in case text is given different rules have to be specified
235
            dictionary_to_return  = {
236
                "dict-values-glue"  : ", ",
237
                "dict-values-prefix": "IN (",
238
                "dict-values-suffix": ")",
239
                "list-values-glue"  : ", ",
240
                "list-values-prefix": "",
241
                "list-values-suffix": ""
242
            }
243
        return dictionary_to_return
244
245
    def store_result_set_to_disk(self, local_logger, in_data_frame, crt_session):
246
        output_file_setting_type = type(crt_session['output-file'])
247
        if output_file_setting_type == dict:
248
            self.class_dm.fn_store_data_frame_to_file(local_logger, self.timer, in_data_frame,
249
                                                      crt_session['output-file'])
250
            self.class_fo.fn_store_file_statistics(local_logger, self.timer,
251
                                                   crt_session['output-file']['name'],
252
                                                   self.locale.gettext('Output file name'))
253
        elif output_file_setting_type == list:
254
            for crt_output in crt_session['output-file']:
255
                self.class_dm.fn_store_data_frame_to_file(local_logger, self.timer,
256
                                                          in_data_frame, crt_output)
257
                self.class_fo.fn_store_file_statistics(local_logger, self.timer, crt_output['name'],
258
                                                       self.locale.gettext('Output file name'))
259