MsSqlDataLayer.commit()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
import csv
2
import re
3
import sys
4
from time import gmtime, strftime
5
from typing import Any, Dict, List, Optional
6
7
from pystratum_middle.exception.ResultException import ResultException
8
9
from pystratum_mssql.MsSqlConnector import MsSqlConnector
10
11
12
class MsSqlDataLayer:
13
    """
14
    Class for connecting to a SQL Server instance and executing SQL statements. Also, a parent class for classes with
15
    static wrapper methods for executing stored procedures and functions.
16
    """
17
    # ------------------------------------------------------------------------------------------------------------------
18
    _suppress_superfluous_messages = True
19
    """
20
    If set superfluous messages like below will be suppressed:
21
    * "Warning: Null value is eliminated by an aggregate or other SET operation."
22
    * The module ... depends on the missing object .... The module will still be created; however, it cannot run
23
      successfully until the object exists.
24
25
    :type: bool
26
    """
27
28
    line_buffered = True
29
    """
30
    If True log messages from stored procedures with designation type 'log' are line buffered (Note: In python
31
    sys.stdout is buffered by default).
32
33
    :type: bool
34
    """
35
36
    # ------------------------------------------------------------------------------------------------------------------
37
    def __init__(self, connector: MsSqlConnector):
38
        """
39
        Object constructor.
40
        """
41
42
        self.__connector: MsSqlConnector = connector
43
        """
44
        The object for connecting to a MySQL instance.
45
        """
46
47
        self.__conn: Optional[Any] = None
48
        """
49
        The connection between Python and the MySQL instance.
50
        """
51
52
    # ------------------------------------------------------------------------------------------------------------------
53
    def autocommit(self, status: bool) -> None:
54
        """
55
        Sets auto commit mode.
56
        See http://pymssql.org/en/stable/ref/pymssql.html#pymssql.Connection.autocommit.
57
58
        :param bool status: True: Auto commit on. False: Auto commit off.
59
        """
60
        self.__conn.autocommit(status)
61
62
    # ------------------------------------------------------------------------------------------------------------------
63
    def commit(self) -> None:
64
        """
65
        Commits the current transaction.
66
        See http://pymssql.org/en/stable/ref/pymssql.html#pymssql.Connection.commit.
67
        """
68
        self.__conn.commit()
69
70
    # ------------------------------------------------------------------------------------------------------------------
71
    def connect(self) -> None:
72
        """
73
        Connects to a MS SQL Server instance.
74
        """
75
        self.__conn = self.__connector.connect()
76
77
    # ------------------------------------------------------------------------------------------------------------------
78
    def disconnect(self) -> None:
79
        """
80
        Disconnects from the MS SQL Server instance.
81
        See http://pymssql.org/en/stable/ref/pymssql.html#pymssql.Connection.close.
82
        """
83
        self.__conn = None
84
        self.__connector.disconnect()
85
86
    # ------------------------------------------------------------------------------------------------------------------
87
    def execute_csv(self, sql: str, filename: str, dialect: str = 'unix', encoding: str = 'utf-8') -> int:
88
        file = open(filename, 'w', encoding=encoding)
89
        csv_file = csv.writer(file, dialect=dialect)
90
91
        # Run the query.
92
        cursor = self.__conn.cursor(as_dict=False)
93
        cursor.execute(sql)
94
95
        # Store all rows in CSV format in the file.
96
        n = 0
97
        for row in cursor:
98
            csv_file.writerow(row)
99
            n += 1
100
101
        # Close the CSV file and the cursor.
102
        file.close()
103
        cursor.close()
104
105
        return n
106
107
    # ------------------------------------------------------------------------------------------------------------------
108
    def execute_log(self, sql: str, *params) -> int:
109
        """
110
        Executes a query with log statements. Returns the number of lines in the log.
111
112
        :param str sql: The SQL statement.
113
        :param iterable params: The parameters.
114
115
        :rtype: int
116
        """
117
        cursor = self.__conn.cursor()
118
        cursor.execute(sql, params)
119
120
        n = 0
121
        next_set = True
122
        while next_set:
123
            stamp = strftime('%Y-%m-%d %H:%M:%S', gmtime())
124
            for row in cursor:
125
                print(stamp, end='')
126
                for field in row:
127
                    print(' %s' % field, end='')
128
                print('', flush=self.line_buffered)
129
                n += 1
130
131
            next_set = cursor.nextset()
132
133
        cursor.close()
134
135
        return n
136
137
    # ------------------------------------------------------------------------------------------------------------------
138
    def execute_none(self, sql: str, *params) -> None:
