Passed
Push — development/test ( 8eb624...e06103 )
by Daniel
03:34
created

ParameterHandling.interpret_known_expression()   A

Complexity

Conditions 4

Size

Total Lines 23
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 21
nop 3
dl 0
loc 23
rs 9.376
c 0
b 0
f 0
1
"""
2
Class Parameter Handling
3
4
Facilitates the handling of parameter values
5
"""
6
# package to handle date and times
7
from datetime import date, datetime, timedelta
8
# package regular expressions
9
import re
10
# package to allow year and/or month operations based on a reference date
11
import datedelta
12
13
14
class ParameterHandling:
    """
    Turns the parameters of a query session into a tuple ready for '%' formatting.

    String parameters may embed a date expression shaped like
    ``CalculatedDate_<expression>[_<offset>]`` (for example
    ``CalculatedDate_CYCM_-1``) which is replaced by a value computed from today's date.
    List and dictionary parameters are collapsed into a single string using
    the prefix / glue / suffix taken from the parameter handling rules.
    """

    # expression family -> accepted spellings (short and long form);
    # families prefixed with 'just_' render only that component (no year in front)
    known_expressions = {
        'year': ['CY', 'CurrentYear'],
        'month': ['CYCM', 'CurrentYearCurrentMonth'],
        'just_month': ['CM', 'CurrentMonth'],
        'week': ['CYCW', 'CurrentYearCurrentWeek'],
        'just_week': ['CW', 'CurrentWeek'],
        'day': ['CYCMCD', 'CurrentYearCurrentMonthCurrentDay'],
        'just_day': ['CD', 'CurrentDay']
    }

    # strftime pattern for every family that is not week based
    # (week families are rendered from the ISO calendar instead)
    _standard_formats = {
        'year': '%Y',
        'month': '%Y%m',
        'just_month': '%m',
        'day': '%Y%m%d',
        'just_day': '%d',
    }

    def build_parameters(self, local_logger, query_session_parameters, in_parameter_rules):
        """
        Convert the session parameters into the final tuple of values.

        :param local_logger: object exposing debug() and error()
        :param query_session_parameters: dict (its values are used) or list
        :param in_parameter_rules: rules forwarded to stringify_parameters()
        :return: tuple produced by stringify_parameters()
        :raises SystemExit: with code 1 when the parameters are neither dict nor list
        """
        local_logger.debug('Seen Parameters are: ' + str(query_session_parameters))
        local_logger.debug('Parameters type is ' + str(type(query_session_parameters)))
        if isinstance(query_session_parameters, dict):
            tp = tuple(query_session_parameters.values())
        elif isinstance(query_session_parameters, list):
            tp = tuple(query_session_parameters)
        else:
            local_logger.error('Unknown parameter type (expected either Dictionary or List)')
            # SystemExit is raised directly: the exit() helper is injected by the site
            # module and is not guaranteed to exist in every interpreter setup
            raise SystemExit(1)
        local_logger.debug('Initial Tuple for Parameters is: ' + str(tp))
        return self.stringify_parameters(local_logger, tp, in_parameter_rules)

    @staticmethod
    def calculate_date_deviation(in_date, deviation_type, expression_parts):
        """
        Shift a date by the offset stored in expression_parts[2].

        :param in_date: reference date
        :param deviation_type: one of 'year', 'month', 'week', 'day';
            any other value leaves the date untouched
        :param expression_parts: expression split on '_', offset on position 2
        :return: the shifted date
        """
        if deviation_type not in ('year', 'month', 'week', 'day'):
            return in_date
        offset = int(expression_parts[2])
        if deviation_type == 'year':
            return in_date + datedelta.datedelta(years=offset)
        if deviation_type == 'month':
            return in_date + datedelta.datedelta(months=offset)
        if deviation_type == 'week':
            return in_date + timedelta(weeks=offset)
        return in_date + timedelta(days=offset)

    def calculate_date_from_expression(self, local_logger, expression_parts):
        """
        Evaluate an already split expression against today's date.

        :param local_logger: object exposing debug() and error()
        :param expression_parts: expression split on '_', name on position 1
        :return: the computed string
        :raises SystemExit: with code 1 when the expression name is not known
        """
        if expression_parts[1] not in self.get_flattened_known_expressions():
            local_logger.error('Unknown expression encountered '
                               + str(expression_parts[1]) + '...')
            raise SystemExit(1)
        local_logger.debug('I have just been provided with a known expression "'
                           + '_'.join(expression_parts) + '" to interpret')
        final_string = self.interpret_known_expression(date.today(), expression_parts)
        local_logger.debug('Provided known expression "' + '_'.join(expression_parts)
                           + '" has been determined to be "' + final_string + '"')
        return final_string

    def eval_expression(self, local_logger, crt_parameter):
        """
        Replace the date expression embedded in a string parameter, if any.

        :param local_logger: object exposing debug() and error()
        :param crt_parameter: string to inspect
        :return: the string with the expression substituted, or the string
            unchanged when no expression is found
        """
        # NOTE(review): the pattern also accepts a trailing '_' that is not followed by
        # digits (e.g. 'CalculatedDate_CY_x'); the offset is then '' and int('') raises
        # ValueError further down - confirm whether that input has to be supported
        reg_ex = re.search(r'(CalculatedDate\_[A-Za-z]{2,62}\_*(-*)[0-9]{0,2})', crt_parameter)
        if not reg_ex:
            return crt_parameter
        matched_text = reg_ex.group()
        calculated = self.calculate_date_from_expression(local_logger, matched_text.split('_'))
        # literal replacement: the matched text is data, not a regular expression
        value_to_return = crt_parameter.replace(matched_text, calculated)
        local_logger.debug('Current Parameter is STR and has been re-interpreted as value: '
                           + str(value_to_return))
        return value_to_return

    def get_child_parent_expressions(self):
        """
        Map every known expression spelling to the family it belongs to.

        :return: dict such as {'CY': 'year', 'CurrentYear': 'year', ...}
        """
        return {
            expression: family
            for family, expressions in self.known_expressions.items()
            for expression in expressions
        }

    def get_flattened_known_expressions(self):
        """
        List every known expression spelling, in declaration order.

        :return: flat list of strings
        """
        return [
            expression
            for expressions in self.known_expressions.values()
            for expression in expressions
        ]

    @staticmethod
    def get_week_number_as_two_digits_string(in_date):
        """
        Return the ISO week number of a date, left padded with zero to 2 digits.

        :param in_date: date or datetime
        :return: string such as '01' or '52'
        """
        return str(in_date.isocalendar()[1]).zfill(2)

    def handle_query_parameters(self, local_logger, given_session):
        """
        Build the parameters tuple for a session, when the session defines any.

        :param local_logger: object exposing debug() and error()
        :param given_session: container read through the keys 'parameters'
            and (optionally) 'parameters-handling-rules'
        :return: tuple of parameters, or None when there is no 'parameters' key
        """
        if 'parameters' not in given_session:
            return None
        parameter_rules = []
        if 'parameters-handling-rules' in given_session:
            parameter_rules = given_session['parameters-handling-rules']
        return self.build_parameters(local_logger, given_session['parameters'], parameter_rules)

    def interpret_known_expression(self, ref_date, expression_parts):
        """
        Render a known expression relative to a reference date.

        :param ref_date: date the expression is evaluated against
        :param expression_parts: expression split on '_': name on position 1,
            optional signed offset on position 2
        :return: formatted string (e.g. '202001' for a 'CYCM' expression)
        :raises ValueError: when the expression name is not a known one
        """
        expression = expression_parts[1]
        deviation_original = self.get_child_parent_expressions().get(expression)
        if deviation_original is None:
            raise ValueError('Unknown expression encountered ' + str(expression))
        # 'just_month' shifts by months exactly like 'month' does
        deviation = deviation_original.replace('just_', '')
        finalized_date = ref_date
        if len(expression_parts) >= 3:
            finalized_date = self.calculate_date_deviation(ref_date, deviation, expression_parts)
        if deviation_original in ('week', 'just_week'):
            week_number_string = self.get_week_number_as_two_digits_string(finalized_date)
            if deviation_original == 'just_week':
                return week_number_string
            # the year comes from the ISO calendar so it stays paired with the ISO week
            return str(finalized_date.isocalendar()[0]) + week_number_string
        return finalized_date.strftime(self._standard_formats[deviation_original])

    @staticmethod
    def manage_parameter_value(given_prefix, given_parameter, given_parameter_rules):
        """
        Collapse a list or dictionary parameter into a single string.

        :param given_prefix: 'dict' (values are joined) or 'list' (items are joined);
            any other value joins nothing
        :param given_parameter: the list or dictionary to collapse
        :param given_parameter_rules: container providing the keys
            '<prefix>-values-prefix', '<prefix>-values-glue', '<prefix>-values-suffix'
        :return: prefix + glued elements + suffix
        """
        if given_prefix == 'dict':
            elements = given_parameter.values()
        elif given_prefix == 'list':
            elements = given_parameter
        else:
            elements = ''
        # elements are stringified so numeric members can be glued as well
        return given_parameter_rules[given_prefix + '-values-prefix'] \
            + given_parameter_rules[given_prefix + '-values-glue'].join(
                str(element) for element in elements) \
            + given_parameter_rules[given_prefix + '-values-suffix']

    def simulate_final_query(self, local_logger, timered, in_query, in_parameters_number, in_tp):
        """
        Inject the parameters into the query text using '%' formatting.

        :param local_logger: object exposing debug() and error()
        :param timered: object exposing start() and stop()
        :param in_query: query text containing '%' placeholders
        :param in_parameters_number: number of parameters the query expects;
            no formatting is attempted when it is not positive
        :param in_tp: values to inject (may be None)
        :return: the formatted query, or the original query when formatting
            raises TypeError (which is logged, not propagated)
        """
        timered.start()
        return_query = in_query
        try:
            if in_parameters_number > 0:
                try:
                    return_query = in_query % in_tp
                except TypeError as e:
                    # in_tp may be None when the session had no parameters at all
                    provided_number = 0 if in_tp is None else len(in_tp)
                    local_logger.debug('Initial query expects ' + str(in_parameters_number)
                                       + ' parameters but only ' + str(provided_number)
                                       + ' parameters were provided!')
                    local_logger.error(e)
        finally:
            # the timer is stopped even if formatting fails in an unforeseen way
            timered.stop()
        return return_query

    def stringify_parameters(self, local_logger, tuple_parameters, given_parameter_rules):
        """
        Normalize every parameter so the tuple can be fed to '%' formatting.

        Strings get their date expression evaluated, lists and dictionaries
        are collapsed into one string, anything else is kept as it is.

        :param local_logger: object exposing debug() and error()
        :param tuple_parameters: iterable of raw parameter values
        :param given_parameter_rules: rules forwarded to manage_parameter_value()
        :return: tuple with the same number of elements as the input
        """
        working_list = []
        for crt_parameter in tuple_parameters:
            if isinstance(crt_parameter, str):
                local_logger.debug('Current Parameter is STR and has the value: ' + crt_parameter)
                working_list.append(self.eval_expression(local_logger, crt_parameter))
            elif isinstance(crt_parameter, (list, dict)):
                prefix = 'dict' if isinstance(crt_parameter, dict) else 'list'
                local_logger.debug('Current Parameter is ' + prefix.upper()
                                   + ' and has the value: ' + str(crt_parameter))
                working_list.append(self.manage_parameter_value(prefix, crt_parameter,
                                                                given_parameter_rules))
            else:
                # numbers, dates, None, ... travel through unchanged
                working_list.append(crt_parameter)
        final_tuple = tuple(working_list)
        local_logger.debug('Final Tuple for Parameters is: ' + str(final_tuple))
        return final_tuple
179