Passed
Push — master ( 5ef4cb...89c1da )
by Daniel
01:13
created

ParameterHandling.calculate_date_from_expression()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 3
dl 0
loc 14
rs 9.75
c 0
b 0
f 0
1
2
# standard Python packages
3
from datetime import datetime as ClassDT
4
from datetime import timedelta
5
6
# Custom classes specific to this package
7
from db_extractor.BasicNeeds import BasicNeeds
8
9
10
class ParameterHandling:
    """Turn session parameters into values ready for '%s' query interpolation.

    The entry point is handle_query(); the remaining methods are the steps it
    relies on (normalising parameters to a tuple, converting each value and
    expanding 'CalculatedDate_' expressions).
    """

    # BasicNeeds helper instance, created in __init__
    lcl_bn = None

    def __init__(self):
        self.lcl_bn = BasicNeeds()

    def build_parameters(self, local_logger, query_session_parameters, in_parameter_rules):
        """Normalise the session parameters into a tuple of final values.

        :param local_logger: logger used for debug/error messages
        :param query_session_parameters: dict (its values are used, in
            iteration order) or list of parameter values
        :param in_parameter_rules: rules handed over to stringify_parameters()
        :return: tuple produced by stringify_parameters()
        :raises SystemExit: with code 1 when the parameters are neither a
            dict nor a list
        """
        local_logger.debug('Seen Parameters are: ' + str(query_session_parameters))
        local_logger.debug('Parameters type is ' + str(type(query_session_parameters)))
        # isinstance (rather than comparing the type's repr) also accepts
        # subclasses such as collections.OrderedDict
        if isinstance(query_session_parameters, dict):
            tp = tuple(query_session_parameters.values())
        elif isinstance(query_session_parameters, list):
            tp = tuple(query_session_parameters)
        else:
            local_logger.error('Unknown parameter type (expected either Dictionary or List)')
            # Static analysis reported "The variable tp does not seem to be
            # defined for all execution paths": raising SystemExit explicitly
            # (instead of calling the site-provided exit()) makes it obvious
            # that this branch never reaches the code below.
            raise SystemExit(1)
        local_logger.debug('Initial Tuple for Parameters is: ' + str(tp))
        return self.stringify_parameters(local_logger, tp, in_parameter_rules)

    def calculate_date_from_expression(self, local_logger, expression_parts):
        """Evaluate a date expression that was split on '_'.

        Only the 'CYCW' keyword (expression_parts[1]) is supported: the result
        is the ISO year followed by the ISO week number of today's date, the
        week number being formatted through
        BasicNeeds.fn_numbers_with_leading_zero(..., 2). When a third part is
        present it is read as an integer number of weeks added to today
        before the week is computed.

        :param local_logger: logger used for the error message
        :param expression_parts: sequence such as ['CalculatedDate', 'CYCW', '-1']
        :return: string made of ISO year and formatted ISO week
        :raises SystemExit: with code 1 when the keyword is not 'CYCW'
        """
        if expression_parts[1] != 'CYCW':
            local_logger.error('Unknown expression encountered '
                               + str(expression_parts[1]) + '...')
            raise SystemExit(1)
        current_date = ClassDT.today()
        if len(expression_parts) >= 3:
            current_date = current_date + timedelta(weeks=int(expression_parts[2]))
        # ISO year and ISO week must come from the same isocalendar() result,
        # as the ISO year differs from the calendar year around New Year
        iso_year, iso_week = current_date.isocalendar()[:2]
        return str(iso_year) + self.lcl_bn.fn_numbers_with_leading_zero(str(iso_week), 2)

    def handle_query(self, local_logger, timered, given_session, given_query):
        """Return the query with the session parameters interpolated.

        :param local_logger: logger used for info/debug/error messages
        :param timered: object whose start() and stop() are called around the work
        :param given_session: mapping; 'parameters' and, optionally,
            'parameters-handling-rules' are read from it
        :param given_query: query text holding '%s' placeholders
        :return: interpolated query, or the query unchanged when the session
            has no 'parameters' or the interpolation raised TypeError
        """
        timered.start()
        query_to_run = given_query
        if 'parameters' in given_session:
            parameter_rules = []
            if 'parameters-handling-rules' in given_session:
                parameter_rules = given_session['parameters-handling-rules']
            tp = self.build_parameters(local_logger, given_session['parameters'], parameter_rules)
            try:
                query_to_run = given_query % tp
            except TypeError as e:
                # placeholder/parameter count mismatch: report it and fall
                # back to the query exactly as received
                local_logger.debug('Initial query expects ' + str(given_query.count('%s'))
                                   + ' parameters but only ' + str(len(tp))
                                   + ' parameters were provided!')
                local_logger.error(e)
            else:
                local_logger.info('Query with parameters interpreted is: '
                                  + self.lcl_bn.fn_multi_line_string_to_single_line(query_to_run))
        timered.stop()
        return query_to_run

    @staticmethod
    def manage_parameter_value(given_prefix, given_parameter, given_parameter_rules):
        """Collapse a list or dict parameter into a single string.

        :param given_prefix: 'dict' (the values are joined) or 'list' (the
            elements are joined); anything else joins nothing
        :param given_parameter: the list or dict to collapse
        :param given_parameter_rules: mapping providing the
            '<prefix>-values-prefix', '<prefix>-values-glue' and
            '<prefix>-values-suffix' strings
        :return: prefix + glued elements + suffix
        """
        if given_prefix == 'dict':
            elements = given_parameter.values()
        elif given_prefix == 'list':
            elements = given_parameter
        else:
            elements = ''
        # str.join() only accepts strings: convert every element so numeric
        # values do not raise TypeError
        glued = given_parameter_rules[given_prefix + '-values-glue'].join(
            str(element) for element in elements)
        return (given_parameter_rules[given_prefix + '-values-prefix']
                + glued
                + given_parameter_rules[given_prefix + '-values-suffix'])

    def stringify_parameters(self, local_logger, tuple_parameters, given_parameter_rules):
        """Convert every parameter into the value used for interpolation.

        Strings go through special_case_string(), lists and dicts are
        collapsed by manage_parameter_value(), and values of any other type
        are passed through unchanged.

        :param local_logger: logger used for debug messages
        :param tuple_parameters: iterable of raw parameter values
        :param given_parameter_rules: rules forwarded to manage_parameter_value()
        :return: tuple with one converted value per input parameter
        """
        working_list = []
        for crt_parameter in tuple_parameters:
            # default: keep the value itself (not its position) when no
            # conversion applies, e.g. for int, float or None
            converted = crt_parameter
            if isinstance(crt_parameter, str):
                local_logger.debug('Current Parameter is STR and has the value: ' + crt_parameter)
                converted = self.special_case_string(local_logger, crt_parameter)
            elif isinstance(crt_parameter, (list, dict)):
                prefix = 'dict' if isinstance(crt_parameter, dict) else 'list'
                local_logger.debug('Current Parameter is ' + prefix.upper()
                                   + ' and has the value: ' + str(crt_parameter))
                converted = self.manage_parameter_value(prefix, crt_parameter,
                                                        given_parameter_rules)
            working_list.append(converted)
        final_tuple = tuple(working_list)
        local_logger.debug('Final Tuple for Parameters is: ' + str(final_tuple))
        return final_tuple

    def special_case_string(self, local_logger, crt_parameter):
        """Expand a 'CalculatedDate_...' string, leave any other string as is.

        :param local_logger: logger used for the debug message
        :param crt_parameter: string parameter to inspect
        :return: result of calculate_date_from_expression() for strings
            starting with 'CalculatedDate_', otherwise crt_parameter itself
        """
        if not crt_parameter.startswith('CalculatedDate_'):
            return crt_parameter
        resulted_parameter_value = self.calculate_date_from_expression(
            local_logger, crt_parameter.split('_'))
        # log the computed value, which is what the message announces
        local_logger.debug('Current Parameter is STR '
                           + 'and has been re-interpreted as value: '
                           + str(resulted_parameter_value))
        return resulted_parameter_value
106