Passed
Push — master ( fb209d...a1b1b6 )
by Daniel
04:57 queued 03:18
created

db_extractor.BasicNeedsForExtractor   B

Complexity

Total Complexity 47

Size/Duplication

Total Lines 333
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 287
dl 0
loc 333
rs 8.64
c 0
b 0
f 0
wmc 47

17 Methods

Rating   Name   Duplication   Size   Complexity  
A BasicNeedsForExtractor.validate_extraction_sequence_file() 0 13 3
A BasicNeedsForExtractor.fn_is_extraction_necessary() 0 21 4
A BasicNeedsForExtractor.validate_current_source_system_properties() 0 9 1
A BasicNeedsForExtractor.validate_current_source_system() 0 25 4
A BasicNeedsForExtractor.validate_extraction_sequence() 0 13 1
A BasicNeedsForExtractor.fn_set_extract_behaviour() 0 13 3
A BasicNeedsForExtractor.__init__() 0 9 1
A BasicNeedsForExtractor.fn_validate_mandatory_properties() 0 10 3
A BasicNeedsForExtractor.fn_check_inputs_specific() 0 4 1
A BasicNeedsForExtractor.validate_all_json_files_current() 0 40 3
A BasicNeedsForExtractor.validate_all_json_files() 0 9 2
A BasicNeedsForExtractor.validate_query_session() 0 9 1
A BasicNeedsForExtractor.validate_extraction_query() 0 12 1
A BasicNeedsForExtractor.fn_is_extraction_necessary_additional() 0 17 3
B BasicNeedsForExtractor.validate_user_and_establish_connection_details() 0 31 8
A BasicNeedsForExtractor.validate_user_secrets_file() 0 25 4
A BasicNeedsForExtractor.validate_user_secrets() 0 32 4

How to fix   Complexity   

Complexity

