MsSqlRoutineLoaderHelper._must_reload()   B
last analyzed

Complexity

Conditions 8

Size

Total Lines 21
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 21
rs 7.3333
c 0
b 0
f 0
cc 8
nop 1
1
import re
2
3
from pystratum_backend.StratumStyle import StratumStyle
4
from pystratum_common.exception.LoaderException import LoaderException
5
from pystratum_common.helper.DataTypeHelper import DataTypeHelper
6
from pystratum_common.helper.RoutineLoaderHelper import RoutineLoaderHelper
7
8
from pystratum_mssql.helper.MsSqlDataTypeHelper import MsSqlDataTypeHelper
9
from pystratum_mssql.MsSqlMetadataDataLayer import MsSqlMetadataDataLayer
10
11
12
class MsSqlRoutineLoaderHelper(RoutineLoaderHelper):
13
    """
14
    Class for loading a single stored routine into a SQL Server instance from a (pseudo) SQL file.
15
    """
16
17
    # ------------------------------------------------------------------------------------------------------------------
18
    def __init__(self,
19
                 io: StratumStyle,
20
                 dl: MsSqlMetadataDataLayer,
21
                 routine_filename,
22
                 routine_file_encoding,
23
                 pystratum_old_metadata,
24
                 replace_pairs,
25
                 rdbms_old_metadata):
26
        """
27
        Object constructor.
28
29
        :param str routine_filename: The filename of the source of the stored routine.
30
        :param str routine_file_encoding: The encoding of the source file.
31
        :param dict pystratum_old_metadata: The metadata of the stored routine from PyStratum.
32
        :param dict[str,str] replace_pairs: A map from placeholders to their actual values.
33
        :param dict rdbms_old_metadata: The old metadata of the stored routine from MS SQL Server.
34
        :param pystratum.style.PyStratumStyle.PyStratumStyle io: The output decorator.
35
        """
36
        RoutineLoaderHelper.__init__(self,
37
                                     io,
38
                                     routine_filename,
39
                                     routine_file_encoding,
40
                                     pystratum_old_metadata,
41
                                     replace_pairs,
42
                                     rdbms_old_metadata)
43
44
        self._routine_base_name: str = ''
45
        """
46
        The name of the stored routine without schema name.
47
        """
48
49
        self._routines_schema_name: str = ''
50
        """
51
        The name of the schema of the stored routine.
52
        """
53
54
        self._dl: MsSqlMetadataDataLayer = dl
55
        """
56
        The metadata layer.
57
        """
58
59
    # ------------------------------------------------------------------------------------------------------------------
60
    def _drop_routine(self) -> None:
61
        """
62
        Drops the stored routine if it exists.
63
        """
64
        if self._rdbms_old_metadata:
65
            if self._rdbms_old_metadata['routine_type'].strip() == 'P':
66
                routine_type = 'procedure'
67
            elif self._rdbms_old_metadata['routine_type'].strip() in ('FN', 'TF'):
68
                routine_type = 'function'
69
            else:
70
                raise Exception("Unknown routine type '{0}'.".format(self._rdbms_old_metadata['routine_type']))
71
72
            self._dl.drop_stored_routine(routine_type,
73
                                         self._rdbms_old_metadata['schema_name'],
74
                                         self._routine_base_name)
75
76
    # ------------------------------------------------------------------------------------------------------------------
77
    def _get_bulk_insert_table_columns_info(self) -> None:
78
        """
79
        Gets the column names and column types of the current table for bulk insert.
80
        """
81
        raise NotImplementedError()
82
83
    # ------------------------------------------------------------------------------------------------------------------
84
    def _get_data_type_helper(self) -> DataTypeHelper:
85
        """
86
        Returns a data type helper object for SQL Server.
87
88
        :rtype: pystratum.helper.DataTypeHelper.DataTypeHelper
89
        """
90
        return MsSqlDataTypeHelper()
91
92
    # ------------------------------------------------------------------------------------------------------------------
93
    def _get_name(self) -> None:
94
        """
95
        Extracts the name of the stored routine and the stored routine type (i.e. procedure or function) source.
96
        """
97
        p = re.compile(r"create\s+(procedure|function)\s+(?:(\w+)\.([a-zA-Z0-9_]+))", re.IGNORECASE)
98
        matches = p.findall(self._routine_source_code)
99
100
        if matches:
101
            self._routine_type = matches[0][0].lower()
102
            self._routines_schema_name = matches[0][1]
103
            self._routine_base_name = matches[0][2]
104
105
            if self._routine_name != (matches[0][1] + '.' + matches[0][2]):
106
                raise LoaderException(
107
                        'Stored routine name {0}.{1} does not match filename in file {2}'.format(matches[0][1],
108
                                                                                                 matches[0][2],
109
                                                                                                 self._source_filename))
110
111
        if not self._routine_type:
