MsSqlRoutineLoaderHelper.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 38
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 38
rs 9.5
c 0
b 0
f 0
cc 1
nop 8

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent once you need more or different data.

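There are several approaches to avoid long parameter lists: group parameters that belong together into a single parameter object, pass a whole object instead of several values taken from it, or let the method obtain a value itself instead of receiving it as an argument.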

1
import re
2
3
from pystratum_backend.StratumStyle import StratumStyle
4
from pystratum_common.exception.LoaderException import LoaderException
5
from pystratum_common.helper.DataTypeHelper import DataTypeHelper
6
from pystratum_common.helper.RoutineLoaderHelper import RoutineLoaderHelper
7
8
from pystratum_mssql.helper.MsSqlDataTypeHelper import MsSqlDataTypeHelper
9
from pystratum_mssql.MsSqlMetadataDataLayer import MsSqlMetadataDataLayer
10
11
12
class MsSqlRoutineLoaderHelper(RoutineLoaderHelper):
    """
    Class for loading a single stored routine into a SQL Server instance from a (pseudo) SQL file.
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self,
                 io: StratumStyle,
                 dl: MsSqlMetadataDataLayer,
                 routine_filename,
                 routine_file_encoding,
                 pystratum_old_metadata,
                 replace_pairs,
                 rdbms_old_metadata):
        """
        Object constructor.

        :param StratumStyle io: The output decorator.
        :param MsSqlMetadataDataLayer dl: The metadata layer used to query and modify the SQL Server instance.
        :param str routine_filename: The filename of the source of the stored routine.
        :param str routine_file_encoding: The encoding of the source file.
        :param dict pystratum_old_metadata: The metadata of the stored routine from PyStratum.
        :param dict[str,str] replace_pairs: A map from placeholders to their actual values.
        :param dict rdbms_old_metadata: The old metadata of the stored routine from MS SQL Server.
        """
        super().__init__(io,
                         routine_filename,
                         routine_file_encoding,
                         pystratum_old_metadata,
                         replace_pairs,
                         rdbms_old_metadata)

        self._routine_base_name: str = ''
        """
        The name of the stored routine without schema name.
        """

        self._routines_schema_name: str = ''
        """
        The name of the schema of the stored routine.
        """

        self._dl: MsSqlMetadataDataLayer = dl
        """
        The metadata layer.
        """

    # ------------------------------------------------------------------------------------------------------------------
    def _drop_routine(self) -> None:
        """
        Drops the stored routine if it exists, i.e. if metadata about it was found in SQL Server.

        :raises Exception: If the routine type found in the SQL Server metadata is not a known type code.
        """
        if not self._rdbms_old_metadata:
            return

        # The type code may be padded with spaces, hence the strip().
        type_code = self._rdbms_old_metadata['routine_type'].strip()
        if type_code == 'P':
            routine_type = 'procedure'
        elif type_code in ('FN', 'IF', 'TF'):
            # Scalar (FN), inline table-valued (IF), and table-valued (TF) functions are all dropped as functions.
            routine_type = 'function'
        else:
            raise Exception("Unknown routine type '{0}'.".format(self._rdbms_old_metadata['routine_type']))

        self._dl.drop_stored_routine(routine_type,
                                     self._rdbms_old_metadata['schema_name'],
                                     self._routine_base_name)

    # ------------------------------------------------------------------------------------------------------------------
    def _get_bulk_insert_table_columns_info(self) -> None:
        """
        Gets the column names and column types of the current table for bulk insert.

        Not implemented for SQL Server.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError()

    # ------------------------------------------------------------------------------------------------------------------
    def _get_data_type_helper(self) -> DataTypeHelper:
        """
        Returns a data type helper object for SQL Server.

        :rtype: DataTypeHelper
        """
        return MsSqlDataTypeHelper()

    # ------------------------------------------------------------------------------------------------------------------
    def _get_name(self) -> None:
        """
        Extracts the routine type (i.e. procedure or function), the schema name, and the base name of the stored
        routine from its source code.

        :raises LoaderException: If the schema qualified name in the source does not equal the routine name, or if
                                 no routine type could be determined.
        """
        match = re.search(r"create\s+(procedure|function)\s+(?:(\w+)\.([a-zA-Z0-9_]+))",
                          self._routine_source_code,
                          re.IGNORECASE)

        if match:
            routine_type, schema_name, base_name = match.groups()

            self._routine_type = routine_type.lower()
            self._routines_schema_name = schema_name
            self._routine_base_name = base_name

            if self._routine_name != (schema_name + '.' + base_name):
                raise LoaderException(
                        'Stored routine name {0}.{1} does not match filename in file {2}'.format(schema_name,
                                                                                                 base_name,
                                                                                                 self._source_filename))

        if not self._routine_type:
            raise LoaderException('Unable to find the stored routine name and type in file {0}'.
                                  format(self._source_filename))

    # ------------------------------------------------------------------------------------------------------------------
    def _get_routine_parameters_info(self) -> None:
        """
        Retrieves information about the stored routine parameters from the meta data of SQL Server and appends it to
        the list of parameters of this stored routine.
        """
        routine_parameters = self._dl.get_routine_parameters(self._routines_schema_name,
                                                             self._routine_base_name)
        for routine_parameter in routine_parameters:
            # Rows without a parameter name are skipped.
            if not routine_parameter['parameter_name']:
                continue

            type_name = routine_parameter['type_name']
            self._parameters.append({'name':                 routine_parameter['parameter_name'][1:],  # Drop the '@'.
                                     'data_type':            type_name,
                                     'numeric_precision':    routine_parameter['precision'],
                                     'numeric_scale':        routine_parameter['scale'],
                                     'data_type_descriptor': type_name})

    # ------------------------------------------------------------------------------------------------------------------
    def _is_start_of_stored_routine(self, line: str) -> bool:
        """
        Returns True if a line is the start of the code of the stored routine.

        :param str line: The line with source code of the stored routine.

        :rtype: bool
        """
        return re.match(r'^\s*create\s+(procedure|function)', line, re.IGNORECASE) is not None

    # ------------------------------------------------------------------------------------------------------------------
    def _is_start_of_stored_routine_body(self, line: str) -> bool:
        """
        Returns True if a line is the start of the body of the stored routine, i.e. the line starts with the keyword
        AS.

        :param str line: The line with source code of the stored routine.

        :rtype: bool
        """
        # The word boundary prevents identifiers that merely start with "as" (e.g. "ascii(...)") from matching.
        return re.match(r'^\s*as\b', line, re.IGNORECASE) is not None

    # ------------------------------------------------------------------------------------------------------------------
    def _load_routine_file(self) -> None:
        """
        Loads the stored routine into the SQL Server instance.

        Placeholders are substituted line by line, after which the routine is either altered in place (when it
        already exists with an unchanged designation) or dropped and created anew.

        :raises LoaderException: If the routine must be altered but no create statement is found in the source.
        """
        self._io.text('Loading {0} <dbo>{1}</dbo>'.format(self._routine_type, self._routine_name))

        self._set_magic_constants()

        routine_source = []
        for line_number, line in enumerate(self._routine_source_code_lines, start=1):
            self._replace['__LINE__'] = "'%d'" % line_number
            new_line = line
            for search, replace in self._replace.items():
                # Placeholders are literal text, not regular expressions: escape them, and use a callable as
                # replacement such that backslashes in the value are not interpreted. All occurrences are replaced,
                # regardless of case.
                new_line = re.sub(re.escape(search),
                                  lambda _match, value=replace: value,
                                  new_line,
                                  flags=re.IGNORECASE)
            routine_source.append(new_line)

        routine_source = "\n".join(routine_source)

        self._unset_magic_constants()

        if self._rdbms_old_metadata:
            if self._pystratum_old_metadata and \
                    self._pystratum_old_metadata['designation'] == self._pystratum_metadata['designation']:
                # The routine exists and its designation is unchanged: alter it in place instead of recreating it.
                match = re.search(r'(create\s+(procedure|function))', routine_source, re.IGNORECASE)
                if match is None:
                    raise LoaderException("Unable to find the stored routine type in modified source of file '{0}'".
                                          format(self._source_filename))
                routine_source = routine_source.replace(match.group(1), 'alter %s' % match.group(2))
            else:
                self._drop_routine()

        self._dl.execute_none(routine_source)
        self._dl.commit()

    # ------------------------------------------------------------------------------------------------------------------
    def _must_reload(self) -> bool:
        """
        Returns True if the source file must be loaded or reloaded. Otherwise returns False.

        A (re)load is required when there is no PyStratum metadata, the timestamp differs, a previously used
        placeholder is gone or has another value, or the routine is not known in SQL Server.

        :rtype: bool
        """
        old_metadata = self._pystratum_old_metadata
        if not old_metadata:
            return True

        if old_metadata['timestamp'] != self._m_time:
            return True

        if old_metadata['replace']:
            for key, value in old_metadata['replace'].items():
                placeholder = key.lower()
                if placeholder not in self._replace_pairs or self._replace_pairs[placeholder] != value:
                    return True

        return not self._rdbms_old_metadata

    # ------------------------------------------------------------------------------------------------------------------
    def _update_metadata(self) -> None:
        """
        Updates the metadata of the stored routine.
        """
        # Update general metadata.
        RoutineLoaderHelper._update_metadata(self)

        # Update SQL Server specific metadata: the schema and the unqualified name of the routine.
        self._pystratum_metadata['schema_name'] = self._routines_schema_name
        self._pystratum_metadata['routine_base_name'] = self._routine_base_name
233
234
# ----------------------------------------------------------------------------------------------------------------------
235