pystratum_pgsql.helper.PgSqlRoutineLoaderHelper   A
last analyzed

Complexity

Total Complexity 36

Size/Duplication

Total Lines 251
Duplicated Lines 0 %

Test Coverage

Coverage 57.94%

Importance

Changes 0
Metric Value
eloc 128
dl 0
loc 251
ccs 62
cts 107
cp 0.5794
rs 9.52
c 0
b 0
f 0
wmc 36

12 Methods

Rating   Name   Duplication   Size   Complexity  
B PgSqlRoutineLoaderHelper.get_bulk_insert_table_columns_info() 0 31 5
A PgSqlRoutineLoaderHelper._get_bulk_insert_table_columns_info() 0 5 1
A PgSqlRoutineLoaderHelper._get_data_type_helper() 0 7 1
A PgSqlRoutineLoaderHelper._get_name() 0 26 4
A PgSqlRoutineLoaderHelper.__init__() 0 29 1
A PgSqlRoutineLoaderHelper._is_start_of_stored_routine_body() 0 9 1
A PgSqlRoutineLoaderHelper._is_start_of_stored_routine() 0 9 1
A PgSqlRoutineLoaderHelper._get_routine_parameters_info() 0 12 3
A PgSqlRoutineLoaderHelper._drop_routine() 0 8 2
B PgSqlRoutineLoaderHelper._must_reload() 0 21 8
A PgSqlRoutineLoaderHelper._load_routine_file() 0 28 4
A PgSqlRoutineLoaderHelper._log_exception() 0 23 5
1 1
import re
2 1
from typing import Dict
3
4 1
from psycopg2 import ProgrammingError
5 1
from pystratum_backend.StratumStyle import StratumStyle
6 1
from pystratum_common.helper.DataTypeHelper import DataTypeHelper
7 1
from pystratum_common.helper.RoutineLoaderHelper import RoutineLoaderHelper
8
9 1
from pystratum_pgsql.helper.PgSqlDataTypeHelper import PgSqlDataTypeHelper
10 1
from pystratum_pgsql.PgSqlMetadataDataLayer import PgSqlMetadataDataLayer
11
12
13 1
class PgSqlRoutineLoaderHelper(RoutineLoaderHelper):
14
    """
15
    Class for loading a single stored routine into a PostgreSQL instance from a (pseudo) SQL file.
16
    """
17
18
    # ------------------------------------------------------------------------------------------------------------------
19 1
    def __init__(self,
20
                 io: StratumStyle,
21
                 dl: PgSqlMetadataDataLayer,
22
                 routine_filename: str,
23
                 routine_file_encoding: str,
24
                 pystratum_old_metadata: Dict,
25
                 replace_pairs: Dict[str, str],
26
                 rdbms_old_metadata: Dict):
27
        """
28
        Object constructor.
29
                                
30
        :param PyStratumStyle io: The output decorator.
31
        :param PgSqlMetadataDataLayer dl: The metadata layer.
32
        :param str routine_filename: The filename of the source of the stored routine.
33
        :param str routine_file_encoding: The encoding of the source file.
34
        :param dict pystratum_old_metadata: The metadata of the stored routine from PyStratum.
35
        :param dict[str,str] replace_pairs: A map from placeholders to their actual values.
36
        :param dict rdbms_old_metadata: The old metadata of the stored routine from MS SQL Server.
37
        """
38 1
        RoutineLoaderHelper.__init__(self,
39
                                     io,
40
                                     routine_filename,
41
                                     routine_file_encoding,
42
                                     pystratum_old_metadata,
43
                                     replace_pairs,
44
                                     rdbms_old_metadata)
45
46 1
        self._dl: PgSqlMetadataDataLayer = dl
47 1
        """
48
        The metadata layer.
49
        """
50
51
    # ------------------------------------------------------------------------------------------------------------------
52 1
    def _must_reload(self):
53
        """
54
        Returns True if the source file must be load or reloaded. Otherwise returns False.
55
56
        :rtype: bool
57
        """
58 1
        if not self._pystratum_old_metadata:
59 1
            return True
60
61
        if self._pystratum_old_metadata['timestamp'] != self._m_time:
62
            return True
63
64
        if self._pystratum_old_metadata['replace']:
65
            for key, value in self._pystratum_old_metadata['replace'].items():
66
                if key.lower() not in self._replace_pairs or self._replace_pairs[key.lower()] != value:
67
                    return True
68
69
        if not self._rdbms_old_metadata:
70
            return True
71
72
        return False
73
74
    # ------------------------------------------------------------------------------------------------------------------
75 1
    def _get_name(self) -> bool:
76
        """
77
        Extracts the name of the stored routine and the stored routine type (i.e. procedure or function) source.
78
        Returns True on success, False otherwise.
79
80
        :rtype: bool
81
        """
82 1
        ret = True
83 1
        pattern = re.compile(r'create\s+(function)\s+([a-zA-Z0-9_]+)')
84 1
        matches = pattern.findall(self._routine_source_code)
85
86 1
        if matches:
87 1
            self._routine_type = matches[0][0].lower()
88
89 1
            if self._routine_name != matches[0][1]:
90
                self._io.error('Stored routine name <dbo>{0}</dbo> does not match filename in file <fso>{1}</fso>'.
91
                               format(matches[0][1], self._source_filename))
92
                ret = False
93
        else:
94
            ret = False
95
96 1
        if not self._routine_type:
97
            self._io.error('Unable to find the stored routine name and type in file <fso>{0}</fso>'.
98
                           format(self._source_filename))
99
100 1
        return ret
101
102
    # ------------------------------------------------------------------------------------------------------------------
