MySqlDataLayer.execute_sp_singleton1()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 25
Code Lines 14

Duplication

Lines 25
Ratio 100 %

Code Coverage

Tests 1
CRAP Score 10.2065

Importance

Changes 0
Metric Value
eloc 14
dl 25
loc 25
ccs 1
cts 14
cp 0.0714
rs 9.7
c 0
b 0
f 0
cc 3
nop 3
crap 10.2065
1 1
from time import gmtime, strftime
2 1
from typing import Any, Dict, List, Optional
3
4 1
from mysql.connector import InterfaceError, MySQLConnection
5 1
from mysql.connector.cursor import MySQLCursor, MySQLCursorBuffered, MySQLCursorBufferedDict, MySQLCursorDict
6 1
from pystratum_middle.BulkHandler import BulkHandler
7 1
from pystratum_middle.exception.ResultException import ResultException
8
9 1
from pystratum_mysql.MySqlConnector import MySqlConnector
10
11
12 1
class MySqlDataLayer:
13
    """
14
    Class for connecting to a MySQL instance and executing SQL statements. Also, a parent class for classes with
15
    static wrapper methods for executing stored procedures and functions.
16
    """
17
18
    # ------------------------------------------------------------------------------------------------------------------
19 1
    def __init__(self, connector: MySqlConnector):
        """
        Object constructor. Only stores the connector and initialises the state; no connection is made here.

        :param connector: The object for connecting to a MySQL instance.
        """

        self._last_sql: str = ''
        """
        The SQL statement that was executed most recently (empty as long as nothing has been executed).
        """

        self.line_buffered: bool = True
        """
        Whether log messages from stored procedures with designation type 'log' are flushed per line (Note: In
        python sys.stdout is buffered by default).
        """

        self._connection: Optional[MySQLConnection] = None
        """
        The connection between Python and the MySQL instance, None until a connection has been made.
        """

        self.__connector: MySqlConnector = connector
        """
        The object that creates and closes the connection with the MySQL instance.
        """
44
45
    # ------------------------------------------------------------------------------------------------------------------
46 1
    def commit(self) -> None:
        """
        Commits the current transaction on the current connection.

        See https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-commit.html
        """
        connection = self._connection
        connection.commit()
52
53
    # ------------------------------------------------------------------------------------------------------------------
54 1
    def connect(self) -> None:
        """
        Connects to a MySQL instance through the connector and keeps the resulting connection.

        See https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html for a complete overview
        of all possible keys in config.
        """
        connection = self.__connector.connect()
        self._connection = connection
61
62 1
    # ------------------------------------------------------------------------------------------------------------------
63
    def connect_if_not_alive(self) -> None:
        """
        Connects or reconnects to the MySQL or MariaDB instance when the connector reports that Python is not
        (or no longer) connected. Does nothing when the connection is still alive.

        See https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html for a complete overview
        of all possible keys in config.
        """
        if self.__connector.is_alive():
            return

        # Close whatever is left of the previous connection before replacing it.
        stale_connection = self._connection
        if stale_connection:
            stale_connection.close()

        self._connection = self.__connector.connect()
73
74 1
    # ------------------------------------------------------------------------------------------------------------------
75
    def disconnect(self) -> None:
        """
        Disconnects from the MySQL instance: forgets the current connection and lets the connector disconnect.

        See https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-disconnect.html.
        """
        # The reference is dropped first, so it is gone even when the connector fails to disconnect.
        self._connection = None

        connector = self.__connector
        connector.disconnect()
82
83 1
    # ------------------------------------------------------------------------------------------------------------------
84
    def execute_multi(self, sql: str) -> None:
        """
        Executes a multi query that does not select any rows.

        :param sql: The SQL statements.
        """
        self._last_sql = sql

        cursor = MySQLCursor(self._connection)
        results = cursor.execute(sql, multi=True)
        # The individual results are of no interest; the iterator only has to be exhausted.
        for _result in results:
            continue
        cursor.close()
96
97 1
    # ------------------------------------------------------------------------------------------------------------------
98
    def execute_none(self, sql: str, *params) -> int:
        """
        Executes a query that does not select any rows.

        :param sql: The SQL statement.
        :param params: The values for the statement.

        :return: The number of affected rows as reported by the cursor.
        """
        self._last_sql = sql

        cursor = MySQLCursor(self._connection)
        cursor.execute(sql, params)
        # The row count must be read before the cursor is closed.
        affected_rows = cursor.rowcount
        cursor.close()

        return affected_rows
113
114
    # ------------------------------------------------------------------------------------------------------------------
