Passed
Push — master ( 1832b1...545613 )
by Daniel
01:20
created

ParameterHandling.build_parameters()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 4
dl 0
loc 14
rs 9.75
c 0
b 0
f 0
1
"""
2
Class Parameter Handling
3
4
Facilitates handling of parameter values
5
"""
6
from datetime import date
7
# package to facilitate common operations
8
from db_extractor.BasicNeeds import datetime, re, timedelta, BasicNeeds
9
# package to allow year and/or month operations based on a reference date
10
import datedelta
11
12
13
class ParameterHandling:
    """
    Prepare the values that get interpolated into a parameterized query.

    Session parameters (a dictionary or a list) are flattened into a tuple;
    string values may embed ``CalculatedDate_<expression>[_<offset>]`` markers
    that are replaced with a value derived from the current date, while list
    and dictionary values are joined into a single string according to the
    supplied handling rules.
    """
    # helper object from the BasicNeeds package, assigned in __init__
    lcl_bn = None
    # expression family -> accepted spellings (short form, long form);
    # the "just_" families render a single date component without the year
    known_expressions = {
        'year': ['CY', 'CurrentYear'],
        'month': ['CYCM', 'CurrentYearCurrentMonth'],
        'just_month': ['CM', 'CurrentMonth'],
        'week': ['CYCW', 'CurrentYearCurrentWeek'],
        'just_week': ['CW', 'CurrentWeek'],
        'day': ['CYCMCD', 'CurrentYearCurrentMonthCurrentDay'],
        'just_day': ['CD', 'CurrentDay']
    }

    def __init__(self):
        self.lcl_bn = BasicNeeds()

    def build_parameters(self, local_logger, query_session_parameters, in_parameter_rules):
        """
        Convert the session parameters into the final tuple of values.

        Args:
            local_logger: logger-like object (``debug`` and ``error`` are used).
            query_session_parameters: dictionary (its values are used, in
                insertion order), list or tuple of parameter values.
            in_parameter_rules: rules passed on to ``stringify_parameters``.

        Returns:
            Tuple produced by ``stringify_parameters``.

        Raises:
            SystemExit: with code 1 when the parameters are of another type.
        """
        local_logger.debug('Seen Parameters are: ' + str(query_session_parameters))
        local_logger.debug('Parameters type is ' + str(type(query_session_parameters)))
        # isinstance (rather than comparing the type's text) also accepts subclasses
        if isinstance(query_session_parameters, dict):
            tp = tuple(query_session_parameters.values())
        elif isinstance(query_session_parameters, (list, tuple)):
            tp = tuple(query_session_parameters)
        else:
            local_logger.error('Unknown parameter type (expected either Dictionary or List)')
            # same exception exit(1) raised, without relying on the site module helper
            raise SystemExit(1)
        local_logger.debug('Initial Tuple for Parameters is: ' + str(tp))
        return self.stringify_parameters(local_logger, tp, in_parameter_rules)

    @staticmethod
    def calculate_date_deviation(in_date, deviation_type, expression_parts):
        """
        Shift a date by the signed offset stored in ``expression_parts[2]``.

        Args:
            in_date: reference date.
            deviation_type: one of 'year', 'month', 'week' or 'day'; any other
                value leaves the date untouched.
            expression_parts: sequence whose third element is the offset
                (anything ``int()`` accepts).

        Returns:
            The shifted date.
        """
        final_date = in_date
        if deviation_type == 'year':
            final_date = in_date + datedelta.datedelta(years=int(expression_parts[2]))
        elif deviation_type == 'month':
            final_date = in_date + datedelta.datedelta(months=int(expression_parts[2]))
        elif deviation_type == 'week':
            final_date = in_date + timedelta(weeks=int(expression_parts[2]))
        elif deviation_type == 'day':
            final_date = in_date + timedelta(days=int(expression_parts[2]))
        return final_date

    def calculate_date_from_expression(self, local_logger, expression_parts):
        """
        Evaluate one already split expression against today's date.

        Args:
            local_logger: logger-like object (``debug`` and ``error`` are used).
            expression_parts: ``[marker, expression]`` optionally followed by
                an offset, e.g. ``['CalculatedDate', 'CYCM', '-1']``.

        Returns:
            The calculated value as a string.

        Raises:
            SystemExit: with code 1 when the expression is not a known one.
        """
        if expression_parts[1] not in self.get_flattened_known_expressions():
            local_logger.error('Unknown expression encountered '
                               + str(expression_parts[1]) + '...')
            raise SystemExit(1)
        expression_text = '_'.join(expression_parts)
        local_logger.debug('I have just been provided with a known expression "'
                           + expression_text + '" to interpret')
        final_string = self.interpret_known_expression(date.today(), expression_parts)
        local_logger.debug('Provided known expression "' + expression_text
                           + '" has been determined to be "' + final_string + '"')
        return final_string

    def get_child_parent_expressions(self):
        """Return a dictionary mapping every accepted spelling to its family name."""
        return {
            spelling: family
            for family, spellings in self.known_expressions.items()
            for spelling in spellings
        }

    def get_flattened_known_expressions(self):
        """Return every accepted spelling as one flat list, in declaration order."""
        return [
            spelling
            for spellings in self.known_expressions.values()
            for spelling in spellings
        ]

    def handle_query_parameters(self, local_logger, given_session):
        """
        Build the parameter tuple for a session, when it declares parameters.

        Args:
            local_logger: logger-like object handed to ``build_parameters``.
            given_session: mapping that may hold the keys 'parameters' and
                'parameters-handling-rules' (the rules default to an empty list).

        Returns:
            The tuple of parameters, or None when the session has no
            'parameters' key.
        """
        if 'parameters' not in given_session:
            return None
        parameter_rules = given_session.get('parameters-handling-rules', [])
        return self.build_parameters(local_logger, given_session['parameters'], parameter_rules)

    def interpret_known_expression(self, ref_date, expression_parts):
        """
        Render a known expression relative to a reference date.

        Args:
            ref_date: date the expression is evaluated against.
            expression_parts: ``[marker, expression]`` optionally followed by
                a signed offset expressed in the unit of the expression.

        Returns:
            String such as '2020' (year), '202003' (month), '03' (just month),
            '20200305' (day) or '05' (just day); week expressions use the ISO
            week number formatted by the BasicNeeds helper (presumably
            zero-padded to 2 digits -- confirm against BasicNeeds), prefixed
            with the ISO year for the 'week' family.

        Raises:
            ValueError: when the expression is not one of the known spellings.
        """
        deviation_original = self.get_child_parent_expressions().get(expression_parts[1])
        if deviation_original is None:
            # previously surfaced as an AttributeError on None
            raise ValueError('Unknown expression encountered ' + str(expression_parts[1]))
        finalized_date = ref_date
        if len(expression_parts) >= 3:
            # "just_week" shifts by weeks exactly like "week", and so on
            deviation = deviation_original.replace('just_', '')
            finalized_date = self.calculate_date_deviation(ref_date, deviation, expression_parts)
        if deviation_original in ('week', 'just_week'):
            iso_year, iso_week = finalized_date.isocalendar()[:2]
            week_text = self.lcl_bn.fn_numbers_with_leading_zero(str(iso_week), 2)
            if deviation_original == 'week':
                # the ISO year is used so the pair stays consistent around New Year
                return str(iso_year) + week_text
            return week_text
        standard_formats = {
            'year': '%Y',
            'month': '%Y%m',
            'just_month': '%m',
            'day': '%Y%m%d',
            'just_day': '%d',
        }
        return finalized_date.strftime(standard_formats[deviation_original])

    @staticmethod
    def manage_parameter_value(given_prefix, given_parameter, given_parameter_rules):
        """
        Join a list or dictionary parameter into a single string.

        Args:
            given_prefix: 'list' or 'dict'; selects both the elements to join
                (dictionary values for 'dict') and the rule keys to read.
            given_parameter: the list or dictionary to join.
            given_parameter_rules: mapping holding '<prefix>-values-prefix',
                '<prefix>-values-glue' and '<prefix>-values-suffix'.

        Returns:
            Prefix + elements joined by the glue + suffix.
        """
        element_to_join = ''
        if given_prefix == 'dict':
            element_to_join = given_parameter.values()
        elif given_prefix == 'list':
            element_to_join = given_parameter
        glue = given_parameter_rules[given_prefix + '-values-glue']
        # str() lets non-text elements (e.g. numbers) be joined instead of raising TypeError
        joined_elements = glue.join(str(crt_element) for crt_element in element_to_join)
        return given_parameter_rules[given_prefix + '-values-prefix'] \
            + joined_elements \
            + given_parameter_rules[given_prefix + '-values-suffix']

    def simulate_final_query(self, local_logger, timered, in_query, in_parameters_number, in_tp):
        """
        Interpolate the parameters into the query and log the result.

        Args:
            local_logger: logger-like object (``info``, ``debug``, ``error``).
            timered: object exposing ``start()`` and ``stop()``; it is started
                on entry and always stopped on exit.
            in_query: query text using %-style placeholders.
            in_parameters_number: number of parameters the query expects;
                nothing is interpolated when it is not positive.
            in_tp: values for the placeholders.

        Returns:
            The interpolated query, or ``in_query`` unchanged when nothing was
            interpolated or the interpolation failed with a TypeError (the
            failure is logged, not raised).
        """
        timered.start()
        return_query = in_query
        try:
            if in_parameters_number > 0:
                try:
                    return_query = in_query % in_tp
                    local_logger.info(
                        'Query with parameters interpreted is: '
                        + self.lcl_bn.fn_multi_line_string_to_single_line(return_query))
                except TypeError as e:
                    # in_tp may be None or unsized, so len() must not be allowed to fail here
                    provided_number = len(in_tp) if hasattr(in_tp, '__len__') else 0
                    local_logger.debug('Initial query expects ' + str(in_parameters_number)
                                       + ' parameters but only ' + str(provided_number)
                                       + ' parameters were provided!')
                    local_logger.error(e)
        finally:
            # stop the timer even when an unexpected exception escapes
            timered.stop()
        return return_query

    def special_case_string(self, local_logger, crt_parameter):
        """
        Replace every ``CalculatedDate_...`` expression found in a string.

        Each occurrence is interpreted on its own, so several (different)
        expressions may coexist in one value. Underscores that directly follow
        an expression without introducing an offset are kept in the result.

        Args:
            local_logger: logger-like object (``debug`` and ``error`` are used).
            crt_parameter: string value of the current parameter.

        Returns:
            The string with the expressions replaced by their calculated
            values; unchanged when it holds no expression.

        Raises:
            SystemExit: with code 1 when an expression is not a known one.
        """
        def interpret_match(matching_rule):
            matched_text = matching_rule.group()
            # trailing underscores with no offset after them belong to the surrounding text
            expression_text = matched_text.rstrip('_')
            kept_suffix = matched_text[len(expression_text):]
            parameter_value_parts = [part for part in expression_text.split('_') if part]
            return self.calculate_date_from_expression(local_logger, parameter_value_parts) \
                + kept_suffix

        resulted_parameter_value, replacements = re.subn(
            r'(CalculatedDate\_[A-Za-z]{2,62}\_*(-*)[0-9]{0,2})', interpret_match, crt_parameter)
        if replacements > 0:
            local_logger.debug('Current Parameter is STR '
                               + 'and has been re-interpreted as value: '
                               + str(resulted_parameter_value))
        return resulted_parameter_value

    def stringify_parameters(self, local_logger, tuple_parameters, given_parameter_rules):
        """
        Produce the final tuple of parameter values.

        Strings go through ``special_case_string``, lists and dictionaries are
        joined by ``manage_parameter_value`` and any other value is passed
        through unchanged.

        Args:
            local_logger: logger-like object (``debug`` is used).
            tuple_parameters: iterable with the raw parameter values.
            given_parameter_rules: rules used to join list/dictionary values.

        Returns:
            Tuple with one entry per input parameter, in the same order.
        """
        working_list = []
        for crt_parameter in tuple_parameters:
            if isinstance(crt_parameter, str):
                local_logger.debug('Current Parameter is STR and has the value: ' + crt_parameter)
                working_list.append(self.special_case_string(local_logger, crt_parameter))
            elif isinstance(crt_parameter, (list, dict)):
                prefix = 'dict' if isinstance(crt_parameter, dict) else 'list'
                local_logger.debug('Current Parameter is ' + prefix.upper()
                                   + ' and has the value: ' + str(crt_parameter))
                working_list.append(self.manage_parameter_value(prefix, crt_parameter,
                                                                given_parameter_rules))
            else:
                # numbers, None, dates... keep their own value (not their position)
                working_list.append(crt_parameter)
        final_tuple = tuple(working_list)
        local_logger.debug('Final Tuple for Parameters is: ' + str(final_tuple))
        return final_tuple
179