Test Failed
Push — master ( 7c9493...8a6195 )
by P.R.
09:18
created

MySqlRoutineLoaderHelper.__init__()   A

Complexity

Conditions 1

Size

Total Lines 50
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.6296

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 50
ccs 1
cts 7
cp 0.1429
rs 9.352
c 0
b 0
f 0
cc 1
nop 11
crap 1.6296

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

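There are several approaches to avoid long parameter lists, for example: introduce a parameter object that groups related values, pass a whole object instead of several of its fields, or replace a parameter with a method call on an object the method already has.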

1 1
import re
2 1
from typing import Dict, Optional, Any
3
4 1
from mysql import connector
5 1
from pystratum_backend.StratumStyle import StratumStyle
6
7 1
from pystratum_common.exception.LoaderException import LoaderException
8 1
from pystratum_common.helper.DataTypeHelper import DataTypeHelper
9 1
from pystratum_common.helper.RoutineLoaderHelper import RoutineLoaderHelper
10 1
from pystratum_mysql.helper.MySqlDataTypeHelper import MySqlDataTypeHelper
11 1
from pystratum_mysql.MySqlMetadataDataLayer import MySqlMetadataDataLayer
12
13
14 1
class MySqlRoutineLoaderHelper(RoutineLoaderHelper):
    """
    Class for loading a single stored routine into a MySQL instance from a (pseudo) SQL file.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self,
                 io: StratumStyle,
                 dl: MySqlMetadataDataLayer,
                 routine_filename: str,
                 routine_file_encoding: str,
                 pystratum_old_metadata: Optional[Dict],
                 replace_pairs: Dict[str, Any],
                 rdbms_old_metadata: Optional[Dict],
                 sql_mode: str,
                 character_set: str,
                 collate: str):
        """
        Object constructor.

        :param StratumStyle io: The output decorator.
        :param MySqlMetadataDataLayer dl: The metadata layer.
        :param str routine_filename: The filename of the source of the stored routine.
        :param str routine_file_encoding: The encoding of the source file.
        :param dict pystratum_old_metadata: The metadata of the stored routine from PyStratum.
        :param dict[str,Any] replace_pairs: A map from placeholders to their actual values.
        :param dict rdbms_old_metadata: The old metadata of the stored routine from MySQL.
        :param str sql_mode: The SQL mode under which the stored routine must be loaded and run.
        :param str character_set: The default character set under which the stored routine must be loaded and run.
        :param str collate: The default collate under which the stored routine must be loaded and run.
        """
        RoutineLoaderHelper.__init__(self,
                                     io,
                                     routine_filename,
                                     routine_file_encoding,
                                     pystratum_old_metadata,
                                     replace_pairs,
                                     rdbms_old_metadata)

        self._character_set: str = character_set
        """
        The default character set under which the stored routine will be loaded and run.
        """

        self._collate: str = collate
        """
        The default collate under which the stored routine will be loaded and run.
        """

        self._sql_mode: str = sql_mode
        """
        The SQL-mode under which the stored routine will be loaded and run.
        """

        self._dl: MySqlMetadataDataLayer = dl
        """
        The metadata layer.
        """

    # ------------------------------------------------------------------------------------------------------------------
    def _get_bulk_insert_table_columns_info(self) -> None:
        """
        Gets the column names and column types of the current table for bulk insert.

        When the table is not found by the metadata layer the stored routine is called first (presumably it creates
        the table as a temporary table -- confirm against the routine sources) and the table is dropped again
        afterwards, also when describing the table or parsing its columns fails.

        :raises LoaderException: When the number of table columns differs from the number of designated columns.
        """
        table_is_non_temporary = self._dl.check_table_exists(self._table_name)

        if not table_is_non_temporary:
            self._dl.call_stored_routine(self._routine_name)

        # Compile once: the same pattern is applied to every column.
        first_word = re.compile(r'(\w+)')

        tmp_column_types = []
        tmp_fields = []
        try:
            for column in self._dl.describe_table(self._table_name):
                # Keep only the leading word of the column type, e.g. 'varchar(10)' becomes 'varchar'.
                tmp_column_types.append(first_word.findall(column['Type'])[0])
                tmp_fields.append(column['Field'])
        finally:
            # Never leave the table created above behind, not even on failure.
            if not table_is_non_temporary:
                self._dl.drop_temporary_table(self._table_name)

        n1 = len(tmp_fields)
        n2 = len(self._columns)
        if n1 != n2:
            raise LoaderException("Number of fields %d and number of columns %d don't match." % (n1, n2))

        self._columns_types = tmp_column_types
        self._fields = tmp_fields

    # ------------------------------------------------------------------------------------------------------------------
    def _get_data_type_helper(self) -> DataTypeHelper:
        """
        Returns a data type helper object for MySQL.

        :rtype: DataTypeHelper
        """
        return MySqlDataTypeHelper()

    # ------------------------------------------------------------------------------------------------------------------
    def _get_name(self) -> None:
        """
        Extracts the name of the stored routine and the stored routine type (i.e. procedure or function) source.

        The keywords are matched case insensitively, in line with _is_start_of_stored_routine(); the routine name
        itself is compared case sensitively.

        :raises LoaderException: When the name in the source differs from the expected routine name or when no
                                 routine type could be determined.
        """
        match = re.search(r'create\s+(procedure|function)\s+([a-zA-Z0-9_]+)',
                          self._routine_source_code,
                          re.IGNORECASE)

        if match:
            self._routine_type = match.group(1).lower()

            if self._routine_name != match.group(2):
                raise LoaderException('Stored routine name {0} does not match filename in file {1}'.
                                      format(match.group(2), self._source_filename))

        if not self._routine_type:
            raise LoaderException('Unable to find the stored routine name and type in file {0}'.
                                  format(self._source_filename))

    # ------------------------------------------------------------------------------------------------------------------
    def _get_routine_parameters_info(self) -> None:
        """
        Retrieves information about the stored routine parameters from the meta data of MySQL.

        Rows without a parameter name are skipped. The character set and the collation are each appended to the
        data type descriptor only when present and not empty.
        """
        routine_parameters = self._dl.get_routine_parameters(self._routine_name)
        for routine_parameter in routine_parameters:
            if not routine_parameter['parameter_name']:
                continue

            value = routine_parameter['column_type']

            character_set_name = routine_parameter.get('character_set_name')
            if character_set_name:
                value += ' character set %s' % character_set_name

            # Test the collation itself, not the character set.
            collation = routine_parameter.get('collation')
            if collation:
                value += ' collation %s' % collation

            self._parameters.append({'name':                 routine_parameter['parameter_name'],
                                     'data_type':            routine_parameter['parameter_type'],
                                     'numeric_precision':    routine_parameter['numeric_precision'],
                                     'numeric_scale':        routine_parameter['numeric_scale'],
                                     'data_type_descriptor': value})

    # ------------------------------------------------------------------------------------------------------------------
    def _is_start_of_stored_routine(self, line: str) -> bool:
        """
        Returns True if a line is the start of the code of the stored routine.

        :param str line: The line with source code of the stored routine.

        :rtype: bool
        """
        return re.match(r'^\s*create\s+(procedure|function)', line, re.IGNORECASE) is not None

    # ------------------------------------------------------------------------------------------------------------------
    def _is_start_of_stored_routine_body(self, line: str) -> bool:
        """
        Returns True if a line is the start of the body of the stored routine.

        :param str line: The line with source code of the stored routine.

        :rtype: bool
        """
        return re.match(r'^\s*begin', line, re.IGNORECASE) is not None

    # ------------------------------------------------------------------------------------------------------------------
    def _load_routine_file(self) -> None:
        """
        Loads the stored routine into the MySQL instance.

        The existing routine (if any) is dropped first; the SQL mode, character set and collate are set before the
        source of the routine is executed.
        """
        self._io.text('Loading {0} <dbo>{1}</dbo>'.format(self._routine_type, self._routine_name))

        self._unset_magic_constants()
        self._drop_routine()

        self._dl.set_sql_mode(self._sql_mode)

        self._dl.set_character_set(self._character_set, self._collate)

        self._dl.execute_none(self._routine_source_code)

    # ------------------------------------------------------------------------------------------------------------------
    def _log_exception(self, exception: Exception) -> None:
        """
        Logs an exception.

        For a MySQL error with error number 1064 the last executed SQL statement is printed as well, with the line
        number taken from the end of the error message (0 when the message does not end with a number).

        :param Exception exception: The exception.
        """
        RoutineLoaderHelper._log_exception(self, exception)

        if not isinstance(exception, connector.errors.Error):
            return

        if exception.errno != 1064:
            return

        # Exception is caused by an invalid SQL statement.
        sql = self._dl.last_sql()
        if not sql:
            return

        sql = sql.strip()
        # The format of a 1064 message is: %s near '%s' at line %d
        parts = re.search(r'(\d+)$', exception.msg)
        error_line = int(parts.group(1)) if parts else 0

        self._print_sql_with_error(sql, error_line)

    # ------------------------------------------------------------------------------------------------------------------
    def _must_reload(self) -> bool:
        """
        Returns True if the source file must be load or reloaded. Otherwise returns False.

        A reload is required when there is no old PyStratum or RDBMS metadata, when the timestamp of the source
        file differs, when a previously used placeholder is gone or has another value, or when the SQL mode,
        character set or collate differs from the values the routine was loaded with.

        :rtype: bool
        """
        if not self._pystratum_old_metadata:
            return True

        if self._pystratum_old_metadata['timestamp'] != self._m_time:
            return True

        if self._pystratum_old_metadata['replace']:
            for key, value in self._pystratum_old_metadata['replace'].items():
                if key.lower() not in self._replace_pairs or self._replace_pairs[key.lower()] != value:
                    return True

        if not self._rdbms_old_metadata:
            return True

        if self._rdbms_old_metadata['sql_mode'] != self._sql_mode:
            return True

        if self._rdbms_old_metadata['character_set_client'] != self._character_set:
            return True

        if self._rdbms_old_metadata['collation_connection'] != self._collate:
            return True

        return False

    # ------------------------------------------------------------------------------------------------------------------
    def _drop_routine(self) -> None:
        """
        Drops the stored routine if it exists.

        The routine is considered to exist when old RDBMS metadata is available for it.
        """
        if self._rdbms_old_metadata:
            self._dl.drop_stored_routine(self._rdbms_old_metadata['routine_type'], self._routine_name)
258
259
# ----------------------------------------------------------------------------------------------------------------------
260