103 1
    def _get_data_type_helper(self) -> DataTypeHelper:
104
        """
105
        Returns a data type helper object or PostgreSQL.
106
107
        :rtype: DataTypeHelper
108
        """
109 1
        return PgSqlDataTypeHelper()
110
111
    # ------------------------------------------------------------------------------------------------------------------
112 1
    def _is_start_of_stored_routine(self, line: str) -> bool:
113
        """
114
        Returns True if a line is the start of the code of the stored routine.
115
116
        :param str line: The line with source code of the stored routine.
117
118
        :rtype: bool
119
        """
120 1
        return re.match(r'^\s*create\s+(function)', line, re.IGNORECASE) is not None
121
122
    # ------------------------------------------------------------------------------------------------------------------
123 1
    def _is_start_of_stored_routine_body(self, line: str) -> bool:
124
        """
125
        Returns True if a line is the start of the body of the stored routine.
126
127
        :param str line: The line with source code of the stored routine.
128
129
        :rtype: bool
130
        """
131 1
        return re.match(r'^\s*begin', line, re.IGNORECASE) is not None
132
133
    # ------------------------------------------------------------------------------------------------------------------
134 1
    def _load_routine_file(self) -> None:
135
        """
136
        Loads the stored routine into the PostgreSQL instance.
137
        """
138 1
        self._io.text('Loading {0} <dbo>{1}</dbo>'.format(self._routine_type, self._routine_name))
139
140 1
        self._set_magic_constants()
141
142 1
        routine_source = []
143 1
        i = 0
144 1
        for line in self._routine_source_code_lines:
145 1
            new_line = line
146 1
            self._replace['__LINE__'] = "'%d'" % (i + 1)
147 1
            for search, replace in self._replace.items():
148 1
                tmp = re.findall(search, new_line, re.IGNORECASE)
149 1
                if tmp:
150 1
                    new_line = new_line.replace(tmp[0], replace)
151 1
            routine_source.append(new_line)
152 1
            i += 1
153
154 1
        routine_source = "\n".join(routine_source)
155
156 1
        self._unset_magic_constants()
157 1
        self._drop_routine()
158
159 1
        self._dl.commit()
160 1
        self._dl.execute_none(routine_source)
161 1
        self._dl.commit()
162
163
    # ------------------------------------------------------------------------------------------------------------------
164 1
    def _log_exception(self, exception: Exception) -> None:
165
        """
166
        Logs an exception.
167
168
        :param Exception exception: The exception.
169
        """
170
        self._dl.rollback()
171
172
        RoutineLoaderHelper._log_exception(self, exception)
173
174
        if isinstance(exception, ProgrammingError):
175
            if 'syntax error at or near' in str(exception):
176
                cursor = exception.cursor
177
                if cursor:
178
                    sql = str(cursor.query, 'utf-8').rstrip()
179
180
                    parts = re.search(r'LINE (\d+):', str(exception))
181
                    if parts:
182
                        error_line = int(parts.group(1))
183
                    else:
184
                        error_line = 0
185
186
                    self._print_sql_with_error(sql, error_line)
187
188
    # ------------------------------------------------------------------------------------------------------------------
189 1
    def _get_bulk_insert_table_columns_info(self) -> None:
190
        """
191
        Gets the column names and column types of the current table for bulk insert.
192
        """
193
        raise NotImplementedError()
194
195
    # ------------------------------------------------------------------------------------------------------------------
196 1
    def get_bulk_insert_table_columns_info(self) -> None:
197
        """
198
        Gets the column names and column types of the current table for bulk insert.
199
        """
200
        table_is_non_temporary = self._dl.check_table_exists(self._table_name)
201
202
        if not table_is_non_temporary:
203
            self._dl.call_stored_routine(self._routine_name)
204
205
        columns = self._dl.describe_table(self._table_name)
206
207
        tmp_column_types = []
208
        tmp_fields = []
209
210
        n1 = 0
211
        for column in columns:
212
            c_type = re.findall(r'(\\w+)', column['Type'])
213
            tmp_column_types.append(c_type[0])
214
            tmp_fields.append(column['Field'])
215
            n1 += 1
216
217
        n2 = len(self._columns)
218
219
        if not table_is_non_temporary:
220
            self._dl.drop_temporary_table(self._table_name)
221
222
        if n1 != n2:
223
            raise Exception("Number of fields %d and number of columns %d don't match." % (n1, n2))
224
225
        self._columns_types = tmp_column_types
226
        self._fields = tmp_fields
227
228
    # ------------------------------------------------------------------------------------------------------------------
229 1
    def _get_routine_parameters_info(self) -> None:
230
        """
231
        Retrieves information about the stored routine parameters from the meta data of PostgreSQL.
232
        """
233 1
        routine_parameters = self._dl.get_routine_parameters(self._routine_name)
234 1
        for routine_parameter in routine_parameters:
235 1
            if routine_parameter['parameter_name']:
236 1
                value = routine_parameter['column_type']
237
238 1
                self._parameters.append({'name':                 routine_parameter['parameter_name'],
239
                                         'data_type':            routine_parameter['parameter_type'],
240
                                         'data_type_descriptor': value})
241
242
    # ------------------------------------------------------------------------------------------------------------------
243 1
    def _drop_routine(self) -> None:
244
        """
245
        Drops the stored routine if it exists.
246
        """
247 1
        if self._rdbms_old_metadata:
248
            self._dl.drop_stored_routine(self._rdbms_old_metadata['routine_type'],
249
                                         self._routine_name,
250
                                         self._rdbms_old_metadata['routine_args'])
251
252
# ----------------------------------------------------------------------------------------------------------------------
253