Completed
Push — master ( 545613...55df2f )
by Daniel
14s queued 11s
created

sources.common.ParameterHandling   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 222
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 35
eloc 192
dl 0
loc 222
rs 9.6
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A ParameterHandling.stringify_parameters() 0 21 4
A ParameterHandling.__init__() 0 4 1
A ParameterHandling.calculate_date_from_expression() 0 23 2
A ParameterHandling.get_child_parent_expressions() 0 6 3
A ParameterHandling.simulate_final_query() 0 16 3
A ParameterHandling.get_week_number_as_two_digits_string() 0 9 3
A ParameterHandling.manage_parameter_value() 0 14 3
A ParameterHandling.get_flattened_known_expressions() 0 5 2
A ParameterHandling.build_parameters() 0 22 3
A ParameterHandling.eval_expression() 0 12 2
A ParameterHandling.interpret_known_expression() 0 17 4
A ParameterHandling.calculate_date_deviation() 0 12 2
A ParameterHandling.handle_query_parameters() 0 10 3
1
"""
2
Class Parameter Handling
3
4
Facilitates handling parameters values
5
"""
6
# package to handle date and times
7
from datetime import datetime, timedelta
8
# package to allow year and/or month operations based on a reference date
9
import datedelta
10
# package to add support for multi-language (i18n)
11
import gettext
12
# package to handle files/folders and related metadata/operations
13
import os
14
# package regular expressions
15
import re
16
17
18
class ParameterHandling:
19
    known_expressions = {
20
        'year': ['CY', 'CurrentYear'],
21
        'month': ['CYCM', 'CurrentYearCurrentMonth'],
22
        'just_month': ['CM', 'CurrentMonth'],
23
        'week': ['CYCW', 'CurrentYearCurrentWeek'],
24
        'just_week': ['CW', 'CurrentWeek'],
25
        'day': ['CYCMCD', 'CurrentYearCurrentMonthCurrentDay'],
26
        'just_day': ['CD', 'CurrentDay'],
27
        'hour': ['CYCMCDCH', 'CurrentYearCurrentMonthCurrentDayCurrentHour'],
28
    }
29
    output_standard_formats = {
30
        'year': '%Y',
31
        'month': '%Y%m',
32
        'just_month': '%m',
33
        'day': '%Y%m%d',
34
        'just_day': '%d',
35
        'hour': '%Y%m%d%H%M%S%f',
36
    }
37
    locale = None
38
39
    def __init__(self, in_language='en_US'):
40
        current_script = os.path.basename(__file__).replace('.py', '')
41
        lang_folder = os.path.join(os.path.dirname(__file__), current_script + '_Locale')
42
        self.locale = gettext.translation(current_script, lang_folder, languages=[in_language])
43
44
    def build_parameters(self, local_logger, query_session_parameters, in_parameter_rules
45
                         , in_start_iso_weekday):
46
        local_logger.debug(self.locale.gettext('Seen Parameters are: {parameters}')
47
                           .replace('{parameters}', str(query_session_parameters)))
48
        parameters_type = type(query_session_parameters)
49
        local_logger.debug(self.locale.gettext('Parameters type is {parameter_type}')
50
                           .replace('{parameter_type}', str(parameters_type)))
51
        tp = None
52
        if parameters_type == dict:
53
            tp = tuple(query_session_parameters.values())
54
        elif parameters_type == list:
55
            tp = tuple(query_session_parameters)
56
        else:
57
            local_logger.error(self.locale.gettext(
58
                'Unexpected parameter type, either Dictionary or List expected, '
59
                + 'but seen is {parameter_type}')
60
                               .replace('{parameter_type}', str(parameters_type)))
61
            exit(1)
62
        local_logger.debug(self.locale.gettext(
63
            'Initial Tuple for Parameters is: {parameters_tuple}')
64
                           .replace('{parameters_tuple}', str(tp)))
65
        return self.stringify_parameters(local_logger, tp, in_parameter_rules, in_start_iso_weekday)
