Passed
Push — master ( 68cef4...1832b1 )
by Daniel
01:14
created

ParameterHandling.get_flattened_known_expressions()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 1
dl 0
loc 9
rs 9.95
c 0
b 0
f 0
1
"""
2
Class Parameter Handling
3
4
Facilitates the handling of parameter values
5
"""
6
# package to facilitate common operations
7
from db_extractor.BasicNeeds import date, datetime, re, timedelta, BasicNeeds
8
# package to allow year and/or month operations based on a reference date
9
import datedelta
10
11
12
class ParameterHandling:
    """Prepare query-session parameters for interpolation into a query.

    A session may supply its parameters as a dictionary or a list. Each value
    is normalised into one element of a tuple:

    * strings are scanned for ``CalculatedDate_<keyword>[_<offset>]``
      expressions, which are replaced by a date fragment relative to today;
    * lists and dictionaries are collapsed into a single string using the
      prefix / glue / suffix supplied in the parameter-handling rules;
    * every other value is passed through unchanged.
    """

    # project helper object; each instance gets its own in __init__
    lcl_bn = None
    # keywords accepted after "CalculatedDate_", grouped by the date
    # granularity they resolve to
    known_expressions = {
        'year': ['CY', 'CurrentYear'],
        'month': ['CYCM', 'CurrentYearCurrentMonth', 'CM'],
        'week': ['CYCW', 'CurrentYearCurrentWeek', 'CW'],
        'day': ['CYCMCD', 'CurrentYearCurrentMonthCurrentDay', 'CD'],
    }
    # compiled once; the pattern text is the one special_case_string has always used
    _calculated_date_pattern = re.compile(
        r'(CalculatedDate\_[A-Za-z]{2,62}\_*(-*)[0-9]{0,2})')

    def __init__(self):
        self.lcl_bn = BasicNeeds()

    def build_parameters(self, local_logger, query_session_parameters, in_parameter_rules):
        """Convert session parameters into the final tuple of query values.

        :param local_logger: logger receiving debug / error messages
        :param query_session_parameters: dict (its values are used, in order)
            or list of raw parameter values
        :param in_parameter_rules: rules forwarded to stringify_parameters
        :return: tuple produced by stringify_parameters
        :raises SystemExit: with code 1 when the parameters are neither a
            dict nor a list
        """
        local_logger.debug('Seen Parameters are: ' + str(query_session_parameters))
        local_logger.debug('Parameters type is ' + str(type(query_session_parameters)))
        if isinstance(query_session_parameters, dict):
            tp = tuple(query_session_parameters.values())
        elif isinstance(query_session_parameters, list):
            tp = tuple(query_session_parameters)
        else:
            local_logger.error('Unknown parameter type (expected either Dictionary or List)')
            # raising directly (instead of calling the site-provided exit())
            # makes it explicit that nothing below runs without a tuple
            raise SystemExit(1)
        local_logger.debug('Initial Tuple for Parameters is: ' + str(tp))
        return self.stringify_parameters(local_logger, tp, in_parameter_rules)

    @staticmethod
    def calculate_date_deviation(in_date, deviation_type, expression_parts):
        """Shift a date by the offset carried in the third expression part.

        :param in_date: reference date
        :param deviation_type: 'year', 'month', 'week' or 'day'
        :param expression_parts: expression split on '_'; element 2, when
            present, is the signed integer offset
        :return: the shifted date, or in_date unchanged when no offset is
            present or deviation_type is not one of the four known values
        """
        if len(expression_parts) < 3:
            return in_date
        if deviation_type not in ('year', 'month', 'week', 'day'):
            return in_date
        amount = int(expression_parts[2])
        if deviation_type == 'year':
            return in_date + datedelta.datedelta(years=amount)
        if deviation_type == 'month':
            return in_date + datedelta.datedelta(months=amount)
        if deviation_type == 'week':
            return in_date + timedelta(weeks=amount)
        return in_date + timedelta(days=amount)

    def calculate_date_from_expression(self, local_logger, expression_parts):
        """Resolve a split CalculatedDate expression into its string value.

        :param local_logger: logger receiving debug / error messages
        :param expression_parts: expression split on '_'; element 1 is the
            keyword
        :return: string produced by interpret_known_expression
        :raises SystemExit: with code 1 when the keyword is not known
        """
        if expression_parts[1] not in self.get_flattened_known_expressions():
            local_logger.error('Unknown expression encountered '
                               + str(expression_parts[1]) + '...')
            raise SystemExit(1)
        return self.interpret_known_expression(local_logger, expression_parts)

    def get_flattened_known_expressions(self):
        """Return every known keyword as one flat list, in declaration order."""
        return [keyword
                for keyword_group in self.known_expressions.values()
                for keyword in keyword_group]

    def handle_query_parameters(self, local_logger, given_session):
        """Build the parameter tuple for a session, if it declares parameters.

        :param local_logger: logger receiving debug / error messages
        :param given_session: session definition; the keys 'parameters' and
            'parameters-handling-rules' are read when present
        :return: tuple from build_parameters, or None when the session has no
            'parameters' entry
        """
        if 'parameters' not in given_session:
            return None
        parameter_rules = []
        if 'parameters-handling-rules' in given_session:
            parameter_rules = given_session['parameters-handling-rules']
        return self.build_parameters(local_logger, given_session['parameters'], parameter_rules)

    @staticmethod
    def handle_prefix_for_date_format(date_expression, expected_prefixes, in_date):
        """Build the year (and optionally month) prefix a keyword asks for.

        :param date_expression: keyword such as 'CYCM' or 'CurrentYearCurrentWeek'
        :param expected_prefixes: 'CYCM' allows a month component after the
            year; any other value limits the prefix to the year
        :param in_date: date the prefix is taken from
        :return: '' when the keyword mentions no year / month, otherwise the
            year optionally followed by the two-digit month
        """
        prefix_for_date_format = ''
        if 'CY' in date_expression or 'CurrentYear' in date_expression:
            if 'CW' in date_expression or 'CurrentWeek' in date_expression:
                # an ISO week number must be paired with the ISO year
                prefix_for_date_format = str(in_date.isocalendar()[0])
            else:
                # months and days belong to the calendar year; around New Year
                # the ISO year can differ from it and would give a wrong prefix
                prefix_for_date_format = str(in_date.year)
        if expected_prefixes == 'CYCM':
            if 'CM' in date_expression or 'CurrentMonth' in date_expression:
                prefix_for_date_format += in_date.strftime('%m')
        return prefix_for_date_format

    def interpret_known_expression(self, local_logger, expression_parts):
        """Turn a known CalculatedDate expression into its date fragment.

        The fragment is computed relative to today's date, after applying the
        optional offset found in expression_parts.

        :param local_logger: logger receiving debug messages
        :param expression_parts: expression split on '_'; element 1 is the
            keyword
        :return: formatted year, [year]month, [year]week or [year][month]day
            string; '' when the keyword belongs to no group
        """
        final_string = ''
        current_date = date.today()
        keyword = expression_parts[1]
        local_logger.debug('I have just been provided with a known expression "'
                           + '_'.join(expression_parts) + '" to interpret')
        if keyword in self.known_expressions['year']:
            finalized_date = self.calculate_date_deviation(current_date, 'year', expression_parts)
            final_string = finalized_date.strftime('%Y')
        elif keyword in self.known_expressions['month']:
            finalized_date = self.calculate_date_deviation(current_date, 'month', expression_parts)
            final_string = self.handle_prefix_for_date_format(keyword, 'CY', finalized_date) \
                + finalized_date.strftime('%m')
        elif keyword in self.known_expressions['week']:
            finalized_date = self.calculate_date_deviation(current_date, 'week', expression_parts)
            week_iso_num = finalized_date.isocalendar()[1]
            final_string = self.handle_prefix_for_date_format(keyword, 'CY', finalized_date) \
                + self.lcl_bn.fn_numbers_with_leading_zero(str(week_iso_num), 2)
        elif keyword in self.known_expressions['day']:
            finalized_date = self.calculate_date_deviation(current_date, 'day', expression_parts)
            final_string = self.handle_prefix_for_date_format(keyword, 'CYCM', finalized_date) \
                + finalized_date.strftime('%d')
        local_logger.debug('Provided known expression "' + '_'.join(expression_parts)
                           + '" has been determined to be "' + final_string + '"')
        return final_string

    @staticmethod
    def manage_parameter_value(given_prefix, given_parameter, given_parameter_rules):
        """Collapse a list or dict parameter into one delimited string.

        :param given_prefix: 'list' or 'dict'; selects which rule keys apply
        :param given_parameter: the list, or the dict whose values are joined
        :param given_parameter_rules: mapping holding '<prefix>-values-prefix',
            '<prefix>-values-glue' and '<prefix>-values-suffix'
        :return: prefix + glued members + suffix
        """
        element_to_join = ''
        if given_prefix == 'dict':
            element_to_join = given_parameter.values()
        elif given_prefix == 'list':
            element_to_join = given_parameter
        # str.join only accepts strings, so members are converted first;
        # members that already are strings are unaffected
        glued_values = given_parameter_rules[given_prefix + '-values-glue'].join(
            str(member) for member in element_to_join)
        return given_parameter_rules[given_prefix + '-values-prefix'] \
            + glued_values \
            + given_parameter_rules[given_prefix + '-values-suffix']

    def simulate_final_query(self, local_logger, timered, in_query, in_parameters_number, in_tp):
        """Log the query as it looks once its parameters are interpolated.

        :param local_logger: logger receiving info / debug / error messages
        :param timered: object whose start() and stop() bracket the work
        :param in_query: query text with %-style placeholders
        :param in_parameters_number: number of placeholders the query expects
        :param in_tp: tuple of values to interpolate (may be None)
        :return: the interpolated query, or in_query unchanged when nothing
            was expected or interpolation failed
        """
        timered.start()
        return_query = in_query
        try:
            if in_parameters_number > 0:
                try:
                    return_query = in_query % in_tp
                    local_logger.info(
                        'Query with parameters interpreted is: '
                        + self.lcl_bn.fn_multi_line_string_to_single_line(return_query))
                except TypeError as e:
                    # in_tp is None when the session declared no parameters
                    provided_count = 0 if in_tp is None else len(in_tp)
                    local_logger.debug('Initial query expects ' + str(in_parameters_number)
                                       + ' parameters but only ' + str(provided_count)
                                       + ' parameters were provided!')
                    local_logger.error(e)
        finally:
            # stop the timer even when an unexpected error propagates
            timered.stop()
        return return_query

    def special_case_string(self, local_logger, crt_parameter):
        """Replace every CalculatedDate expression inside a string parameter.

        Each match is interpreted on its own, so a short expression can no
        longer clobber a longer one that merely starts with the same text.

        :param local_logger: logger receiving debug messages
        :param crt_parameter: string parameter value
        :return: the string with each expression replaced by its computed
            value; the input unchanged when it contains no expression
        """
        def interpret_match(matching_rule):
            parameter_value_parts = matching_rule.group().split('_')
            return self.calculate_date_from_expression(local_logger, parameter_value_parts)

        resulted_parameter_value, replacements = self._calculated_date_pattern.subn(
            interpret_match, crt_parameter)
        if replacements:
            local_logger.debug('Current Parameter is STR '
                               + 'and has been re-interpreted as value: '
                               + str(resulted_parameter_value))
        return resulted_parameter_value

    def stringify_parameters(self, local_logger, tuple_parameters, given_parameter_rules):
        """Normalise each raw parameter into its final query value.

        :param local_logger: logger receiving debug messages
        :param tuple_parameters: iterable of raw parameter values
        :param given_parameter_rules: rules used to collapse list / dict values
        :return: tuple with one entry per input parameter, in the same order
        """
        working_list = []
        for crt_parameter in tuple_parameters:
            if isinstance(crt_parameter, str):
                local_logger.debug('Current Parameter is STR and has the value: ' + crt_parameter)
                working_list.append(self.special_case_string(local_logger, crt_parameter))
            elif isinstance(crt_parameter, (list, dict)):
                prefix = 'dict' if isinstance(crt_parameter, dict) else 'list'
                local_logger.debug('Current Parameter is ' + prefix.upper()
                                   + ' and has the value: ' + str(crt_parameter))
                working_list.append(self.manage_parameter_value(prefix, crt_parameter,
                                                                given_parameter_rules))
            else:
                # numbers, None, dates... keep their own value; previously the
                # positional index was stored in their place
                working_list.append(crt_parameter)
        final_tuple = tuple(working_list)
        local_logger.debug('Final Tuple for Parameters is: ' + str(final_tuple))
        return final_tuple
181