139
        """
140
        Executes a query that does not select any rows.
141
142
        :param str sql: The SQL statement.
143
        :param iterable params: The parameters.
144
145
        :rtype: None
146
        """
147
        cursor = self.__conn.cursor()
148
        cursor.execute(sql, *params)
149
        cursor.close()
150
151
    # ------------------------------------------------------------------------------------------------------------------
152 View Code Duplication
    def execute_row0(self, sql, *params) -> Optional[Dict[str, Any]]:
153
        """
154
        Executes a query that selects 0 or 1 row. Returns the selected row or None.
155
156
        :param str sql: The SQL statement.
157
        :param iterable params: The parameters.
158
159
        :rtype: None|dict[str,*]
160
        """
161
        cursor = self.__conn.cursor(as_dict=True)
162
        cursor.execute(sql, *params)
163
        rows = cursor.fetchall()
164
        cursor.close()
165
166
        n = len(rows)
167
        if n == 1:
168
            return rows[0]
169
        elif n == 0:
170
            return None
171
        else:
172
            raise ResultException('0 or 1', n, sql)
173
174
    # ------------------------------------------------------------------------------------------------------------------
175
    def execute_row1(self, sql: str, *params) -> Dict[str, Any]:
176
        """
177
        Executes a query that selects 1 row. Returns the selected row.
178
179
        :param str sql: The SQL statement.
180
        :param iterable params: The parameters.
181
182
        :rtype: dict[str,*]
183
        """
184
        cursor = self.__conn.cursor(as_dict=True)
185
        cursor.execute(sql, *params)
186
        rows = cursor.fetchall()
187
        cursor.close()
188
189
        n = len(rows)
190
        if n != 1:
191
            raise ResultException('1', n, sql)
192
193
        return rows[0]
194
195
    # ------------------------------------------------------------------------------------------------------------------
196
    def execute_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
197
        """
198
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows
199
        are selected).
200
201
        :param str sql: The SQL statement.
202
        :param iterable params: The parameters.
203
204
        :rtype: list[dict[str,*]]
205
        """
206
        cursor = self.__conn.cursor(as_dict=True)
207
        cursor.execute(sql, *params)
208
        rows = cursor.fetchall()
209
        cursor.close()
210
211
        return rows
212
213
    # ------------------------------------------------------------------------------------------------------------------
214
    def execute_singleton0(self, sql: str, *params) -> Any:
215
        """
216
        Executes a query that selects 0 or 1 row with 1 column. Returns the value of selected column or None.
217
218
        :param str sql: The SQL statement.
219
        :param iterable params: The parameters.
220
221
        :rtype: *
222
        """
223
        cursor = self.__conn.cursor()
224
        cursor.execute(sql, *params)
225
        rows = cursor.fetchall()
226
        cursor.close()
227
228
        n = len(rows)
229
        if n == 1:
230
            return rows[0][0]
231
        elif n == 0:
232
            return None
233
        else:
234
            raise ResultException('0 or 1', n, sql)
235
236
    # ------------------------------------------------------------------------------------------------------------------
237
    def execute_singleton1(self, sql: str, *params) -> Any:
238
        """
239
        Executes a query that selects 1 row with 1 column. Returns the value of the selected column.
240
241
        :param str sql:The SQL statement.
242
        :param iterable params: The parameters.
243
244
        :rtype: *
245
        """
246
        cursor = self.__conn.cursor()
247
        cursor.execute(sql, *params)
248
        rows = cursor.fetchall()
249
        cursor.close()
250
251
        n = len(rows)
252
        if n != 1:
253
            raise ResultException('1', n, sql)
254
255
        return rows[0][0]
256
257
    # ------------------------------------------------------------------------------------------------------------------
258
    def execute_sp_none(self, sql: str, *params) -> None:
259
        """
260
        Executes a stored routine that does not select any rows.
261
262
        :param str sql: The SQL calling the stored procedure.
263
        :param iterable params: The parameters for the stored procedure.
264
265
        :rtype: None
266
        """
267
        cursor = self.__conn.cursor()
268
        cursor.execute(sql, params)
269
        cursor.close()
270
271
    # ------------------------------------------------------------------------------------------------------------------
272 View Code Duplication
    def execute_sp_row0(self, sql: str, *params) -> Optional[Dict[str, Any]]:
273
        """
274
        Executes a stored procedure that selects 0 or 1 row. Returns the selected row or None.
275
276
        :param str sql: The SQL call the the stored procedure.
277
        :param iterable params: The parameters for the stored procedure.
278
279
        :rtype: None|dict[str,*]
280
        """
281
        cursor = self.__conn.cursor(as_dict=True)