66
67
    @staticmethod
68
    def calculate_date_deviation(in_date, deviation_type, expression_parts):
69
        if expression_parts[2] is None:
70
            expression_parts[2] = 0
71
        final_dates = {
72
            'year': in_date + datedelta.datedelta(years=int(expression_parts[2])),
73
            'month': in_date + datedelta.datedelta(months=int(expression_parts[2])),
74
            'week': in_date + timedelta(weeks=int(expression_parts[2])),
75
            'day': in_date + timedelta(days=int(expression_parts[2])),
76
            'hour': in_date + timedelta(hours=int(expression_parts[2])),
77
        }
78
        return final_dates.get(deviation_type)
79
80
    def calculate_date_from_expression(self, local_logger, expression_parts, in_start_iso_weekday):
81
        final_string = ''
82
        all_known_expressions = self.get_flattened_known_expressions()
83
        if expression_parts[1] in all_known_expressions:
84
            str_expression = '_'.join(expression_parts)
85
            local_logger.debug(self.locale.gettext(
86
                'A known expression "{expression_parts}" has to be interpreted')
87
                               .replace('{expression_parts}', str_expression))
88
            final_string = self.interpret_known_expression(
89
                    datetime.utcnow(), expression_parts, in_start_iso_weekday)
90
            local_logger.debug(self.locale.gettext(
91
                'Known expression "{expression_parts}" has been interpreted as {final_string}')
92
                               .replace('{expression_parts}', str_expression)
93
                               .replace('{final_string}', final_string))
94
        else:
95
            local_logger.debug(self.locale.gettext(
96
                'Unknown expression encountered: provided was "{given_expression}" '
97
                + 'which is not among known ones: "{all_known_expressions}"')
98
                               .replace('{given_expression}', str(expression_parts[1]))
99
                               .replace('{all_known_expressions}',
100
                                        '", "'.join(all_known_expressions)))
101
            exit(1)
102
        return final_string
103
104
    def eval_expression(self, local_logger, crt_parameter, in_start_iso_weekday):
105
        value_to_return = crt_parameter
106
        reg_ex = re.search(r'(CalculatedDate_[A-Za-z]{2,75}_*(-*)[0-9]{0,2})', crt_parameter)
107
        if reg_ex:
108
            parameter_value_parts = reg_ex.group().split('_')
109
            calculated = self.calculate_date_from_expression(
110
                    local_logger, parameter_value_parts, in_start_iso_weekday)
111
            value_to_return = re.sub(reg_ex.group(), calculated, crt_parameter)
112
            local_logger.debug(self.locale.gettext(
113
                'Current Parameter is STR and has been re-interpreted as value: "{str_value}"')
114
                               .replace('{str_value}', str(value_to_return)))
115
        return value_to_return
116
117
    def get_child_parent_expressions(self):
118
        child_parent_values = {}
119
        for current_expression_group in self.known_expressions.items():
120
            for current_expression in current_expression_group[1]:
121
                child_parent_values[current_expression] = current_expression_group[0]
122
        return child_parent_values
123
124
    def get_flattened_known_expressions(self):
125
        flat_values = []
126
        for crt_list in self.known_expressions.values():
127
            flat_values += crt_list
128
        return flat_values
129
130
    @staticmethod
131
    def get_week_number_as_two_digits_string(in_date, in_start_iso_weekday=1):
132
        if in_start_iso_weekday == 7:
133
            in_date = in_date + timedelta(days=1)
134
        week_iso_num = datetime.isocalendar(in_date)[1]
135
        iso_wek_to_return = str(week_iso_num)
136
        if week_iso_num < 10:
137
            iso_wek_to_return = '0' + iso_wek_to_return
138
        return str(datetime.isocalendar(in_date)[0]), iso_wek_to_return
139
140
    def handle_query_parameters(self, local_logger, given_session, in_start_iso_weekday):
141
        tp = None
142
        if 'parameters' in given_session:
143
            parameter_rules = []
144
            if 'parameters-handling-rules' in given_session:
145
                parameter_rules = given_session['parameters-handling-rules']
146
            tp = self.build_parameters(
147
                    local_logger, given_session['parameters'], parameter_rules,
148
                    in_start_iso_weekday)
149
        return tp
150
151
    def interpret_known_expression(self, ref_date, expression_parts, in_start_iso_weekday):
152
        child_parent_expressions = self.get_child_parent_expressions()
153
        deviation_original = child_parent_expressions.get(expression_parts[1])
154
        deviation = deviation_original.replace('just_', '')
155
        finalized_date = ref_date
156
        if len(expression_parts) > 2:
157
            finalized_date = self.calculate_date_deviation(ref_date, deviation, expression_parts)
158
        year_number_string, week_number_string = self.get_week_number_as_two_digits_string(
159
                finalized_date, in_start_iso_weekday)
160
        if deviation_original == 'week':
161
            final_string = year_number_string + week_number_string
162
        elif deviation_original == 'just_week':
163
            final_string = week_number_string
164
        else:
165
            final_string = datetime.strftime(
166
                    finalized_date, self.output_standard_formats.get(deviation_original))
167
        return final_string
168
169
    def manage_parameter_value(self, local_logger, given_prefix, given_parameter,
170
                               given_parameter_rules):
171
        element_to_join = ''
172
        if given_prefix == 'dict':
173
            element_to_join = given_parameter.values()
174
        elif given_prefix == 'list':
175
            element_to_join = given_parameter
176
        local_logger.debug(self.locale.gettext(
177
            'Current Parameter is {parameter_type} and has the value: "{str_value}"')
178
                           .replace('{parameter_type}', self.locale.gettext(given_prefix.upper()))
179
                           .replace('{str_value}', str(element_to_join)))
180
        return given_parameter_rules[given_prefix + '-values-prefix'] \
181
            + given_parameter_rules[given_prefix + '-values-glue'].join(element_to_join) \
182
            + given_parameter_rules[given_prefix + '-values-suffix']
183
184
    def simulate_final_query(self, local_logger, timer, in_query, in_parameters_number, in_tp):
185
        timer.start()
186
        return_query = in_query
187
        if in_parameters_number > 0:
188
            try:
189
                return_query = in_query % in_tp
190
            except TypeError as e:
191
                local_logger.error(self.locale.gettext(
192
                    'Initial query expects {expected_parameters_counted} '
193
                    + 'but only {given_parameters_counted} were provided')
194
                                   .replace('{expected_parameters_counted}',
195
                                            str(in_parameters_number))
196
                                   .replace('{given_parameters_counted}', str(len(in_tp))))
197
                local_logger.error(e)
198
        timer.stop()
199
        return return_query
200
201
    def stringify_parameters(self, local_logger, tuple_parameters, given_parameter_rules,
202
                             in_start_iso_weekday):
203
        working_list = []
204
        for ndx, crt_parameter in enumerate(tuple_parameters):
205
            current_parameter_type = type(crt_parameter)
206
            working_list.append(ndx)
207
            if current_parameter_type == str:
208
                local_logger.debug(self.locale.gettext(
209
                    'Current Parameter is {parameter_type} and has the value: "{str_value}"')
210
                                   .replace('{parameter_type}', self.locale.gettext('STR'))
211
                                   .replace('{str_value}', crt_parameter))
212
                working_list[ndx] = self.eval_expression(
213
                        local_logger, crt_parameter, in_start_iso_weekday)
214
            elif current_parameter_type in (list, dict):
215
                prefix = str(current_parameter_type).replace("<class '", '').replace("'>", '')
216
                working_list[ndx] = self.manage_parameter_value(
217
                        local_logger, prefix.lower(), crt_parameter, given_parameter_rules)
218
        final_tuple = tuple(working_list)
219
        local_logger.debug(self.locale.gettext('Final Tuple for Parameters is: {final_tuple}')
220
                           .replace('{final_tuple}', str(final_tuple)))
221
        return final_tuple
222