115
    def execute_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
        """
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows are selected).

        The values for the statement are passed as separate positional arguments, like with the other execute
        methods of this class. For backward compatibility a single tuple, list, or dict holding all the values is
        accepted as well and is handed to the cursor as is.

        :param sql: The SQL statement.
        :param params: The arguments for the statement.
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        try:
            if not params:
                cursor.execute(sql)
            elif len(params) == 1 and isinstance(params[0], (tuple, list, dict)):
                # Legacy calling convention: one container with all the values.
                cursor.execute(sql, params[0])
            else:
                # The values must be passed as one tuple. Unpacking them into the call would turn the second
                # value into the cursor's `multi` argument and a single scalar into an invalid `params` argument.
                cursor.execute(sql, params)

            return cursor.fetchall()
        finally:
            # Close the cursor also when executing or fetching fails.
            cursor.close()
130
131
    # ------------------------------------------------------------------------------------------------------------------
132
    def execute_singleton1(self, sql: str, *params) -> Any:
        """
        Executes SQL statement that selects 1 row with 1 column. Returns the value of the selected column.

        :param sql: The SQL statement.
        :param params: The values for the statement.

        :raises ResultException: When the statement did not select exactly 1 row.
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        cursor.execute(sql, params)
        selected_rows = cursor.rowcount
        # Only fetch when there is exactly one row; the cursor is closed before any exception is raised.
        value = cursor.fetchone()[0] if selected_rows == 1 else None
        cursor.close()

        if selected_rows == 1:
            return value

        raise ResultException('1', selected_rows, sql)
154
155
    # ------------------------------------------------------------------------------------------------------------------
156
    def execute_sp_bulk(self, bulk_handler: BulkHandler, sql: str, *params) -> int:
        """
        Executes a stored routine with designation type "bulk". Every selected row is handed to the bulk handler,
        between a call to its start() and stop() method.

        :param bulk_handler: The bulk handler for processing the selected rows.
        :param sql: The SQL statement for calling the stored routine.
        :param params: The arguments for calling the stored routine.

        :return: The number of rows processed.
        """
        self._last_sql = sql

        cursor = MySQLCursorDict(self._connection)
        result_sets = cursor.execute(sql, params, multi=True)
        bulk_handler.start()

        processed = 0
        for result_set in result_sets:
            for record in result_set:
                bulk_handler.row(record)
                processed += 1

        cursor.close()
        bulk_handler.stop()

        return processed
180
181
    # ------------------------------------------------------------------------------------------------------------------
182
    def execute_sp_log(self, sql: str, *params) -> int:
        """
        Executes a stored routine with designation type "log". Every selected row is printed as one line: a UTC
        timestamp followed by the fields of the row, separated by spaces. The output is flushed per line when
        line_buffered is set.

        :param sql: The SQL statement for calling the stored routine.
        :param params: The arguments for calling the stored routine.

        :return: The number of log messages.
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        result_sets = cursor.execute(sql, params, multi=True)

        message_count = 0
        try:
            for result_set in result_sets:
                messages = result_set.fetchall()
                if messages is None:
                    continue

                # All messages of one result set get the same timestamp.
                timestamp = strftime('%Y-%m-%d %H:%M:%S', gmtime())
                for message in messages:
                    line = timestamp + ''.join(' %s' % field for field in message)
                    print(line, flush=self.line_buffered)
                    message_count += 1
        except InterfaceError:
            # Deliberately ignored; presumably raised for a result without rows to fetch -- verify against the
            # connector before changing this.
            pass

        cursor.close()

        return message_count
212
213
    # ------------------------------------------------------------------------------------------------------------------
214
    def execute_sp_multi(self, sql: str, *params) -> List[List[Dict[str, Any]]]:
        """
        Executes a stored routine with designation type "multi".

        :param sql: The SQL statement for calling the stored routine.
        :param params: The arguments for calling the stored routine.

        :return: A list with the rows of each result set, in the order in which the result sets were fetched.
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        result_sets = cursor.execute(sql, params, multi=True)

        fetched = []
        try:
            for result_set in result_sets:
                rows = result_set.fetchall()
                fetched.append(rows)
        except InterfaceError:
            # Deliberately ignored: the result sets fetched so far are returned.
            pass

        cursor.close()

        return fetched
236
237
    # ------------------------------------------------------------------------------------------------------------------
