db_extractor.BasicNeedsForExtractor   B
last analyzed

Complexity

Total Complexity 45

Size/Duplication

Total Lines 335
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 284
dl 0
loc 335
rs 8.8
c 0
b 0
f 0
wmc 45

18 Methods

Rating   Name   Duplication   Size   Complexity  
A BasicNeedsForExtractor.fn_is_extraction_necessary() 0 21 4
A BasicNeedsForExtractor.__init__() 0 9 1
A BasicNeedsForExtractor.fn_validate_mandatory_properties() 0 10 3
A BasicNeedsForExtractor.fn_check_inputs_specific() 0 4 1
A BasicNeedsForExtractor.fn_is_extraction_necessary_additional() 0 17 3
A BasicNeedsForExtractor.validate_extraction_sequence_file() 0 13 3
A BasicNeedsForExtractor.validate_current_source_system_properties() 0 9 1
A BasicNeedsForExtractor.validate_user_and_establish_connection_details() 0 17 3
A BasicNeedsForExtractor.validate_current_source_system() 0 25 4
A BasicNeedsForExtractor.validate_extraction_sequence() 0 13 1
A BasicNeedsForExtractor.fn_manage_user_settings() 0 6 3
A BasicNeedsForExtractor.fn_manage_individual_user_settings() 0 14 4
A BasicNeedsForExtractor.validate_all_json_files_current() 0 40 3
A BasicNeedsForExtractor.validate_all_json_files() 0 9 2
A BasicNeedsForExtractor.validate_query_session() 0 9 1
A BasicNeedsForExtractor.validate_extraction_query() 0 12 1
A BasicNeedsForExtractor.validate_user_secrets_file() 0 25 4
A BasicNeedsForExtractor.validate_user_secrets() 0 40 3

How to fix   Complexity   

Complexity