282
        cursor.execute(sql, params)
283
        rows = cursor.fetchall()
284
        cursor.close()
285
286
        n = len(rows)
287
        if n == 1:
288
            return rows[0]
289
        elif n == 0:
290
            return None
291
        else:
292
            raise ResultException('0 or 1', n, sql)
293
294
    # ------------------------------------------------------------------------------------------------------------------
295
    def execute_sp_row1(self, sql: str, *params) -> Dict[str, Any]:
296
        """
297
        Executes a stored procedure that selects 1 row. Returns the selected row.
298
299
        :param str sql: The SQL calling the the stored procedure.
300
        :param iterable params: The parameters for the stored procedure.
301
302
        :rtype: dict[str,*]
303
        """
304
        cursor = self.__conn.cursor(as_dict=True)
305
        cursor.execute(sql, params)
306
        rows = cursor.fetchall()
307
        cursor.close()
308
309
        n = len(rows)
310
        if n != 1:
311
            raise ResultException('1', n, sql)
312
313
        return rows[0]
314
315
    # ------------------------------------------------------------------------------------------------------------------
316
    def execute_sp_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
317
        """
318
        Executes a stored procedure that selects 0 or more rows. Returns the selected rows (an empty list if no rows
319
        are selected).
320
321
        :param str sql: The SQL calling the the stored procedure.
322
        :param iterable params: The parameters for the stored procedure.
323
324
        :rtype: list[dict[str,*]]
325
        """
326
        cursor = self.__conn.cursor(as_dict=True)
327
        cursor.execute(sql, params)
328
        rows = cursor.fetchall()
329
        cursor.close()
330
331
        return rows
332
333
    # ------------------------------------------------------------------------------------------------------------------
334
    def execute_sp_singleton0(self, sql: str, *params) -> Any:
335
        """
336
        Executes a stored procedure that selects 0 or 1 row with 1 column. Returns the value of selected column or None.
337
338
        :param str sql: The SQL calling the stored procedure.
339
        :param iterable params: The parameters for the stored procedure.
340
341
        :rtype: *
342
        """
343
        cursor = self.__conn.cursor()
344
        cursor.execute(sql, params)
345
        rows = cursor.fetchall()
346
        cursor.close()
347
348
        n = len(rows)
349
        if n == 1:
350
            return rows[0][0]
351
        elif n == 0:
352
            return None
353
        else:
354
            raise ResultException('0 or 1', n, sql)
355
356
    # ------------------------------------------------------------------------------------------------------------------
357
    def execute_sp_singleton1(self, sql: str, *params) -> Any:
358
        """
359
        Executes a stored routine with designation type "table", i.e a stored routine that is expected to select 1 row
360
        with 1 column.
361
362
        :param str sql: The SQL calling the the stored procedure.
363
        :param iterable params: The parameters for the stored procedure.
364
365
        :rtype: * The value of the selected column.
366
        """
367
        cursor = self.__conn.cursor()
368
        cursor.execute(sql, params)
369
        rows = cursor.fetchall()
370
        cursor.close()
371
372
        n = len(rows)
373
        if n != 1:
374
            raise ResultException('1', n, sql)
375
376
        return rows[0][0]
377
378
    # ------------------------------------------------------------------------------------------------------------------
379
    def rollback(self) -> None:
380
        """
381
        Rolls back the current transaction.
382
        See http://pymssql.org/en/stable/ref/pymssql.html#pymssql.Connection.rollback.
383
        """
384
        self.__conn.rollback()
385
386
    # ------------------------------------------------------------------------------------------------------------------
387
    @staticmethod
388
    def stratum_msg_handler(msgstate: str, severity: int, srvname: str, procname: str, line: int, msgtext: bin) -> None:
389
        """
390
        Custom message handler suppressing some superfluous messages.
391
        """
392
        if severity > 0:
393
            print("Error at line %d: %s" % (line, msgtext.decode("utf-8")), file=sys.stderr)
394
        else:
395
            msg = msgtext.decode("utf-8")
396
397
            # Suppress bogus messages if flag is set.
398
            if MsSqlDataLayer._suppress_superfluous_messages:
399
                # @todo Make this method more flexible by using two lists. One with strings and one on regex to
400
                # suppress.
401
                if msg == 'Warning: Null value is eliminated by an aggregate or other SET operation.':
402
                    return
403
404
                if re.match(
405
                        "^The module \'.*\' depends on the missing object \'.*\'. The module will still be created; "
406
                        "however, it cannot run successfully until the object exists.$",
407
                        msg):
408
                    return
409
410
            print(msg)
411
412
# ----------------------------------------------------------------------------------------------------------------------
413