238
    def execute_sp_none(self, sql: str, *params) -> int:
        """
        Executes a stored routine that does not select any rows.

        :param sql: The SQL calling the stored procedure.
        :param params: The arguments for the stored procedure.

        :return: The number of affected rows as reported by the first result.
        """
        self._last_sql = sql

        cursor = MySQLCursor(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        affected_rows = first_result.rowcount
        cursor.close()

        return affected_rows
254
255
    # ------------------------------------------------------------------------------------------------------------------
256 View Code Duplication
    def execute_sp_row0(self, sql: str, *params) -> Optional[Dict[str, Any]]:
        """
        Executes a stored procedure that selects 0 or 1 row. Returns the selected row or None.

        :param sql: The SQL code to execute the stored procedure.
        :param params: The arguments for the stored procedure.

        :raises ResultException: When the stored procedure selected more than 1 row.
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        selected_rows = first_result.rowcount
        row = first_result.fetchone() if selected_rows == 1 else None
        # Move on to the result after the selected rows (presumably the status of the call -- verify).
        next(results)
        cursor.close()

        if selected_rows not in (0, 1):
            raise ResultException('0 or 1', selected_rows, sql)

        return row
280
281
    # ------------------------------------------------------------------------------------------------------------------
282 View Code Duplication
    def execute_sp_row1(self, sql: str, *params) -> Dict[str, Any]:
        """
        Executes a stored procedure that selects 1 row. Returns the selected row.

        :param sql: The SQL code to execute the stored procedure.
        :param params: The arguments for the stored procedure.

        :raises ResultException: When the stored procedure did not select exactly 1 row.
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        selected_rows = first_result.rowcount
        row = first_result.fetchone() if selected_rows == 1 else None
        # Move on to the result after the selected rows (presumably the status of the call -- verify).
        next(results)
        cursor.close()

        if selected_rows == 1:
            return row

        raise ResultException('1', selected_rows, sql)
306
307
    # ------------------------------------------------------------------------------------------------------------------
308
    def execute_sp_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
        """
        Executes a stored procedure that selects 0 or more rows. Returns the selected rows (an empty list if no rows
        are selected).

        :param sql: The SQL code to execute the stored procedure.
        :param params: The arguments for the stored procedure.
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        rows = first_result.fetchall()
        # Move on to the result after the selected rows (presumably the status of the call -- verify).
        next(results)
        cursor.close()

        return rows
325 1
326
    # ------------------------------------------------------------------------------------------------------------------
327 View Code Duplication
    def execute_sp_singleton0(self, sql: str, *params) -> Any:
        """
        Executes a stored procedure that selects 0 or 1 row with 1 column. Returns the value of selected column or None.

        :param sql: The SQL code to execute the stored procedure.
        :param params: The arguments for the stored procedure.

        :raises ResultException: When the stored procedure selected more than 1 row.
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        selected_rows = first_result.rowcount
        value = first_result.fetchone()[0] if selected_rows == 1 else None
        # Move on to the result after the selected rows (presumably the status of the call -- verify).
        next(results)
        cursor.close()

        if selected_rows not in (0, 1):
            raise ResultException('0 or 1', selected_rows, sql)

        return value
351
352
    # ------------------------------------------------------------------------------------------------------------------
353 View Code Duplication
    def execute_sp_singleton1(self, sql: str, *params) -> Any:
        """
        Executes a stored procedure that is expected to select 1 row with 1 column. Returns the value of the
        selected column.

        :param sql: The SQL code to execute the stored procedure.
        :param params: The arguments for the stored procedure.

        :raises ResultException: When the stored procedure did not select exactly 1 row.
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        selected_rows = first_result.rowcount
        value = first_result.fetchone()[0] if selected_rows == 1 else None
        # Move on to the result after the selected rows (presumably the status of the call -- verify).
        next(results)
        cursor.close()

        if selected_rows == 1:
            return value

        raise ResultException('1', selected_rows, sql)
378
379
    # ------------------------------------------------------------------------------------------------------------------
380
    def is_alive(self) -> bool:
        """
        Returns whether Python is (still) connected to a MySQL or MariaDB instance, as reported by the connector.
        """
        alive = self.__connector.is_alive()

        return alive
385
386
    # ------------------------------------------------------------------------------------------------------------------
387
    def last_sql(self) -> str:
        """
        Returns the last executed SQL statement.
        """
        statement = self._last_sql

        return statement
392
393
    # ------------------------------------------------------------------------------------------------------------------
394
    def rollback(self) -> None:
        """
        Rolls back the current transaction on the current connection.

        See https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-rollback.html
        """
        connection = self._connection
        connection.rollback()
400
401
    # ------------------------------------------------------------------------------------------------------------------
402
    def start_transaction(self,
                          consistent_snapshot: bool = False,
                          isolation_level: str = 'READ-COMMITTED',
                          readonly: Optional[bool] = None) -> None:
        """
        Starts a transaction on the current connection. All arguments are passed on unchanged.

        See https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-start-transaction.html

        :param consistent_snapshot: The consistent_snapshot argument for the connection's start_transaction().
        :param isolation_level: The isolation_level argument for the connection's start_transaction().
        :param readonly: The readonly argument for the connection's start_transaction().
        """
        connection = self._connection
        connection.start_transaction(consistent_snapshot=consistent_snapshot,
                                     isolation_level=isolation_level,
                                     readonly=readonly)
415
416
# ----------------------------------------------------------------------------------------------------------------------
417