Complex classes like db_extractor.BasicNeedsForExtractor often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Basic Needs For Extractor class
3
4
Handling specific needs for Extractor script
5
"""
6
# package to facilitate time operations
7
from datetime import datetime
8
# package to add support for multi-language (i18n)
9
import gettext
10
# package to handle files/folders and related metadata/operations
11
import os
12
# package to facilitate working with directories and files
13
from pathlib import Path
14
# package to facilitate common operations
15
from db_extractor.BasicNeeds import BasicNeeds
16
17
18
class BasicNeedsForExtractor:
19
    connection_details = None
20
    class_bn = None
21
    locale = None
22
    str_ss = None
23
    srv = None
24
25
    def __init__(self, in_language='en_US'):
26
        self.class_bn = BasicNeeds(in_language)
27
        file_parts = os.path.normpath(os.path.abspath(__file__)).replace('\\', os.path.altsep)\
28
            .split(os.path.altsep)
29
        locale_domain = file_parts[(len(file_parts)-1)].replace('.py', '')
30
        locale_folder = os.path.normpath(os.path.join(
31
            os.path.join(os.path.altsep.join(file_parts[:-2]), 'project_locale'), locale_domain))
32
        self.locale = gettext.translation(locale_domain, localedir=locale_folder,
33
                                          languages=[in_language], fallback=True)
34
35
    def fn_check_inputs_specific(self, in_parameters):
36
        self.class_bn.fn_validate_single_value(in_parameters.input_source_system_file, 'file')
37
        self.class_bn.fn_validate_single_value(in_parameters.input_credentials_file, 'file')
38
        self.class_bn.fn_validate_single_value(in_parameters.input_extracting_sequence_file, 'file')
39
40
    def fn_validate_mandatory_properties(self, local_logger, who, properties_parent, in_list):
41
        is_valid = True
42
        for current_property in in_list:
43
            if current_property not in properties_parent:
44
                local_logger.error(self.locale.gettext(
45
                    '{who} does not contain "{current_property}" which is mandatory, '
46
                    + 'therefore extraction sequence will be ignored').replace('{who}', who)
47
                                   .replace('{current_property}', current_property))
48
                is_valid = False
49
        return is_valid
50
51
    def fn_is_extraction_necessary(self, local_logger, relevant_details):
52
        extraction_is_necessary = False
53
        local_logger.debug(self.locale.gettext('Extract behaviour is set to {extract_behaviour}')
54
                           .replace('{extract_behaviour}', relevant_details['extract-behaviour']))
55
        if relevant_details['extract-behaviour'] == 'skip-if-output-file-exists':
56
            if Path(relevant_details['output-csv-file']).is_file():
57
                local_logger.debug(self.locale.gettext(
58
                        'File {file_name} already exists, '
59
                        + 'so database extraction will not be performed')
60
                                   .replace('{file_name}', relevant_details['output-csv-file']))
61
            else:
62
                extraction_is_necessary = True
63
                local_logger.debug(self.locale.gettext(
64
                        'File {file_name} does not exist, '
65
                        + 'so database extraction has to be performed')
66
                                   .replace('{file_name}', relevant_details['output-csv-file']))
67
        elif relevant_details['extract-behaviour'] == 'overwrite-if-output-file-exists':
68
            extraction_is_necessary = True
69
            local_logger.debug(self.locale.gettext('Query extraction probably should be performed '
70
                                                   + '(other checks might be required)'))
71
        return extraction_is_necessary
72
73
    @staticmethod
74
    def fn_is_extraction_necessary_additional(local_logger, c_ph, c_fo, in_dict):
75
        if in_dict['session']['extract-overwrite-condition'] == 'inherit-from-parent':
76
            in_dict['session']['extract-overwrite-condition'] = \
77
                in_dict['query']['extract-overwrite-condition']
78
        elif in_dict['session']['extract-overwrite-condition'] == 'inherit-from-grand-parent':
79
            in_dict['session']['extract-overwrite-condition'] = \
80
                in_dict['sequence']['extract-overwrite-condition']
81
        ref_expr = in_dict['session']['extract-overwrite-condition']['reference-expression']
82
        reference_datetime = c_ph.eval_expression(
83
            local_logger, ref_expr, in_dict['session']['start-iso-weekday'])
84
        child_parent_expressions = c_ph.get_child_parent_expressions()
85
        deviation_original = child_parent_expressions.get(ref_expr.split('_')[1])
86
        r_dt = datetime.strptime(
87
            reference_datetime, c_ph.output_standard_formats.get(deviation_original))
88
        return c_fo.fn_get_file_datetime_verdict(
89
            local_logger, in_dict['file']['name'], 'last modified', datetime.timestamp(r_dt))
90
91
    @staticmethod
92
    def fn_set_extract_behaviour(in_session):
93
        default_value = 'skip-if-output-file-exists'
94
        allowed_values = [
95
            'skip-if-output-file-exists',
96
            'overwrite-if-output-file-exists',
97
        ]
98
        if ('extract-behaviour' not in in_session) \
99
                or (in_session['extract-behaviour'] not in allowed_values):
100
            value_to_return = default_value
101
        else:
102
            value_to_return = in_session['extract-behaviour']
103
        return value_to_return
104
105
    def validate_all_json_files(self, local_logger, timer, extracting_sequences):
106
        timer.start()
107
        are_json_files_valid = False
108
        # validation of the extraction sequence file
109
        if self.validate_extraction_sequence_file(local_logger, extracting_sequences):
110
            are_json_files_valid = True
111
            local_logger.debug(self.locale.gettext('Relevant JSON files are valid'))
112
        timer.stop()
113
        return are_json_files_valid
114
115
    def validate_all_json_files_current(self, local_logger, timer, in_sequence, seq_index,
116
                                        in_source_systems, in_user_secrets):
117
        timer.start()
118
        # setting initial checks statuses
119
        can_proceed = {'Extraction'                    : self.validate_extraction_sequence(
120
            local_logger, in_sequence), 'Source System': False,
121
            'Source System Properties'                 : False, 'User Secrets': False,
122
            'User Secrets Properties'                  : False}
123
        # actual check
124
        if can_proceed['Extraction']:
125
            # just few values that's going to be used a lot
126
            srv = {
127
                'vdtp': in_sequence['server-vendor'] + ' ' + in_sequence['server-type'],
128
                'vdr' : in_sequence['server-vendor'],
129
                'typ' : in_sequence['server-type'],
130
                'grp' : in_sequence['server-group'],
131
                'lyr' : in_sequence['server-layer'],
132
            }
133
            can_proceed['Source System'] = self.validate_current_source_system(
134
                local_logger, srv, in_source_systems)
135
            if can_proceed['Source System']:
136
                # variable for source server details
137
                ss = in_source_systems[srv['vdr']][srv['typ']]['Server'][srv['grp']][srv['lyr']]
138
                self.str_ss = '"' + '", "'.join(srv.values()) + '"'
139
                can_proceed['Source System Properties'] = \
140
                    self.validate_current_source_system_properties(local_logger, ss)
141
                can_proceed['User Secrets'] = self.validate_user_secrets_file(
142
                    local_logger, srv, in_user_secrets)
143
                srv.update(can_proceed)
144
                srv.update(ss)
145
                srv.update({'ac': in_sequence['account-label']})
146
                can_proceed['User Secrets Properties'] = \
147
                    self.validate_user_and_establish_connection_details(
148
                        local_logger, srv, in_user_secrets)
149
        local_logger.debug(self.locale.gettext('For the sequence number {sequence_number} '
150
                                               + 'the relevant details for database connection '
151
                                               + 'have been verified')
152
                           .replace('{sequence_number}', str(seq_index)))
153
        timer.stop()
154
        return can_proceed
155
156
    def validate_extraction_query(self, local_logger, timer, in_extraction_sequence):
157
        timer.start()
158
        mandatory_props_q = [
159
            'input-query-file',
160
            'sessions',
161
        ]
162
        is_valid = self.fn_validate_mandatory_properties(local_logger,
163
                                                         self.locale.gettext('Extraction Query'),
164
                                                         in_extraction_sequence,
165
                                                         mandatory_props_q)
166
        timer.stop()
167
        return is_valid
168
169
    def validate_extraction_sequence_file(self, local_logger, in_extract_sequence):
170
        is_valid = True
171
        if type(in_extract_sequence) != list:
172
            local_logger.error(self.locale.gettext(
173
                'JSON extraction sequence is not a LIST and must be, therefore cannot be used'))
174
            is_valid = False
175
        elif len(in_extract_sequence) < 1:
176
            local_logger.error(self.locale.gettext(
177
                'Extraction sequence file name is a LIST '
178
                + 'but does not contain at least 1 extraction sequence, '
179
                + 'therefore cannot be used'))
180
            is_valid = False
181
        return is_valid
182
183
    def validate_extraction_sequence(self, local_logger, in_extraction_sequence):
184
        mandatory_props_e = [
185
            'server-vendor',
186
            'server-type',
187
            'server-group',
188
            'server-layer',
189
            'account-label',
190
            'queries',
191
        ]
192
        is_valid = self.fn_validate_mandatory_properties(
193
                local_logger, self.locale.gettext('Extraction Sequence'),
194
                in_extraction_sequence, mandatory_props_e)
195
        return is_valid
196
197
    def validate_query_session(self, local_logger, in_session):
198
        mandatory_properties = [
199
            'extract-behaviour',
200
            'output-file',
201
        ]
202
        is_valid = self.fn_validate_mandatory_properties(
203
                local_logger, self.locale.gettext('Query Session') + ' ' + self.str_ss,
204
                in_session, mandatory_properties)
205
        return is_valid
206
207
    def validate_current_source_system_properties(self, local_logger, in_source_system):
208
        mandatory_properties = [
209
            'ServerName',
210
            'ServerPort',
211
        ]
212
        is_valid = self.fn_validate_mandatory_properties(
213
                local_logger, self.locale.gettext('Source System') + ' ' + self.str_ss,
214
                in_source_system, mandatory_properties)
215
        return is_valid
216
217
    def validate_current_source_system(self, in_logger, in_seq, in_source_systems):
218
        can_proceed_s = self.fn_validate_mandatory_properties(
219
                in_logger, self.locale.gettext('Source Systems'),
220
                in_source_systems, [in_seq['vdr']])
221
        can_proceed_s1 = False
222
        if can_proceed_s:
223
            item_checked = in_source_systems[in_seq['vdr']]
224
            can_proceed_s1 = self.fn_validate_mandatory_properties(
225
                    in_logger, self.locale.gettext('Source Systems') + ' "' + in_seq['vdr'] + '"',
226
                    item_checked, [in_seq['typ']])
227
        can_proceed_s2 = False
228
        if can_proceed_s1:
229
            item_checked = in_source_systems[in_seq['vdr']][in_seq['typ']]['Server']
230
            can_proceed_s2 = self.fn_validate_mandatory_properties(
231
                    in_logger, self.locale.gettext('Source Systems') + ' "' + in_seq['vdr']
232
                    + '", "' + in_seq['typ'] + '", "Server" ',
233
                    item_checked, [in_seq['grp']])
234
        can_proceed_s3 = False
235
        if can_proceed_s2:
236
            item_checked = in_source_systems[in_seq['vdr']][in_seq['typ']]['Server'][in_seq['grp']]
237
            can_proceed_s3 = self.fn_validate_mandatory_properties(
238
                    in_logger, self.locale.gettext('Source Systems') + ' "' + in_seq['vdr']
239
                    + '", "' + in_seq['typ'] + '", "' + in_seq['grp'] + '", "Server" ',
240
                    item_checked, [in_seq['lyr']])
241
        return can_proceed_s and can_proceed_s1 and can_proceed_s2 and can_proceed_s3
242
243
    def validate_user_and_establish_connection_details(self, local_logger, in_dc, in_user):
244
        is_valid = False
245
        if in_dc['Source System Properties'] and in_dc['User Secrets']:
246
            # variable with credentials for source server
247
            u = in_user[in_dc['vdr']][in_dc['typ']][in_dc['grp']][in_dc['lyr']][in_dc['ac']]
248
            if 'Storage' in u:
249
                known_account_keys = ['Name', 'Password', 'Username']
250
                for current_key in known_account_keys:
251
                    if current_key in u['Storage']:
252
                        if u['Storage'][current_key] == 'environment variable':  # handling special value
253
                            local_logger.debug(self.locale.gettext(
254
                                'As {current_key} has been set to be an "environment variable"'
255
                                + ', value will be read using {environment_variable_name}')
256
                                              .replace('{current_key}', current_key)
257
                                              .replace('{environment_variable_name}', u[current_key]))
258
                            env_var_value = os.getenv(u[current_key])
259
                            if env_var_value is None:
260
                                local_logger.error('No such {environment_variable_name} has been found')
261
                            else:
262
                                u[current_key] = env_var_value
263
            is_valid = self.validate_user_secrets(local_logger, u)
264
            self.connection_details = {
265
                'server-vendor-and-type': in_dc['vdtp'],
266
                'server-layer'          : in_dc['lyr'],
267
                'ServerName'            : in_dc['ServerName'],
268
                'ServerPort'            : int(in_dc['ServerPort']),
269
                'Username'              : u['Username'],
270
                'Name'                  : u['Name'],
271
                'Password'              : u['Password'],
272
            }
273
        return is_valid
274
275
    def validate_user_secrets(self, local_logger, in_user_secrets):
276
        mandatory_properties = [
277
            'Name',
278
            'Username',
279
            'Password',
280
        ]
281
        is_valid = self.fn_validate_mandatory_properties(local_logger,
282
                                                         self.locale.gettext('User Secrets')
283
                                                         + ' ' + self.str_ss,
284
                                                         in_user_secrets, mandatory_properties)
285
        if in_user_secrets['Name'] in ('login', 'Your Full Name Here'):
286
            local_logger.warning(
287
                    self.locale.gettext('For {str_user_secrets} your "Name" property '
288
                                        + 'has the default value: "{default_name_value}" '
289
                                        + 'which is unusual')
290
                        .replace('{str_user_secrets}', self.str_ss)
291
                        .replace('{default_name_value}', in_user_secrets['Name']))
292
        if in_user_secrets['Username'] in ('usrnme', 'your_username_goes_here'):
293
            local_logger.warning(
294
                    self.locale.gettext('For {str_user_secrets} your "Username" property '
295
                                        + 'has the default value: "{default_username_value}" '
296
                                        + 'which is unusual')
297
                        .replace('{str_user_secrets}', self.str_ss)
298
                        .replace('{default_username_value}', in_user_secrets['Username']))
299
        if in_user_secrets['Password'] in ('pwd', 'your_password_goes_here'):
300
            local_logger.warning(
301
                    self.locale.gettext('For {str_user_secrets} your "Password" property '
302
                                        + 'has the default value: "{default_password_value}" '
303
                                        + 'which is unusual')
304
                        .replace('{str_user_secrets}', self.str_ss)
305
                        .replace('{default_password_value}', in_user_secrets['Password']))
306
        return is_valid
307
308
    def validate_user_secrets_file(self, local_logger, in_seq, in_user_secrets):
309
        can_proceed_u = self.fn_validate_mandatory_properties(
310
                local_logger, self.locale.gettext('User Secrets'),
311
                in_user_secrets, [in_seq['vdr']])
312
        can_proceed_u1 = False
313
        if can_proceed_u:
314
            item_checked = in_user_secrets[in_seq['vdr']]
315
            can_proceed_u1 = self.fn_validate_mandatory_properties(
316
                    local_logger, self.locale.gettext('User Secrets') + ' "' + in_seq['vdr'] + '"',
317
                    item_checked, [in_seq['typ']])
318
        can_proceed_u2 = False
319
        if can_proceed_u1:
320
            item_checked = in_user_secrets[in_seq['vdr']][in_seq['typ']]
321
            can_proceed_u2 = self.fn_validate_mandatory_properties(
322
                    local_logger, self.locale.gettext('User Secrets') + ' "' + in_seq['vdr']
323
                    + '", "' + in_seq['typ'] + '", "Server" ',
324
                    item_checked, [in_seq['grp']])
325
        can_proceed_u3 = False
326
        if can_proceed_u2:
327
            item_checked = in_user_secrets[in_seq['vdr']][in_seq['typ']][in_seq['grp']]
328
            can_proceed_u3 = self.fn_validate_mandatory_properties(
329
                    local_logger, self.locale.gettext('User Secrets') + ' "' + in_seq['vdr']
330
                    + '", "' + in_seq['typ'] + '", "' + in_seq['grp'] + '", "Server" ',
331
                    item_checked, [in_seq['lyr']])
332
        return can_proceed_u and can_proceed_u1 and can_proceed_u2 and can_proceed_u3
333