PgSqlRoutineLoaderHelper.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 29
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 16
nop 8
dl 0
loc 29
ccs 4
cts 4
cp 1
crap 1
rs 9.6
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1 1
import re
2 1
from typing import Dict
3
4 1
from psycopg2 import ProgrammingError
5 1
from pystratum_backend.StratumStyle import StratumStyle
6 1
from pystratum_common.helper.DataTypeHelper import DataTypeHelper
7 1
from pystratum_common.helper.RoutineLoaderHelper import RoutineLoaderHelper
8
9 1
from pystratum_pgsql.helper.PgSqlDataTypeHelper import PgSqlDataTypeHelper
10 1
from pystratum_pgsql.PgSqlMetadataDataLayer import PgSqlMetadataDataLayer
11
12
13 1
class PgSqlRoutineLoaderHelper(RoutineLoaderHelper):
14
    """
15
    Class for loading a single stored routine into a PostgreSQL instance from a (pseudo) SQL file.
16
    """
17
18
    # ------------------------------------------------------------------------------------------------------------------
19 1
    def __init__(self,
20
                 io: StratumStyle,
21
                 dl: PgSqlMetadataDataLayer,
22
                 routine_filename: str,
23
                 routine_file_encoding: str,
24
                 pystratum_old_metadata: Dict,
25
                 replace_pairs: Dict[str, str],
26
                 rdbms_old_metadata: Dict):
27
        """
28
        Object constructor.
29
                                
30
        :param PyStratumStyle io: The output decorator.
31
        :param PgSqlMetadataDataLayer dl: The metadata layer.
32
        :param str routine_filename: The filename of the source of the stored routine.
33
        :param str routine_file_encoding: The encoding of the source file.
34
        :param dict pystratum_old_metadata: The metadata of the stored routine from PyStratum.
35
        :param dict[str,str] replace_pairs: A map from placeholders to their actual values.
36
        :param dict rdbms_old_metadata: The old metadata of the stored routine from MS SQL Server.
37
        """
38 1
        RoutineLoaderHelper.__init__(self,
39
                                     io,
40
                                     routine_filename,
41
                                     routine_file_encoding,
42
                                     pystratum_old_metadata,
43
                                     replace_pairs,
44
                                     rdbms_old_metadata)
45
46 1
        self._dl: PgSqlMetadataDataLayer = dl
47 1
        """
48
        The metadata layer.
49
        """
50
51
    # ------------------------------------------------------------------------------------------------------------------
52 1
    def _must_reload(self):
53
        """
54
        Returns True if the source file must be load or reloaded. Otherwise returns False.
55
56
        :rtype: bool
57
        """
58 1
        if not self._pystratum_old_metadata:
59 1
            return True
60
61
        if self._pystratum_old_metadata['timestamp'] != self._m_time:
62
            return True
63
64
        if self._pystratum_old_metadata['replace']:
65
            for key, value in self._pystratum_old_metadata['replace'].items():
66
                if key.lower() not in self._replace_pairs or self._replace_pairs[key.lower()] != value:
67
                    return True
68
69
        if not self._rdbms_old_metadata:
70
            return True
71
72
        return False
73
74
    # ------------------------------------------------------------------------------------------------------------------
75 1
    def _get_name(self) -> bool:
76
        """
77
        Extracts the name of the stored routine and the stored routine type (i.e. procedure or function) source.
78
        Returns True on success, False otherwise.
79
80
        :rtype: bool
81
        """
82 1
        ret = True
83 1
        pattern = re.compile(r'create\s+(function)\s+([a-zA-Z0-9_]+)')
84 1
        matches = pattern.findall(self._routine_source_code)
85
86 1
        if matches:
87 1
            self._routine_type = matches[0][0].lower()
88
89 1
            if self._routine_name != matches[0][1]:
90
                self._io.error('Stored routine name <dbo>{0}</dbo> does not match filename in file <fso>{1}</fso>'.
91
                               format(matches[0][1], self._source_filename))
92
                ret = False
93
        else:
94
            ret = False
95
96 1
        if not self._routine_type:
97
            self._io.error('Unable to find the stored routine name and type in file <fso>{0}</fso>'.
98
                           format(self._source_filename))
99
100 1
        return ret
101
102
    # ------------------------------------------------------------------------------------------------------------------
103 1
    def _get_data_type_helper(self) -> DataTypeHelper:
104
        """
105
        Returns a data type helper object or PostgreSQL.
106
107
        :rtype: DataTypeHelper
108
        """
109 1
        return PgSqlDataTypeHelper()
110
111
    # ------------------------------------------------------------------------------------------------------------------
112 1
    def _is_start_of_stored_routine(self, line: str) -> bool:
113
        """
114
        Returns True if a line is the start of the code of the stored routine.
115
116
        :param str line: The line with source code of the stored routine.
117
118
        :rtype: bool
119
        """
120 1
        return re.match(r'^\s*create\s+(function)', line, re.IGNORECASE) is not None