Complex classes like db_extractor.BasicNeedsForExtractor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Basic Needs For Extractor class
3
4
Handling specific needs for Extractor script
5
"""
6
# package to facilitate time operations
7
from datetime import datetime
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to handle files/folders and related metadata/operations
11
import os
12
# package to facilitate working with directories and files
13
from pathlib import Path
14
# package to facilitate common operations
15
from db_extractor.BasicNeeds import BasicNeeds
16
17
18
class BasicNeedsForExtractor:
    """
    Validation helpers specific to the Extractor script

    Checks the JSON structures driving an extraction (extraction sequences,
    source systems, user secrets), decides whether an extraction has to run
    and assembles the database connection details.
    All user-facing messages go through the gettext translation stored
    in `locale` and are emitted through the logger given by the caller.
    """
    # connection settings assembled by validate_user_and_establish_connection_details
    connection_details = None
    # BasicNeeds helper instance, created in __init__
    class_bn = None
    # gettext translation object used for every log message
    locale = None
    # quoted, comma separated source system identifiers used as log label suffix,
    # set by validate_all_json_files_current
    str_ss = None
    # not used inside this class, kept for backward compatibility
    srv = None

    def __init__(self, in_language='en_US'):
        """
        Create the BasicNeeds helper and load the translation for this module

        The translation domain is the module file name (without ".py") and is
        searched in "<grand-parent folder>/project_locale/<domain>".
        A missing catalog is tolerated (fallback=True).

        :param in_language: locale code of the desired language
        """
        self.class_bn = BasicNeeds(in_language)
        # pathlib is used instead of os.path.altsep as the latter is None
        # on POSIX systems, which made str.replace()/str.split() fail there
        module_path = Path(os.path.abspath(__file__))
        locale_domain = module_path.name.replace('.py', '')
        locale_folder = os.path.normpath(os.path.join(
            str(module_path.parent.parent), 'project_locale', locale_domain))
        self.locale = gettext.translation(locale_domain, localedir=locale_folder,
                                          languages=[in_language], fallback=True)

    def _label_with_source_system(self, title):
        """
        Append the current source system identifiers (if any) to a log label

        :param title: already translated label
        :return: title followed by `str_ss`, or title alone when not set yet
        """
        if self.str_ss:
            return title + ' ' + self.str_ss
        return title

    def _validate_nested_keys(self, local_logger, label, container, key_path):
        """
        Walk `container` along `key_path` validating every level on the way

        Stops at the first missing key, which is reported by
        fn_validate_mandatory_properties using `label` followed by
        the quoted keys already walked.

        :param local_logger: logger receiving the error message
        :param label: already translated name of the structure checked
        :param container: nested mapping to be walked
        :param key_path: ordered keys, outermost first
        :return: True only if every key of the path is present
        """
        current_level = container
        walked_keys = []
        for current_key in key_path:
            who = label
            if walked_keys:
                who += ' "' + '", "'.join(walked_keys) + '"'
            if not self.fn_validate_mandatory_properties(
                    local_logger, who, current_level, [current_key]):
                return False
            current_level = current_level[current_key]
            walked_keys.append(current_key)
        return True

    def fn_check_inputs_specific(self, in_parameters):
        """
        Validate the 3 input files through the BasicNeeds helper

        :param in_parameters: object exposing input_source_system_file,
            input_credentials_file and input_extracting_sequence_file
        """
        self.class_bn.fn_validate_single_value(in_parameters.input_source_system_file, 'file')
        self.class_bn.fn_validate_single_value(in_parameters.input_credentials_file, 'file')
        self.class_bn.fn_validate_single_value(in_parameters.input_extracting_sequence_file, 'file')

    def fn_validate_mandatory_properties(self, local_logger, who, properties_parent, in_list):
        """
        Check that every property from `in_list` is present in `properties_parent`

        Every missing property is logged as an error (all of them are
        reported, the check does not stop at the first one).

        :param local_logger: logger receiving the error messages
        :param who: label of the structure checked, used in the message
        :param properties_parent: container tested with the `in` operator
        :param in_list: names of the mandatory properties
        :return: True if no property is missing, False otherwise
        """
        is_valid = True
        for current_property in in_list:
            if current_property not in properties_parent:
                local_logger.error(self.locale.gettext(
                    '{who} does not contain "{current_property}" which is mandatory, '
                    'therefore extraction sequence will be ignored')
                                   .replace('{who}', who)
                                   .replace('{current_property}', current_property))
                is_valid = False
        return is_valid

    def fn_is_extraction_necessary(self, local_logger, relevant_details):
        """
        Decide if the extraction is needed based on the configured behaviour

        :param local_logger: logger receiving debug messages
        :param relevant_details: dict with 'extract-behaviour' and 'output-csv-file'
        :return: True for 'overwrite-if-output-file-exists' or when the
            behaviour is 'skip-if-output-file-exists' and the output file
            is missing; False in any other case (unknown behaviour included)
        """
        behaviour = relevant_details['extract-behaviour']
        local_logger.debug(self.locale.gettext('Extract behaviour is set to {extract_behaviour}')
                           .replace('{extract_behaviour}', behaviour))
        if behaviour == 'skip-if-output-file-exists':
            output_file = relevant_details['output-csv-file']
            if Path(output_file).is_file():
                local_logger.debug(self.locale.gettext(
                    'File {file_name} already exists, '
                    'so database extraction will not be performed')
                                   .replace('{file_name}', output_file))
                return False
            local_logger.debug(self.locale.gettext(
                'File {file_name} does not exist, '
                'so database extraction has to be performed')
                               .replace('{file_name}', output_file))
            return True
        if behaviour == 'overwrite-if-output-file-exists':
            local_logger.debug(self.locale.gettext('Query extraction probably should be performed '
                                                   '(other checks might be required)'))
            return True
        return False

    @staticmethod
    def fn_is_extraction_necessary_additional(local_logger, c_ph, c_fo, in_dict):
        """
        Evaluate the overwrite condition of a session against the output file

        The session 'extract-overwrite-condition' may inherit its value
        from the query ('inherit-from-parent') or from the sequence
        ('inherit-from-grand-parent'); the resolved value is written back
        into in_dict['session'].

        :param local_logger: logger passed through to the helpers
        :param c_ph: helper providing eval_expression, get_child_parent_expressions
            and output_standard_formats (project type, not visible here)
        :param c_fo: helper providing fn_get_file_datetime_verdict
            (project type, not visible here)
        :param in_dict: dict with 'session', 'query', 'sequence' and 'file' entries
        :return: whatever c_fo.fn_get_file_datetime_verdict returns
        """
        session = in_dict['session']
        if session['extract-overwrite-condition'] == 'inherit-from-parent':
            session['extract-overwrite-condition'] = \
                in_dict['query']['extract-overwrite-condition']
        elif session['extract-overwrite-condition'] == 'inherit-from-grand-parent':
            session['extract-overwrite-condition'] = \
                in_dict['sequence']['extract-overwrite-condition']
        ref_expr = session['extract-overwrite-condition']['reference-expression']
        reference_datetime = c_ph.eval_expression(
            local_logger, ref_expr, session['start-iso-weekday'])
        child_parent_expressions = c_ph.get_child_parent_expressions()
        # 2nd underscore separated token of the expression selects the output format
        deviation_original = child_parent_expressions.get(ref_expr.split('_')[1])
        r_dt = datetime.strptime(
            reference_datetime, c_ph.output_standard_formats.get(deviation_original))
        return c_fo.fn_get_file_datetime_verdict(
            local_logger, in_dict['file']['name'], 'last modified', r_dt.timestamp())

    def fn_manage_individual_user_settings(self, local_logger, u, current_key):
        """
        Resolve a single user setting stored as an environment variable

        When u['Storage'][current_key] is 'environment variable' the value
        of u[current_key] is taken as the name of the environment variable
        and replaced (in place) by its content. A variable which is not
        defined is reported as an error and the setting is left untouched.

        :param local_logger: logger receiving debug/error messages
        :param u: user settings dict, must contain 'Storage'
        :param current_key: name of the setting to be handled
        :return: the (possibly updated) user settings dict
        """
        if u['Storage'].get(current_key) != 'environment variable':
            return u
        if current_key not in u:
            # nothing to resolve; absence is reported later by validate_user_secrets
            return u
        environment_variable_name = u[current_key]
        local_logger.debug(self.locale.gettext(
            'As {current_key} has been set to be an "environment variable"'
            ', value will be read using {environment_variable_name}')
                           .replace('{current_key}', current_key)
                           .replace('{environment_variable_name}', environment_variable_name))
        env_var_value = os.getenv(environment_variable_name)
        if env_var_value is None:
            # placeholder is now filled in with the actual variable name
            local_logger.error(self.locale.gettext(
                'No such {environment_variable_name} has been found')
                               .replace('{environment_variable_name}',
                                        environment_variable_name))
        else:
            u[current_key] = env_var_value
        return u

    def fn_manage_user_settings(self, local_logger, u):
        """
        Resolve all known account settings stored as environment variables

        :param local_logger: logger receiving debug/error messages
        :param u: user settings dict; only processed if it contains 'Storage'
        :return: the (possibly updated) user settings dict
        """
        if 'Storage' in u:
            for current_key in ('Name', 'Password', 'Username'):
                u = self.fn_manage_individual_user_settings(local_logger, u, current_key)
        return u

    def validate_all_json_files(self, local_logger, timer, extracting_sequences):
        """
        Validate the content of the extraction sequence file

        :param local_logger: logger receiving messages
        :param timer: object with start() and stop() measuring the duration
        :param extracting_sequences: parsed content of the sequence file
        :return: True if the extraction sequences can be used
        """
        timer.start()
        try:
            are_json_files_valid = self.validate_extraction_sequence_file(
                local_logger, extracting_sequences)
            if are_json_files_valid:
                local_logger.debug(self.locale.gettext('Relevant JSON files are valid'))
        finally:
            # timer is stopped even if validation raises
            timer.stop()
        return are_json_files_valid

    def validate_all_json_files_current(self, local_logger, timer, in_sequence, seq_index,
                                        in_source_systems, in_user_secrets):
        """
        Run all checks required before connecting for one extraction sequence

        Side effects: sets `str_ss` and, when everything is valid,
        `connection_details`.

        :param local_logger: logger receiving messages
        :param timer: object with start() and stop() measuring the duration
        :param in_sequence: current extraction sequence (dict)
        :param seq_index: position of the sequence, used only in the log
        :param in_source_systems: parsed source systems structure
        :param in_user_secrets: parsed user secrets structure
        :return: dict of check name -> bool with keys 'Extraction',
            'Source System', 'Source System Properties', 'User Secrets',
            'User Secrets Properties'
        """
        timer.start()
        try:
            # setting initial checks statuses
            can_proceed = {
                'Extraction': self.validate_extraction_sequence(local_logger, in_sequence),
                'Source System': False,
                'Source System Properties': False,
                'User Secrets': False,
                'User Secrets Properties': False,
            }
            # actual check
            if can_proceed['Extraction']:
                # just a few values that are going to be used a lot
                srv = {
                    'vdtp': in_sequence['server-vendor'] + ' ' + in_sequence['server-type'],
                    'vdr': in_sequence['server-vendor'],
                    'typ': in_sequence['server-type'],
                    'grp': in_sequence['server-group'],
                    'lyr': in_sequence['server-layer'],
                }
                can_proceed['Source System'] = self.validate_current_source_system(
                    local_logger, srv, in_source_systems)
                if can_proceed['Source System']:
                    # variable for source server details
                    ss = in_source_systems[srv['vdr']][srv['typ']]['Server'][srv['grp']][srv['lyr']]
                    self.str_ss = '"' + '", "'.join(srv.values()) + '"'
                    can_proceed['Source System Properties'] = \
                        self.validate_current_source_system_properties(local_logger, ss)
                    can_proceed['User Secrets'] = self.validate_user_secrets_file(
                        local_logger, srv, in_user_secrets)
                    # flags and server properties travel together with identifiers
                    srv.update(can_proceed)
                    srv.update(ss)
                    srv.update({'ac': in_sequence['account-label']})
                    can_proceed['User Secrets Properties'] = \
                        self.validate_user_and_establish_connection_details(
                            local_logger, srv, in_user_secrets)
            local_logger.debug(self.locale.gettext('For the sequence number {sequence_number} '
                                                   'the relevant details for database connection '
                                                   'have been verified')
                               .replace('{sequence_number}', str(seq_index)))
        finally:
            # timer is stopped even if one of the checks raises
            timer.stop()
        return can_proceed

    def validate_extraction_query(self, local_logger, timer, in_extraction_sequence):
        """
        Check that a query definition has 'input-query-file' and 'sessions'

        :param local_logger: logger receiving the error messages
        :param timer: object with start() and stop() measuring the duration
        :param in_extraction_sequence: query definition to be checked
        :return: True if both mandatory properties are present
        """
        timer.start()
        try:
            is_valid = self.fn_validate_mandatory_properties(
                local_logger, self.locale.gettext('Extraction Query'),
                in_extraction_sequence, ['input-query-file', 'sessions'])
        finally:
            timer.stop()
        return is_valid

    def validate_extraction_sequence_file(self, local_logger, in_extract_sequence):
        """
        Check that the extraction sequences are a non-empty list

        :param local_logger: logger receiving the error messages
        :param in_extract_sequence: parsed content of the sequence file
        :return: True if the content is a list with at least 1 element
        """
        if not isinstance(in_extract_sequence, list):
            local_logger.error(self.locale.gettext(
                'JSON extraction sequence is not a LIST and must be, therefore cannot be used'))
            return False
        if not in_extract_sequence:
            local_logger.error(self.locale.gettext(
                'Extraction sequence file name is a LIST '
                'but does not contain at least 1 extraction sequence, '
                'therefore cannot be used'))
            return False
        return True

    def validate_extraction_sequence(self, local_logger, in_extraction_sequence):
        """
        Check the mandatory properties of a single extraction sequence

        :param local_logger: logger receiving the error messages
        :param in_extraction_sequence: extraction sequence to be checked
        :return: True if all mandatory properties are present
        """
        mandatory_props_e = [
            'server-vendor',
            'server-type',
            'server-group',
            'server-layer',
            'account-label',
            'queries',
        ]
        return self.fn_validate_mandatory_properties(
            local_logger, self.locale.gettext('Extraction Sequence'),
            in_extraction_sequence, mandatory_props_e)

    def validate_query_session(self, local_logger, in_session):
        """
        Check that a session has 'extract-behaviour' and 'output-file'

        :param local_logger: logger receiving the error messages
        :param in_session: session definition to be checked
        :return: True if both mandatory properties are present
        """
        return self.fn_validate_mandatory_properties(
            local_logger,
            self._label_with_source_system(self.locale.gettext('Query Session')),
            in_session, ['extract-behaviour', 'output-file'])

    def validate_current_source_system_properties(self, local_logger, in_source_system):
        """
        Check that a source system has 'ServerName' and 'ServerPort'

        :param local_logger: logger receiving the error messages
        :param in_source_system: source system details to be checked
        :return: True if both mandatory properties are present
        """
        return self.fn_validate_mandatory_properties(
            local_logger,
            self._label_with_source_system(self.locale.gettext('Source System')),
            in_source_system, ['ServerName', 'ServerPort'])

    def validate_current_source_system(self, in_logger, in_seq, in_source_systems):
        """
        Check that the source systems structure contains the requested server

        Path checked: vendor -> type -> 'Server' -> group -> layer.
        The fixed 'Server' level is validated as well, so its absence is
        reported instead of raising KeyError.

        :param in_logger: logger receiving the error messages
        :param in_seq: dict with 'vdr', 'typ', 'grp' and 'lyr' identifiers
        :param in_source_systems: parsed source systems structure
        :return: True if the whole path exists
        """
        return self._validate_nested_keys(
            in_logger, self.locale.gettext('Source Systems'), in_source_systems,
            [in_seq['vdr'], in_seq['typ'], 'Server', in_seq['grp'], in_seq['lyr']])

    def validate_user_and_establish_connection_details(self, local_logger, in_dc, in_user):
        """
        Validate the account secrets and store the connection details

        Nothing is checked unless both in_dc['Source System Properties'] and
        in_dc['User Secrets'] are True. `connection_details` is only set
        when the account exists and its secrets are valid.

        :param local_logger: logger receiving messages
        :param in_dc: dict with identifiers ('vdtp', 'vdr', 'typ', 'grp',
            'lyr', 'ac'), check flags, 'ServerName' and 'ServerPort'
        :param in_user: parsed user secrets structure
        :return: True if the user secrets are valid
        """
        if not (in_dc['Source System Properties'] and in_dc['User Secrets']):
            return False
        accounts = in_user[in_dc['vdr']][in_dc['typ']][in_dc['grp']][in_dc['lyr']]
        # account label is the only level not covered by validate_user_secrets_file
        if not self.fn_validate_mandatory_properties(
                local_logger,
                self._label_with_source_system(self.locale.gettext('User Secrets')),
                accounts, [in_dc['ac']]):
            return False
        # variable with credentials for source server
        u = self.fn_manage_user_settings(local_logger, accounts[in_dc['ac']])
        is_valid = self.validate_user_secrets(local_logger, u)
        if is_valid:
            self.connection_details = {
                'server-vendor-and-type': in_dc['vdtp'],
                'server-layer': in_dc['lyr'],
                'ServerName': in_dc['ServerName'],
                'ServerPort': int(in_dc['ServerPort']),
                'Username': u['Username'],
                'Name': u['Name'],
                'Password': u['Password'],
            }
        return is_valid

    def validate_user_secrets(self, local_logger, in_user_secrets):
        """
        Check that 'Name', 'Username' and 'Password' are present

        A property still holding one of its known template values
        triggers a warning, which does not affect the result.

        :param local_logger: logger receiving error/warning messages
        :param in_user_secrets: account secrets (dict) to be checked
        :return: True if all 3 properties are present
        """
        # property -> (template values, placeholder name used in the message)
        known_defaults = {
            'Name': (('login', 'Your Full Name Here'), 'default_name_value'),
            'Username': (('usrnme', 'your_username_goes_here'), 'default_username_value'),
            'Password': (('pwd', 'your_password_goes_here'), 'default_password_value'),
        }
        is_valid = self.fn_validate_mandatory_properties(
            local_logger,
            self._label_with_source_system(self.locale.gettext('User Secrets')),
            in_user_secrets, list(known_defaults))
        for current_property, (default_values, placeholder) in known_defaults.items():
            # missing properties have been reported above, nothing to compare
            if current_property not in in_user_secrets:
                continue
            current_value = in_user_secrets[current_property]
            if current_value in default_values:
                local_logger.warning(
                    self.locale.gettext('For {str_user_secrets} your "' + current_property
                                        + '" property has the default value: "{'
                                        + placeholder + '}" which is unusual')
                    .replace('{str_user_secrets}', self.str_ss or '')
                    # braces are part of the placeholder and have to go as well
                    .replace('{' + placeholder + '}', current_value))
        return is_valid

    def validate_user_secrets_file(self, local_logger, in_seq, in_user_secrets):
        """
        Check that the user secrets structure contains the requested server

        Path checked: vendor -> type -> group -> layer.

        :param local_logger: logger receiving the error messages
        :param in_seq: dict with 'vdr', 'typ', 'grp' and 'lyr' identifiers
        :param in_user_secrets: parsed user secrets structure
        :return: True if the whole path exists
        """
        return self._validate_nested_keys(
            local_logger, self.locale.gettext('User Secrets'), in_user_secrets,
            [in_seq['vdr'], in_seq['typ'], in_seq['grp'], in_seq['lyr']])
335