Completed
Push — master ( 55df2f...e17347 )
by Daniel
16s queued 12s
created

sources.db_extractor.BasicNeedsForExtractor   A

Complexity

Total Complexity 42

Size/Duplication

Total Lines 315
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 42
eloc 270
dl 0
loc 315
rs 9.0399
c 0
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_extraction_sequence_file() 0 13 3
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.fn_is_extraction_necessary() 0 21 4
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_current_source_system_properties() 0 9 1
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_user_and_establish_connection_details() 0 16 3
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_current_source_system() 0 25 4
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_extraction_sequence() 0 13 1
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.fn_set_extract_behaviour() 0 13 3
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.__init__() 0 5 1
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.fn_validate_mandatory_properties() 0 10 3
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.fn_check_inputs_specific() 0 4 1
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_user_secrets_file() 0 26 4
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_all_json_files_current() 0 40 3
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_all_json_files() 0 9 2
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_query_session() 0 9 1
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_user_secrets() 0 32 4
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.validate_extraction_query() 0 12 1
A db_extractor.BasicNeedsForExtractor.BasicNeedsForExtractor.fn_is_extraction_necessary_additional() 0 17 3

How to fix   Complexity   

Complexity

Complex classes like sources.db_extractor.BasicNeedsForExtractor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Basic Needs For Extractor class
3
4
Handling specific needs for Extractor script
5
"""
6
# package to facilitate time operations
7
from datetime import datetime
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to handle files/folders and related metadata/operations
11
import os
12
# package to facilitate working with directories and files
13
from pathlib import Path
14
# package to facilitate common operations
15
from db_extractor.BasicNeeds import BasicNeeds
16
17
18
class BasicNeedsForExtractor:
19
    connection_details = None
20
    class_bn = None
21
    locale = None
22
    str_ss = None
23
    srv = None
24
25
    def __init__(self, in_language='en_US'):
26
        self.class_bn = BasicNeeds(in_language)
27
        current_script = os.path.basename(__file__).replace('.py', '')
28
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
29
        self.locale = gettext.translation(current_script, lang_folder, languages=[in_language])
30
31
    def fn_check_inputs_specific(self, in_parameters):
32
        self.class_bn.fn_validate_single_value(in_parameters.input_source_system_file, 'file')
33
        self.class_bn.fn_validate_single_value(in_parameters.input_credentials_file, 'file')
34
        self.class_bn.fn_validate_single_value(in_parameters.input_extracting_sequence_file, 'file')
35
36
    def fn_validate_mandatory_properties(self, local_logger, who, properties_parent, in_list):
37
        is_valid = True
38
        for current_property in in_list:
39
            if current_property not in properties_parent:
40
                local_logger.error(self.locale.gettext(
41
                    '{who} does not contain "{current_property}" which is mandatory, '
42
                    + 'therefore extraction sequence will be ignored').replace('{who}', who)
43
                                   .replace('{current_property}', current_property))
44
                is_valid = False
45
        return is_valid
46
47
    def fn_is_extraction_necessary(self, local_logger, relevant_details):
48
        extraction_is_necessary = False
49
        local_logger.debug(self.locale.gettext('Extract behaviour is set to {extract_behaviour}')
50
                           .replace('{extract_behaviour}', relevant_details['extract-behaviour']))
51
        if relevant_details['extract-behaviour'] == 'skip-if-output-file-exists':
52
            if Path(relevant_details['output-csv-file']).is_file():
53
                local_logger.debug(self.locale.gettext(
54
                        'File {file_name} already exists, '
55
                        + 'so database extraction will not be performed')
56
                                   .replace('{file_name}', relevant_details['output-csv-file']))
57
            else:
58
                extraction_is_necessary = True
59
                local_logger.debug(self.locale.gettext(
60
                        'File {file_name} does not exist, '
61
                        + 'so database extraction has to be performed')
62
                                   .replace('{file_name}', relevant_details['output-csv-file']))
63
        elif relevant_details['extract-behaviour'] == 'overwrite-if-output-file-exists':
64
            extraction_is_necessary = True
65
            local_logger.debug(self.locale.gettext('Query extraction probably should be performed '
66
                                                   + '(other checks might be required)'))
67
        return extraction_is_necessary
68
69
    @staticmethod
70
    def fn_is_extraction_necessary_additional(local_logger, c_ph, c_fo, in_dict):
71
        if in_dict['session']['extract-overwrite-condition'] == 'inherit-from-parent':
72
            in_dict['session']['extract-overwrite-condition'] = \
73
                in_dict['query']['extract-overwrite-condition']
74
        elif in_dict['session']['extract-overwrite-condition'] == 'inherit-from-grand-parent':
75
            in_dict['session']['extract-overwrite-condition'] = \
76
                in_dict['sequence']['extract-overwrite-condition']
77
        ref_expr = in_dict['session']['extract-overwrite-condition']['reference-expression']
78
        reference_datetime = c_ph.eval_expression(
79
            local_logger, ref_expr, in_dict['session']['start-iso-weekday'])
80
        child_parent_expressions = c_ph.get_child_parent_expressions()
81
        deviation_original = child_parent_expressions.get(ref_expr.split('_')[1])
82
        r_dt = datetime.strptime(reference_datetime,
83
                                 c_ph.output_standard_formats.get(deviation_original))
84
        return c_fo.fn_get_file_datetime_verdict(
85
            local_logger, in_dict['file']['name'], 'last modified', r_dt)
86
87
    @staticmethod
88
    def fn_set_extract_behaviour(in_session):
89
        default_value = 'skip-if-output-file-exists'
90
        allowed_values = [
91
            'skip-if-output-file-exists',
92
            'overwrite-if-output-file-exists',
93
        ]
94
        if ('extract-behaviour' not in in_session) \
95
                or (in_session['extract-behaviour'] not in allowed_values):
96
            value_to_return = default_value
97
        else:
98
            value_to_return = in_session['extract-behaviour']
99
        return value_to_return
100
101
    def validate_all_json_files(self, local_logger, timer, extracting_sequences):
102
        timer.start()
103
        are_json_files_valid = False
104
        # validation of the extraction sequence file
105
        if self.validate_extraction_sequence_file(local_logger, extracting_sequences):
106
            are_json_files_valid = True
107
            local_logger.debug(self.locale.gettext('Relevant JSON files are valid'))
108
        timer.stop()
109
        return are_json_files_valid
110
111
    def validate_all_json_files_current(self, local_logger, timer, in_sequence, seq_index,
112
                                        in_source_systems, in_user_secrets):
113
        timer.start()
114
        # setting initial checks statuses
115
        can_proceed = {'Extraction'                    : self.validate_extraction_sequence(
116
            local_logger, in_sequence), 'Source System': False,
117
            'Source System Properties'                 : False, 'User Secrets': False,
118
            'User Secrets Properties'                  : False}
119
        # actual check
120
        if can_proceed['Extraction']:
121
            # just few values that's going to be used a lot
122
            srv = {
123
                'vdtp': in_sequence['server-vendor'] + ' ' + in_sequence['server-type'],
124
                'vdr' : in_sequence['server-vendor'],
125
                'typ' : in_sequence['server-type'],
126
                'grp' : in_sequence['server-group'],
127
                'lyr' : in_sequence['server-layer'],
128
            }
129
            can_proceed['Source System'] = self.validate_current_source_system(
130
                local_logger, srv, in_source_systems)
131
            if can_proceed['Source System']:
132
                # variable for source server details
133
                ss = in_source_systems[srv['vdr']][srv['typ']]['Server'][srv['grp']][srv['lyr']]
134
                self.str_ss = '"' + '", "'.join(srv.values()) + '"'
135
                can_proceed['Source System Properties'] = \
136
                    self.validate_current_source_system_properties(local_logger, ss)
137
                can_proceed['User Secrets'] = self.validate_user_secrets_file(
138
                    local_logger, srv, in_user_secrets)
139
                srv.update(can_proceed)
140
                srv.update(ss)
141
                srv.update({'ac': in_sequence['account-label']})
142
                can_proceed['User Secrets Properties'] = \
143
                    self.validate_user_and_establish_connection_details(
144
                        local_logger, srv, in_user_secrets)
145
        local_logger.debug(self.locale.gettext('For the sequence number {sequence_number} '
146
                                               + 'the relevant details for database connection '
147
                                               + 'have been verified')
148
                           .replace('{sequence_number}', str(seq_index)))
149
        timer.stop()
150
        return can_proceed
151
152
    def validate_extraction_query(self, local_logger, timer, in_extraction_sequence):
153
        timer.start()
154
        mandatory_props_q = [
155
            'input-query-file',
156
            'sessions',
157
        ]
158
        is_valid = self.fn_validate_mandatory_properties(local_logger,
159
                                                         self.locale.gettext('Extraction Query'),
160
                                                         in_extraction_sequence,
161
                                                         mandatory_props_q)
162
        timer.stop()
163
        return is_valid
164
165
    def validate_extraction_sequence_file(self, local_logger, in_extract_sequence):
166
        is_valid = True
167
        if type(in_extract_sequence) != list:
168
            local_logger.error(self.locale.gettext(
169
                'JSON extraction sequence is not a LIST and must be, therefore cannot be used'))
170
            is_valid = False
171
        elif len(in_extract_sequence) < 1:
172
            local_logger.error(self.locale.gettext(
173
                'Extraction sequence file name is a LIST '
174
                + 'but does not contain at least 1 extraction sequence, '
175
                + 'therefore cannot be used'))
176
            is_valid = False
177
        return is_valid
178
179
    def validate_extraction_sequence(self, local_logger, in_extraction_sequence):
180
        mandatory_props_e = [
181
            'server-vendor',
182
            'server-type',
183
            'server-group',
184
            'server-layer',
185
            'account-label',
186
            'queries',
187
        ]
188
        is_valid = self.fn_validate_mandatory_properties(
189
                local_logger, self.locale.gettext('Extraction Sequence'),
190
                in_extraction_sequence, mandatory_props_e)
191
        return is_valid
192
193
    def validate_query_session(self, local_logger, in_session):
194
        mandatory_properties = [
195
            'extract-behaviour',
196
            'output-file',
197
        ]
198
        is_valid = self.fn_validate_mandatory_properties(
199
                local_logger, self.locale.gettext('Query Session') + ' ' + self.str_ss,
200
                in_session, mandatory_properties)
201
        return is_valid
202
203
    def validate_current_source_system_properties(self, local_logger, in_source_system):
204
        mandatory_properties = [
205
            'ServerName',
206
            'ServerPort',
207
        ]
208
        is_valid = self.fn_validate_mandatory_properties(
209
                local_logger, self.locale.gettext('Source System') + ' ' + self.str_ss,
210
                in_source_system, mandatory_properties)
211
        return is_valid
212
213
    def validate_current_source_system(self, in_logger, in_seq, in_source_systems):
214
        can_proceed_s = self.fn_validate_mandatory_properties(
215
                in_logger, self.locale.gettext('Source Systems'),
216
                in_source_systems, [in_seq['vdr']])
217
        can_proceed_s1 = False
218
        if can_proceed_s:
219
            item_checked = in_source_systems[in_seq['vdr']]
220
            can_proceed_s1 = self.fn_validate_mandatory_properties(
221
                    in_logger, self.locale.gettext('Source Systems') + ' "' + in_seq['vdr'] + '"',
222
                    item_checked, [in_seq['typ']])
223
        can_proceed_s2 = False
224
        if can_proceed_s1:
225
            item_checked = in_source_systems[in_seq['vdr']][in_seq['typ']]['Server']
226
            can_proceed_s2 = self.fn_validate_mandatory_properties(
227
                    in_logger, self.locale.gettext('Source Systems') + ' "' + in_seq['vdr']
228
                               + '", "' + in_seq['typ'] + '", "Server" ',
229
                    item_checked, [in_seq['grp']])
230
        can_proceed_s3 = False
231
        if can_proceed_s2:
232
            item_checked = in_source_systems[in_seq['vdr']][in_seq['typ']]['Server'][in_seq['grp']]
233
            can_proceed_s3 = self.fn_validate_mandatory_properties(
234
                    in_logger, self.locale.gettext('Source Systems') + ' "' + in_seq['vdr']
235
                               + '", "' + in_seq['typ'] + '", "' + in_seq['grp'] + '", "Server" ',
236
                    item_checked, [in_seq['lyr']])
237
        return can_proceed_s and can_proceed_s1 and can_proceed_s2 and can_proceed_s3
238
239
    def validate_user_and_establish_connection_details(self, local_logger, in_dc, in_user):
240
        is_valid = False
241
        if in_dc['Source System Properties'] and in_dc['User Secrets']:
242
            # variable with credentials for source server
243
            u = in_user[in_dc['vdr']][in_dc['typ']][in_dc['grp']][in_dc['lyr']][in_dc['ac']]
244
            is_valid = self.validate_user_secrets(local_logger, u)
245
            self.connection_details = {
246
                'server-vendor-and-type': in_dc['vdtp'],
247
                'server-layer'          : in_dc['lyr'],
248
                'ServerName'            : in_dc['ServerName'],
249
                'ServerPort'            : int(in_dc['ServerPort']),
250
                'Username'              : u['Username'],
251
                'Name'                  : u['Name'],
252
                'Password'              : u['Password'],
253
            }
254
        return is_valid
255
256
    def validate_user_secrets(self, local_logger, in_user_secrets):
257
        mandatory_properties = [
258
            'Name',
259
            'Username',
260
            'Password',
261
        ]
262
        is_valid = self.fn_validate_mandatory_properties(local_logger,
263
                                                         self.locale.gettext('User Secrets')
264
                                                         + ' ' + self.str_ss,
265
                                                         in_user_secrets, mandatory_properties)
266
        if in_user_secrets['Name'] in ('login', 'Your Full Name Here'):
267
            local_logger.warning(
268
                    self.locale.gettext('For {str_user_secrets} your "Name" property '
269
                                        + 'has the default value: "{default_name_value}" '
270
                                        + 'which is unusual')
271
                        .replace('{str_user_secrets}', self.str_ss)
272
                        .replace('{default_name_value}', in_user_secrets['Name']))
273
        if in_user_secrets['Username'] in ('usrnme', 'your_username_goes_here'):
274
            local_logger.warning(
275
                    self.locale.gettext('For {str_user_secrets} your "Username" property '
276
                                        + 'has the default value: "{default_username_value}" '
277
                                        + 'which is unusual')
278
                        .replace('{str_user_secrets}', self.str_ss)
279
                        .replace('{default_username_value}', in_user_secrets['Username']))
280
        if in_user_secrets['Password'] in ('pwd', 'your_password_goes_here'):
281
            local_logger.warning(
282
                    self.locale.gettext('For {str_user_secrets} your "Password" property '
283
                                        + 'has the default value: "{default_password_value}" '
284
                                        + 'which is unusual')
285
                        .replace('{str_user_secrets}', self.str_ss)
286
                        .replace('{default_password_value}', in_user_secrets['Password']))
287
        return is_valid
288
289
    def validate_user_secrets_file(self, local_logger, in_seq, in_user_secrets):
290
        can_proceed_u = self.fn_validate_mandatory_properties(
291
                local_logger, self.locale.gettext('User Secrets'),
292
                in_user_secrets, [in_seq['vdr']])
293
        can_proceed_u1 = False
294
        if can_proceed_u:
295
            item_checked = in_user_secrets[in_seq['vdr']]
296
            can_proceed_u1 = self.fn_validate_mandatory_properties(
297
                    local_logger, self.locale.gettext('User Secrets') + ' "' + in_seq['vdr'] + '"',
298
                    item_checked, [in_seq['typ']])
299
        can_proceed_u2 = False
300
        if can_proceed_u1:
301
            item_checked = in_user_secrets[in_seq['vdr']][in_seq['typ']]
302
            can_proceed_u2 = self.fn_validate_mandatory_properties(
303
                    local_logger, self.locale.gettext('User Secrets') + ' "' + in_seq['vdr']
304
                                  + '", "' + in_seq['typ'] + '", "Server" ',
305
                    item_checked, [in_seq['grp']])
306
        can_proceed_u3 = False
307
        if can_proceed_u2:
308
            item_checked = in_user_secrets[in_seq['vdr']][in_seq['typ']][in_seq['grp']]
309
            can_proceed_u3 = self.fn_validate_mandatory_properties(
310
                    local_logger, self.locale.gettext('User Secrets') + ' "' + in_seq['vdr']
311
                                  + '", "' + in_seq['typ'] + '", "'
312
                                  + in_seq['grp'] + '", "Server" ',
313
                    item_checked, [in_seq['lyr']])
314
        return can_proceed_u and can_proceed_u1 and can_proceed_u2 and can_proceed_u3
315