Passed
Push — development/test ( 6383cd...222d55 )
by Daniel
02:57
created

ParameterHandling.get_child_parent_expressions()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 6
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
"""
2
Class Parameter Handling
3
4
Facilitates the handling of parameter values
5
"""
6
# package to handle date and times
7
from datetime import date, datetime, timedelta
8
# package to allow year and/or month operations based on a reference date
9
import datedelta
10
# package to add support for multi-language (i18n)
11
import gettext
12
# package to handle files/folders and related metadata/operations
13
import os
14
# package to handle regular expressions
15
import re
16
17
18
class ParameterHandling:
    """
    Turns raw query parameters into a tuple ready for "%" string interpolation.

    String parameters may embed "CalculatedDate_<expression>[_<offset>]" tokens
    which are replaced by a date/time fragment computed from the current moment;
    list and dictionary parameters are glued into a single string using the
    prefix/glue/suffix rules supplied by the caller.
    """

    # group name -> expression names accepted inside "CalculatedDate_<expression>"
    known_expressions = {
        'year': ['CY', 'CurrentYear'],
        'month': ['CYCM', 'CurrentYearCurrentMonth'],
        'just_month': ['CM', 'CurrentMonth'],
        'week': ['CYCW', 'CurrentYearCurrentWeek'],
        'just_week': ['CW', 'CurrentWeek'],
        'day': ['CYCMCD', 'CurrentYearCurrentMonthCurrentDay'],
        'just_day': ['CD', 'CurrentDay'],
        'hour': ['CYCMCDCH', 'CurrentYearCurrentMonthCurrentDayCurrentHour'],
    }
    # translation object, set by __init__
    lcl = None
    # same matching rules as the historical inline pattern, compiled once
    _expression_pattern = re.compile(r'CalculatedDate_[A-Za-z]{2,75}_*-*[0-9]{0,2}')

    def __init__(self, default_language='en_US'):
        """
        Load the translations of this module for the given language.

        The catalogue is looked up in a "<module name>_Locale" folder located
        next to this file; gettext raises if no catalogue is found.
        """
        # splitext only strips the real extension (".py", ".pyc", ...)
        current_script = os.path.splitext(os.path.basename(__file__))[0]
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
        self.lcl = gettext.translation(current_script, lang_folder, languages=[default_language])

    def _localized(self, message, **placeholders):
        """Translate message, then substitute every "{name}" placeholder given."""
        text = self.lcl.gettext(message)
        for name, value in placeholders.items():
            text = text.replace('{' + name + '}', str(value))
        return text

    def build_parameters(self, local_logger, query_session_parameters, in_parameter_rules,
                         in_start_isoweekday):
        """
        Convert session parameters (dictionary or list) into the final tuple.

        For a dictionary only the values are used, in insertion order.
        Any other type is logged as an error and ends the program (SystemExit 1).
        """
        local_logger.debug(self._localized('Seen Parameters are: {parameters}',
                                           parameters=query_session_parameters))
        parameters_type = type(query_session_parameters)
        local_logger.debug(self._localized('Parameters type is {parameter_type}',
                                           parameter_type=parameters_type))
        if isinstance(query_session_parameters, dict):
            tp = tuple(query_session_parameters.values())
        elif isinstance(query_session_parameters, list):
            tp = tuple(query_session_parameters)
        else:
            local_logger.error(self._localized(
                'Unexpected parameter type, either Dictionary or List expected '
                'but seen is {parameter_type}', parameter_type=parameters_type))
            # SystemExit directly: the "exit" builtin only exists when "site" is loaded
            raise SystemExit(1)
        local_logger.debug(self._localized(
            'Initial Tuple for Parameters is: {parameters_tupple}', parameters_tupple=tp))
        return self.stringify_parameters(local_logger, tp, in_parameter_rules, in_start_isoweekday)

    @staticmethod
    def calculate_date_deviation(in_date, deviation_type, expression_parts):
        """
        Shift in_date by the offset stored in expression_parts[2].

        deviation_type is one of 'year', 'month', 'week', 'day' or 'hour';
        any other value returns None. A missing offset (None or empty string)
        counts as 0. Only the requested shift is computed and the given list
        is left untouched.
        """
        raw_offset = expression_parts[2]
        offset = 0 if raw_offset in (None, '') else int(raw_offset)
        # calendar-aware shifts come from datedelta
        if deviation_type == 'year':
            return in_date + datedelta.datedelta(years=offset)
        if deviation_type == 'month':
            return in_date + datedelta.datedelta(months=offset)
        timedelta_keywords = {'week': 'weeks', 'day': 'days', 'hour': 'hours'}
        keyword = timedelta_keywords.get(deviation_type)
        if keyword is None:
            return None
        return in_date + timedelta(**{keyword: offset})

    def calculate_date_from_expression(self, local_logger, expression_parts, in_start_isoweekday):
        """
        Interpret one split expression relative to the current moment.

        expression_parts is ['CalculatedDate', <expression>] optionally followed
        by an offset. An unknown expression is logged as an error and ends the
        program (SystemExit 1).
        """
        all_known_expressions = self.get_flattened_known_expressions()
        if expression_parts[1] not in all_known_expressions:
            local_logger.error(self._localized(
                'Unknown expression encountered: provided was "{given_expression}" '
                'which is not among known ones: "{all_known_expressions}"',
                given_expression=expression_parts[1],
                all_known_expressions='", "'.join(all_known_expressions)))
            raise SystemExit(1)
        str_expression = '_'.join(expression_parts)
        local_logger.debug(self._localized(
            'A known expression "{expression_parts}" has to be interpreted',
            expression_parts=str_expression))
        # a full datetime (not just a date) keeps hour expressions and
        # hour offsets meaningful
        final_string = self.interpret_known_expression(datetime.now(), expression_parts,
                                                       in_start_isoweekday)
        local_logger.debug(self._localized(
            'Known expression "{expression_parts}" has been interpreted as {final_string}',
            expression_parts=str_expression, final_string=final_string))
        return final_string

    def eval_expression(self, local_logger, crt_parameter, in_start_isoweekday):
        """
        Replace every CalculatedDate expression found in a string parameter.

        Each match is resolved on its own, so one expression being a prefix of
        another (CY vs CYCM) cannot corrupt the other. Underscores captured by
        the pattern belong to the replaced text; empty segments produced by
        redundant underscores are ignored.
        """
        def resolve(match):
            parts = [part for part in match.group().split('_') if part]
            return self.calculate_date_from_expression(local_logger, parts,
                                                       in_start_isoweekday)

        value_to_return, replacements = self._expression_pattern.subn(resolve, crt_parameter)
        if replacements:
            local_logger.debug(self._localized(
                'Current Parameter is STR and has been re-interpreted as value: "{str_value}"',
                str_value=value_to_return))
        return value_to_return

    def get_child_parent_expressions(self):
        """Return a dictionary mapping every expression name to its group name."""
        return {
            expression: group
            for group, expressions in self.known_expressions.items()
            for expression in expressions
        }

    def get_flattened_known_expressions(self):
        """Return all expression names as one flat list, in declaration order."""
        return [
            expression
            for expressions in self.known_expressions.values()
            for expression in expressions
        ]

    @staticmethod
    def _iso_year_and_week(in_date, in_start_isoweekday=1):
        """
        Return (ISO year, ISO week number) for in_date.

        When weeks start on Sunday (7) the date is moved one day forward first,
        so that Sunday is counted with the week that follows it.
        """
        if in_start_isoweekday == 7:
            in_date = in_date + timedelta(days=1)
        iso_calendar = in_date.isocalendar()
        return iso_calendar[0], iso_calendar[1]

    @staticmethod
    def get_week_number_as_two_digits_string(in_date, in_start_isoweekday=1):
        """Return the week number of in_date as a zero-padded 2 character string."""
        week_iso_num = ParameterHandling._iso_year_and_week(in_date, in_start_isoweekday)[1]
        return str(week_iso_num).zfill(2)

    def handle_query_parameters(self, local_logger, given_session, in_start_isoweekday):
        """
        Build the parameters tuple of a session, or return None when the
        session has no 'parameters' entry.
        """
        if 'parameters' not in given_session:
            return None
        parameter_rules = given_session.get('parameters-handling-rules', [])
        return self.build_parameters(local_logger, given_session['parameters'], parameter_rules,
                                     in_start_isoweekday)

    def interpret_known_expression(self, ref_date, expression_parts, in_start_isoweekday):
        """
        Render a known expression as text relative to ref_date.

        ref_date may be a date or a datetime; expression_parts[1] must be one
        of the known expressions and an optional expression_parts[2] holds the
        offset. Year and week of week-based results are taken from the same
        (possibly shifted) date so they always agree.
        """
        deviation_original = self.get_child_parent_expressions().get(expression_parts[1])
        deviation = deviation_original.replace('just_', '')
        finalized_date = ref_date
        if len(expression_parts) > 2:
            finalized_date = self.calculate_date_deviation(ref_date, deviation, expression_parts)
        if deviation == 'week':
            iso_year, iso_week = self._iso_year_and_week(finalized_date, in_start_isoweekday)
            week_number_string = str(iso_week).zfill(2)
            if deviation_original == 'just_week':
                return week_number_string
            return str(iso_year) + week_number_string
        standard_formats = {
            'year': '%Y',
            'month': '%Y%m',
            'just_month': '%m',
            'day': '%Y%m%d',
            'just_day': '%d',
            'hour': '%Y%m%d%H',
        }
        return finalized_date.strftime(standard_formats.get(deviation_original))

    def manage_parameter_value(self, local_logger, given_prefix, given_parameter,
                               given_parameter_rules):
        """
        Glue a list (or the values of a dictionary) into a single string.

        given_prefix is 'list' or 'dict' and selects which
        '<prefix>-values-prefix', '<prefix>-values-glue' and
        '<prefix>-values-suffix' rules are applied. Elements are converted
        to text first so numeric values can be glued as well.
        """
        element_to_join = ''
        if given_prefix == 'dict':
            element_to_join = given_parameter.values()
        elif given_prefix == 'list':
            element_to_join = given_parameter
        local_logger.debug(self._localized(
            'Current Parameter is {parameter_type} and has the value: {str_value}',
            parameter_type=given_prefix.upper(), str_value=element_to_join))
        glued_values = given_parameter_rules[given_prefix + '-values-glue'].join(
            str(element) for element in element_to_join)
        return given_parameter_rules[given_prefix + '-values-prefix'] \
            + glued_values \
            + given_parameter_rules[given_prefix + '-values-suffix']

    def simulate_final_query(self, local_logger, timered, in_query, in_parameters_number, in_tp):
        """
        Return in_query with in_tp interpolated through the "%" operator.

        When the interpolation fails with a TypeError the problem is logged and
        the original query is returned. The timer is stopped on every path.
        """
        timered.start()
        return_query = in_query
        try:
            if in_parameters_number > 0:
                try:
                    return_query = in_query % in_tp
                except TypeError as e:
                    given_parameters_counted = 0 if in_tp is None else len(in_tp)
                    local_logger.error(self._localized(
                        'Initial query expects {expected_parameters_counted} '
                        'but only {given_parameters_counted} were provided',
                        expected_parameters_counted=in_parameters_number,
                        given_parameters_counted=given_parameters_counted))
                    local_logger.error(e)
        finally:
            timered.stop()
        return return_query

    def stringify_parameters(self, local_logger, tuple_parameters, given_parameter_rules,
                             in_start_isoweekday):
        """
        Return a tuple with every parameter in its final form.

        Strings get their CalculatedDate expressions evaluated, lists and
        dictionaries are glued into one string, any other value (numbers,
        None, ...) is passed through unchanged.
        """
        working_list = []
        for crt_parameter in tuple_parameters:
            if isinstance(crt_parameter, str):
                local_logger.debug(self._localized(
                    'Current Parameter is STR and has the value: {str_value}',
                    str_value=crt_parameter))
                working_list.append(self.eval_expression(local_logger, crt_parameter,
                                                         in_start_isoweekday))
            elif isinstance(crt_parameter, (list, dict)):
                prefix = 'dict' if isinstance(crt_parameter, dict) else 'list'
                working_list.append(self.manage_parameter_value(local_logger, prefix,
                                                                crt_parameter,
                                                                given_parameter_rules))
            else:
                # keep the value itself, not its position in the tuple
                working_list.append(crt_parameter)
        final_tuple = tuple(working_list)
        local_logger.debug(self._localized('Final Tuple for Parameters is: {final_tuple}',
                                           final_tuple=final_tuple))
        return final_tuple
225