1
|
|
|
""" |
2
|
|
|
BasicNeeds - useful functions library |
3
|
|
|
|
4
|
|
|
This library has functions useful to keep main logic short and simple |
5
|
|
|
""" |
6
|
|
|
# package to handle date and times |
7
|
|
|
from datetime import datetime, timedelta |
8
|
|
|
# package to add support for multi-language (i18n) |
9
|
|
|
import gettext |
10
|
|
|
# package to handle files/folders and related metadata/operations |
11
|
|
|
import os |
12
|
|
|
# package regular expressions |
13
|
|
|
import re |
14
|
|
|
|
15
|
|
|
|
16
|
|
|
class BasicNeeds:
    """
    Small, generic helpers (list manipulation, truthiness checks,
    timestamped console output and basic input validation)
    meant to keep the main logic of a script short and simple
    """
    # gettext translation object, assigned for each instance within __init__
    locale = None

    def __init__(self, in_language='en_US'):
        """
        Loads the gettext catalogue used for all user facing messages

        The catalogue domain is this file name without extension and it is searched
        in a folder next to this file named "<file name without extension>_Locale"

        @param in_language: language code of the translation to load
        """
        # splitext removes only the trailing extension, a plain replace of '.py'
        # would also alter a file name containing '.py' somewhere in the middle
        current_script = os.path.splitext(os.path.basename(__file__))[0]
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
        self.locale = gettext.translation(current_script, lang_folder, languages=[in_language])

    def fn_add_value_to_dictionary(self, in_list, adding_value, adding_type, reference_column):
        """
        Returns a copy of a list with one additional value placed at a requested position

        @param in_list: input list, left untouched
        @param adding_value: value to be added
        @param adding_type: one of after|before|first|last (case insensitive)
        @param reference_column: existing element used as anchor for after|before,
            when None the index 0 is used for both of them
        @return: new list containing the additional value
        @raise ValueError: when adding_type is not supported
            or reference_column is not part of in_list
        """
        add_type = adding_type.lower()
        total_columns = len(in_list)
        # anchor is resolved once instead of searching the list for every position
        if reference_column is None:
            index_before = 0
            index_after = 0
        else:
            index_before = in_list.index(reference_column)
            index_after = index_before + 1
        positions = {
            'after': {'cycle_down_to': index_before, 'add': index_after},
            'before': {'cycle_down_to': index_before, 'add': index_before},
            'first': {'cycle_down_to': 0, 'add': 0},
            'last': {'cycle_down_to': total_columns, 'add': total_columns},
        }
        if add_type not in positions:
            raise ValueError('Unsupported adding type "' + str(adding_type)
                             + '", expected one of: ' + ', '.join(positions))
        chosen_position = positions[add_type]
        return self.add_value_to_dictionary_by_position({
            'adding_value': adding_value,
            # a copy is handed over as the helper modifies the list it receives
            'list': list(in_list),
            'position_to_add': chosen_position['add'],
            'position_to_cycle_down_to': chosen_position['cycle_down_to'],
            'total_columns': total_columns,
        })

    @staticmethod
    def add_value_to_dictionary_by_position(adding_dictionary):
        """
        Shifts elements of a list one position to the right and writes a value in between

        The list given under the key "list" is modified in place and also returned

        @param adding_dictionary: Dict structure with keys: list, total_columns,
            position_to_cycle_down_to, position_to_add, adding_value
        @return: the same list object received under the key "list"
        """
        list_with_values = adding_dictionary['list']
        total_columns = adding_dictionary['total_columns']
        # grows the list by one slot, the appended content is only a placeholder
        list_with_values.append(total_columns)
        # walking backwards so every element is copied before being overwritten
        for counter in range(total_columns, adding_dictionary['position_to_cycle_down_to'], -1):
            list_with_values[counter] = list_with_values[counter - 1]
        list_with_values[adding_dictionary['position_to_add']] = adding_dictionary['adding_value']
        return list_with_values

    def fn_check_inputs(self, input_parameters):
        """
        Validates the folder of the log file, when a log file is given

        @param input_parameters: object exposing the attribute output_log_file
        """
        if input_parameters.output_log_file is not None:
            # checking log folder first as that is where all further messages will be stored;
            # a file name without any folder refers to current folder
            log_folder = os.path.dirname(input_parameters.output_log_file) or os.curdir
            self.fn_validate_single_value(log_folder, 'folder')

    @staticmethod
    def fn_decide_by_omission_or_specific_true(in_dictionary, key_decision_factor):
        """
        Decides positively when a key is omitted from a Dict structure
        or when the value stored under that key is evaluated as True

        @param in_dictionary: input Dict structure
        @param key_decision_factor: key used to search value in Dict structure
        @return: True|False
        """
        # membership is tested first so a missing key never raises KeyError
        if key_decision_factor not in in_dictionary:
            return True
        return bool(in_dictionary[key_decision_factor])

    @staticmethod
    def fn_evaluate_dict_values(in_dict):
        """
        Checks if all values of a Dict structure are evaluated as True

        @param in_dict: input Dict structure
        @return: True|False (True for an empty structure)
        """
        return all(in_dict[crt_key] for crt_key in in_dict)

    @staticmethod
    def fn_evaluate_list_values(in_list):
        """
        Checks if all values of a list are evaluated as True

        @param in_list: input list
        @return: True|False (True for an empty list)
        """
        return all(in_list)

    def fn_final_message(self, local_logger, log_file_name, performance_in_seconds):
        """
        Communicates the end of the application

        @param local_logger: logger receiving the total execution time,
            only used when a log file name is given
        @param log_file_name: name of the log file, None or the text 'None' when there is no log
        @param performance_in_seconds: number of seconds the whole script took
        """
        total_time_string = str(timedelta(seconds=performance_in_seconds))
        # NOTE(review): the text 'None' is presumably a stringified missing argument,
        # verify against callers; an actual None is treated the same way
        if log_file_name is None or log_file_name == 'None':
            self.fn_timestamped_print(self.locale.gettext(
                'Application finished, whole script took {total_time_string}')
                                      .replace('{total_time_string}', total_time_string))
        else:
            local_logger.info(self.locale.gettext('Total execution time was {total_time_string}')
                              .replace('{total_time_string}', total_time_string))
            self.fn_timestamped_print(self.locale.gettext(
                'Application finished, for complete logged details please check {log_file_name}')
                                      .replace('{log_file_name}', str(log_file_name)))

    @staticmethod
    def fn_multi_line_string_to_single(input_string):
        """
        Flattens a multi-line string into a single line

        Line breaks become spaces, any sequence of 2 or more white-space characters
        becomes a single space, ' , ' becomes ', ' and the result is trimmed

        @param input_string: text to flatten
        @return: single line text
        """
        string_to_return = input_string.replace('\n', ' ').replace('\r', ' ')
        # no upper limit, so sequences of any length end up as a single space
        return re.sub(r'\s{2,}', ' ', string_to_return).replace(' , ', ', ').strip()

    @staticmethod
    def fn_numbers_with_leading_zero(input_number_as_string, digits):
        """
        Adds leading zeros to a number given as string up to a desired length

        @param input_number_as_string: number as string
        @param digits: minimum length of the result
        @return: padded string, or the input as it is when already long enough
        """
        return input_number_as_string.rjust(digits, '0')

    def fn_optional_print(self, boolean_variable, string_to_print):
        """
        Prints a timestamped message only when a condition is met

        @param boolean_variable: condition deciding if the message is printed
        @param string_to_print: message to print
        """
        if boolean_variable:
            self.fn_timestamped_print(string_to_print)

    @staticmethod
    def fn_timestamped_print(string_to_print):
        """
        Prints a message prefixed with current UTC date and time

        @param string_to_print: message to print
        """
        # imported here as the module imports only datetime and timedelta
        from datetime import timezone
        # a timezone aware value is needed for %Z to render a zone name
        print(datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f %Z")
              + '- ' + string_to_print)

    def fn_validate_one_value(self, value_to_validate, validation_type):
        """
        Validates a single value without any side effect

        @param value_to_validate: value to check
        @param validation_type: one of file|folder|url,
            any other type is reported as valid with an empty message
        @return: Dict structure with keys is_invalid (True|False) and message (translated text)
        """
        is_invalid = False
        message = ''
        if validation_type == 'file':
            is_invalid = not os.path.isfile(value_to_validate)
            message = self.locale.gettext('Given file "{value_to_validate}" does not exist')
        elif validation_type == 'folder':
            is_invalid = not os.path.isdir(value_to_validate)
            message = self.locale.gettext('Given folder "{value_to_validate}" does not exist')
        elif validation_type == 'url':
            url_reg_expression = 'https?://(?:www)?(?:[\\w-]{2,255}(?:\\.\\w{2,66}){1,2})'
            is_invalid = re.match(url_reg_expression, value_to_validate) is None
            message = self.locale.gettext('Given url "{value_to_validate}" is not valid')
        return {
            'is_invalid': is_invalid,
            'message': message.replace('{value_to_validate}', str(value_to_validate)),
        }

    def fn_validate_single_value(self, value_to_validate, validation_type):
        """
        Validates a single value and stops the application when it is not valid

        @param value_to_validate: value to check
        @param validation_type: one of file|folder|url
        @raise SystemExit: with exit code 1, after printing the reason, when value is not valid
        """
        validation_details = self.fn_validate_one_value(value_to_validate, validation_type)
        if validation_details['is_invalid']:
            self.fn_timestamped_print(validation_details['message'])
            # raised directly as the exit() helper is only available when module site is loaded
            raise SystemExit(1)
178
|
|
|
|