121
122
    # ------------------------------------------------------------------------------------------------------------------
123 1
    def _is_start_of_stored_routine_body(self, line: str) -> bool:
124
        """
125
        Returns True if a line is the start of the body of the stored routine.
126
127
        :param str line: The line with source code of the stored routine.
128
129
        :rtype: bool
130
        """
131 1
        return re.match(r'^\s*begin', line, re.IGNORECASE) is not None
132
133
    # ------------------------------------------------------------------------------------------------------------------
134 1
    def _load_routine_file(self) -> None:
135
        """
136
        Loads the stored routine into the PostgreSQL instance.
137
        """
138 1
        self._io.text('Loading {0} <dbo>{1}</dbo>'.format(self._routine_type, self._routine_name))
139
140 1
        self._set_magic_constants()
141
142 1
        routine_source = []
143 1
        i = 0
144 1
        for line in self._routine_source_code_lines:
145 1
            new_line = line
146 1
            self._replace['__LINE__'] = "'%d'" % (i + 1)
147 1
            for search, replace in self._replace.items():
148 1
                tmp = re.findall(search, new_line, re.IGNORECASE)
149 1
                if tmp:
150 1
                    new_line = new_line.replace(tmp[0], replace)
151 1
            routine_source.append(new_line)
152 1
            i += 1
153
154 1
        routine_source = "\n".join(routine_source)
155
156 1
        self._unset_magic_constants()
157 1
        self._drop_routine()
158
159 1
        self._dl.commit()
160 1
        self._dl.execute_none(routine_source)
161 1
        self._dl.commit()
162
163
    # ------------------------------------------------------------------------------------------------------------------
164 1
    def _log_exception(self, exception: Exception) -> None:
165
        """
166
        Logs an exception.
167
168
        :param Exception exception: The exception.
169
        """
170
        self._dl.rollback()
171
172
        RoutineLoaderHelper._log_exception(self, exception)
173
174
        if isinstance(exception, ProgrammingError):
175
            if 'syntax error at or near' in str(exception):
176
                cursor = exception.cursor
177
                if cursor:
178
                    sql = str(cursor.query, 'utf-8').rstrip()
179
180
                    parts = re.search(r'LINE (\d+):', str(exception))
181
                    if parts:
182
                        error_line = int(parts.group(1))
183
                    else:
184
                        error_line = 0
185
186
                    self._print_sql_with_error(sql, error_line)
187
188
    # ------------------------------------------------------------------------------------------------------------------
189 1
    def _get_bulk_insert_table_columns_info(self) -> None:
190
        """
191
        Gets the column names and column types of the current table for bulk insert.
192
        """
193
        raise NotImplementedError()
194
195
    # ------------------------------------------------------------------------------------------------------------------
196 1
    def get_bulk_insert_table_columns_info(self) -> None:
197
        """
198
        Gets the column names and column types of the current table for bulk insert.
199
        """
200
        table_is_non_temporary = self._dl.check_table_exists(self._table_name)
201
202
        if not table_is_non_temporary:
203
            self._dl.call_stored_routine(self._routine_name)
204
205
        columns = self._dl.describe_table(self._table_name)
206
207
        tmp_column_types = []
208
        tmp_fields = []
209
210
        n1 = 0
211
        for column in columns:
212
            c_type = re.findall(r'(\\w+)', column['Type'])
213
            tmp_column_types.append(c_type[0])
214
            tmp_fields.append(column['Field'])
215
            n1 += 1
216
217
        n2 = len(self._columns)
218
219
        if not table_is_non_temporary:
220
            self._dl.drop_temporary_table(self._table_name)
221
222
        if n1 != n2:
223
            raise Exception("Number of fields %d and number of columns %d don't match." % (n1, n2))
224
225
        self._columns_types = tmp_column_types
226
        self._fields = tmp_fields
227
228
    # ------------------------------------------------------------------------------------------------------------------
229 1
    def _get_routine_parameters_info(self) -> None:
230
        """
231
        Retrieves information about the stored routine parameters from the meta data of PostgreSQL.
232
        """
233 1
        routine_parameters = self._dl.get_routine_parameters(self._routine_name)
234 1
        for routine_parameter in routine_parameters:
235 1
            if routine_parameter['parameter_name']:
236 1
                value = routine_parameter['column_type']
237
238 1
                self._parameters.append({'name':                 routine_parameter['parameter_name'],
239
                                         'data_type':            routine_parameter['parameter_type'],
240
                                         'data_type_descriptor': value})
241
242
    # ------------------------------------------------------------------------------------------------------------------
243 1
    def _drop_routine(self) -> None:
244
        """
245
        Drops the stored routine if it exists.
246
        """
247 1
        if self._rdbms_old_metadata:
248
            self._dl.drop_stored_routine(self._rdbms_old_metadata['routine_type'],
249
                                         self._routine_name,
250
                                         self._rdbms_old_metadata['routine_args'])
251
252
# ----------------------------------------------------------------------------------------------------------------------
253