112
            raise LoaderException('Unable to find the stored routine name and type in file {0}'.
113
                                  format(self._source_filename))
114
115
    # ------------------------------------------------------------------------------------------------------------------
116
    def _get_routine_parameters_info(self) -> None:
117
        """
118
        Retrieves information about the stored routine parameters from the meta data of SQL Server.
119
        """
120
        routine_parameters = self._dl.get_routine_parameters(self._routines_schema_name,
121
                                                             self._routine_base_name)
122
        if len(routine_parameters) != 0:
123
            for routine_parameter in routine_parameters:
124
                if routine_parameter['parameter_name']:
125
                    parameter_name = routine_parameter['parameter_name'][1:]
126
                    value = routine_parameter['type_name']
127
128
                    self._parameters.append({'name':                 parameter_name,
129
                                             'data_type':            routine_parameter['type_name'],
130
                                             'numeric_precision':    routine_parameter['precision'],
131
                                             'numeric_scale':        routine_parameter['scale'],
132
                                             'data_type_descriptor': value})
133
134
    # ------------------------------------------------------------------------------------------------------------------
135
    def _is_start_of_stored_routine(self, line: str) -> bool:
136
        """
137
        Returns True if a line is the start of the code of the stored routine.
138
139
        :param str line: The line with source code of the stored routine.
140
141
        :rtype: bool
142
        """
143
        return re.match(r'^\s*create\s+(procedure|function)', line, re.IGNORECASE) is not None
144
145
    # ------------------------------------------------------------------------------------------------------------------
146
    def _is_start_of_stored_routine_body(self, line: str) -> bool:
147
        """
148
        Returns True if a line is the start of the body of the stored routine.
149
150
        :param str line: The line with source code of the stored routine.
151
152
        :rtype: bool
153
        """
154
        return re.match(r'^\s*as', line, re.IGNORECASE) is not None
155
156
    # ------------------------------------------------------------------------------------------------------------------
157
    def _load_routine_file(self) -> None:
158
        """
159
        Loads the stored routine into the SQL Server instance.
160
        """
161
        self._io.text('Loading {0} <dbo>{1}</dbo>'.format(self._routine_type, self._routine_name))
162
163
        self._set_magic_constants()
164
165
        routine_source = []
166
        i = 0
167
        for line in self._routine_source_code_lines:
168
            new_line = line
169
            self._replace['__LINE__'] = "'%d'" % (i + 1)
170
            for search, replace in self._replace.items():
171
                tmp = re.findall(search, new_line, re.IGNORECASE)
172
                if tmp:
173
                    new_line = new_line.replace(tmp[0], replace)
174
            routine_source.append(new_line)
175
            i += 1
176
177
        routine_source = "\n".join(routine_source)
178
179
        self._unset_magic_constants()
180
181
        if self._rdbms_old_metadata:
182
            if self._pystratum_old_metadata and self._pystratum_old_metadata['designation'] == \
183
                    self._pystratum_metadata['designation']:
184
                p = re.compile(r'(create\s+(procedure|function))', re.IGNORECASE)
185
                matches = p.findall(routine_source)
186
                if matches:
187
                    routine_source = routine_source.replace(matches[0][0], 'alter %s' % matches[0][1])
188
                else:
189
                    raise LoaderException("Unable to find the stored routine type in modified source of file '{0}'".
190
                                          format(self._source_filename))
191
            else:
192
                self._drop_routine()
193
194
        self._dl.execute_none(routine_source)
195
        self._dl.commit()
196
197
    # ------------------------------------------------------------------------------------------------------------------
198
    def _must_reload(self) -> bool:
199
        """
200
        Returns True if the source file must be load or reloaded. Otherwise returns False.
201
202
        :rtype: bool
203
        """
204
        if not self._pystratum_old_metadata:
205
            return True
206
207
        if self._pystratum_old_metadata['timestamp'] != self._m_time:
208
            return True
209
210
        if self._pystratum_old_metadata['replace']:
211
            for key, value in self._pystratum_old_metadata['replace'].items():
212
                if key.lower() not in self._replace_pairs or self._replace_pairs[key.lower()] != value:
213
                    return True
214
215
        if not self._rdbms_old_metadata:
216
            return True
217
218
        return False
219
220
    # ------------------------------------------------------------------------------------------------------------------
221
    def _update_metadata(self) -> None:
222
        """
223
        Updates the metadata of the stored routine.
224
        """
225
        # Update general metadata.
226
        RoutineLoaderHelper._update_metadata(self)
227
228
        # Update SQL Server specific metadata.
229
        self._pystratum_metadata['schema_name'] = self._routines_schema_name
230
231
        # Update SQL Server specific metadata.
232
        self._pystratum_metadata['routine_base_name'] = self._routine_base_name
233
234
# ----------------------------------------------------------------------------------------------------------------------
235