Test Failed
Push — master ( 8a6195...3318f0 )
by P.R.
10:24
created

MySqlDataLayer.is_alive()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 7
ccs 1
cts 2
cp 0.5
rs 10
c 0
b 0
f 0
cc 1
nop 1
crap 1.125
1 1
from time import gmtime, strftime
2 1
from typing import Any, Dict, List, Optional
3
4 1
from mysql.connector import InterfaceError, MySQLConnection
5 1
from mysql.connector.cursor import MySQLCursor, MySQLCursorBuffered, MySQLCursorBufferedDict, MySQLCursorDict
6 1
from pystratum_middle.BulkHandler import BulkHandler
7 1
from pystratum_middle.exception.ResultException import ResultException
8
9 1
from pystratum_mysql.MySqlConnector import MySqlConnector
10
11
12 1
class MySqlDataLayer:
13
    """
14
    Class for connecting to a MySQL instance and executing SQL statements. Also, a parent class for classes with
15
    static wrapper methods for executing stored procedures and functions.
16
    """
17
18
    # ------------------------------------------------------------------------------------------------------------------
19 1
    def __init__(self, connector: MySqlConnector):
20
        """
21
        Object constructor.
22
        """
23
24 1
        self.__connector: MySqlConnector = connector
25
        """
26
        The object for connecting to a MySQL instance.
27
        """
28
29 1
        self._connection: Optional[MySQLConnection] = None
30
        """
31
        The connection between Python and the MySQL instance.
32
        """
33
34 1
        self.line_buffered: bool = True
35
        """
36
        If True log messages from stored procedures with designation type 'log' are line buffered (Note: In python
37
        sys.stdout is buffered by default).
38
        """
39
40 1
        self._last_sql: str = ''
41 1
        """
42
        The last executed SQL statement.
43
        """
44
45
    # ------------------------------------------------------------------------------------------------------------------
46 1
    def commit(self) -> None:
47
        """
48
        Commits the current transaction.
49
        See https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-commit.html
50
        """
51
        self._connection.commit()
52
53
    # ------------------------------------------------------------------------------------------------------------------
54 1
    def connect(self) -> None:
55
        """
56
        Connects to a MySQL instance. See https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
57
        for a complete overview of all possible keys in config.
58
        """
59 1
        self._connection = self.__connector.connect()
60
61
    # ------------------------------------------------------------------------------------------------------------------
62 1
    def connect_if_not_alive(self) -> None:
63
        """
64
        Connects or reconnects to the MySQL or MariaDB instance when Python is not (longer) connected to a MySQL or
65
        MariaDB instance. See https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html for a
66
        complete overview of all possible keys in config.
67
        """
68
        if not self.__connector.is_alive():
69
            if self._connection:
70
                self._connection.close()
71
            self._connection = self.__connector.connect()
72
73
    # ------------------------------------------------------------------------------------------------------------------
74 1
    def disconnect(self) -> None:
75
        """
76
        Disconnects from the MySQL instance.
77
        See https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-disconnect.html.
78
        """
79
        self._connection = None
80
        self.__connector.disconnect()
81
82
    # ------------------------------------------------------------------------------------------------------------------
83 1
    def execute_multi(self, sql: str) -> None:
        """
        Executes a multi query that does not select any rows. The SQL is remembered as the last executed statement.

        :param str sql: The SQL statements.
        """
        self._last_sql = sql

        cursor = MySQLCursor(self._connection)
        statement_results = cursor.execute(sql, multi=True)
        # Walk through all results without using them; the loop only drives the iterator to its end.
        for _result in statement_results:
            continue
        cursor.close()
95
96
    # ------------------------------------------------------------------------------------------------------------------
97 1
    def execute_none(self, sql: str, *params) -> int:
        """
        Executes a query that does not select any rows. Returns the number of affected rows as reported by the
        cursor's rowcount.

        :param str sql: The SQL statement.
        :param iterable params: The values for the statement.

        :rtype: int
        """
        self._last_sql = sql

        cursor = MySQLCursor(self._connection)
        cursor.execute(sql, params)
        # Read the count before closing the cursor.
        affected_rows = cursor.rowcount
        cursor.close()

        return affected_rows
114
115
    # ------------------------------------------------------------------------------------------------------------------
116 1
    def execute_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
        """
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows are selected).

        :param str sql: The SQL statement.
        :param iterable params: The values for the statement. For backward compatibility a single tuple, list, or dict
                                holding all values is accepted as well.

        :rtype: list[dict[str,*]]
        """
        self._last_sql = sql

        # This method used to unpack params into cursor.execute(), so the only call with values that worked was one
        # passing a single sequence or dict with all values. Keep accepting that form.
        if len(params) == 1 and isinstance(params[0], (tuple, list, dict)):
            params = params[0]

        cursor = MySQLCursorBufferedDict(self._connection)
        if params:
            # Pass the values as ONE argument, like the other execute_* methods do. Unpacking them would hand a
            # scalar to the cursor's params argument and push a second value into its next positional argument.
            cursor.execute(sql, params)
        else:
            # No values: call execute() exactly as before.
            cursor.execute(sql)
        rows = cursor.fetchall()
        cursor.close()

        return rows
133
134
    # ------------------------------------------------------------------------------------------------------------------
135 1
    def execute_singleton1(self, sql: str, *params) -> Any:
        """
        Executes an SQL statement that selects 1 row with 1 column. Returns the value of the selected column.

        :param str sql: The SQL statement.
        :param iterable params: The values for the statement.

        :rtype: *
        :raises ResultException: When the number of selected rows is not exactly 1.
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        cursor.execute(sql, params)
        n_rows = cursor.rowcount
        # Only touch the result when it holds exactly one row; the cursor is closed before any error is raised.
        value = cursor.fetchone()[0] if n_rows == 1 else None
        cursor.close()

        if n_rows == 1:
            return value

        raise ResultException('1', n_rows, sql)
159
160
    # ------------------------------------------------------------------------------------------------------------------
161 1
    def execute_sp_bulk(self, bulk_handler: BulkHandler, sql: str, *params) -> int:
        """
        Executes a stored routine with designation type "bulk". Every selected row is handed to the bulk handler,
        between a call to its start() and a call to its stop(). Returns the number of rows processed.

        :param BulkHandler bulk_handler: The bulk handler for processing the selected rows.
        :param str sql: The SQL statement for calling the stored routine.
        :param iterable params: The arguments for calling the stored routine.

        :rtype: int
        """
        self._last_sql = sql

        cursor = MySQLCursorDict(self._connection)
        result_sets = cursor.execute(sql, params, multi=True)
        bulk_handler.start()

        processed = 0
        for result_set in result_sets:
            for row in result_set:
                bulk_handler.row(row)
                processed += 1

        # The cursor is closed before the handler is told that the bulk has ended.
        cursor.close()
        bulk_handler.stop()

        return processed
187
188
    # ------------------------------------------------------------------------------------------------------------------
189 1
    def execute_sp_log(self, sql: str, *params) -> int:
        """
        Executes a stored routine with designation type "log". Each selected row is printed as one log line: a UTC
        timestamp followed by the fields of the row, separated by spaces. Returns the number of log messages.

        :param str sql: The SQL statement for calling the stored routine.
        :param iterable params: The arguments for calling the stored routine.

        :rtype: int
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        result_sets = cursor.execute(sql, params, multi=True)

        n_messages = 0
        try:
            for result_set in result_sets:
                messages = result_set.fetchall()
                if messages is None:
                    continue

                # One timestamp per result set: all rows of a result set get the same stamp.
                stamp = strftime('%Y-%m-%d %H:%M:%S', gmtime())
                for message in messages:
                    fields = ''.join(' %s' % field for field in message)
                    print(stamp + fields, flush=self.line_buffered)
                    n_messages += 1
        except InterfaceError:
            # Deliberately ignored; an InterfaceError ends the processing of the remaining results.
            # NOTE(review): presumably raised when fetching from a result without a result set -- confirm.
            pass

        cursor.close()

        return n_messages
221
222
    # ------------------------------------------------------------------------------------------------------------------
223 1
    def execute_sp_multi(self, sql: str, *params) -> List[List[Dict[str, Any]]]:
        """
        Executes a stored routine with designation type "multi". Returns a list with the rows of each result set.

        :param str sql: The SQL statement for calling the stored routine.
        :param iterable params: The arguments for calling the stored routine.

        :rtype: list[list[dict[str,*]]]
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        result_sets = cursor.execute(sql, params, multi=True)

        all_rows = []
        try:
            # Collected one by one so that the result sets fetched so far are kept when an InterfaceError occurs.
            for result_set in result_sets:
                rows = result_set.fetchall()
                all_rows.append(rows)
        except InterfaceError:
            # Deliberately ignored; an InterfaceError ends the collecting of result sets.
            # NOTE(review): presumably raised when fetching from a result without a result set -- confirm.
            pass

        cursor.close()

        return all_rows
247
248
    # ------------------------------------------------------------------------------------------------------------------
249 1
    def execute_sp_none(self, sql: str, *params) -> int:
        """
        Executes a stored routine that does not select any rows. Returns the number of affected rows, taken from the
        rowcount of the first result of the call.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: int
        """
        self._last_sql = sql

        cursor = MySQLCursor(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        affected_rows = first_result.rowcount
        cursor.close()

        return affected_rows
267
268
    # ------------------------------------------------------------------------------------------------------------------
269 1 View Code Duplication
    def execute_sp_row0(self, sql: str, *params) -> Optional[Dict[str, Any]]:
        """
        Executes a stored procedure that selects 0 or 1 row. Returns the selected row or None.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: None|dict[str,*]
        :raises ResultException: When more than 1 row is selected.
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        n_rows = first_result.rowcount
        row = first_result.fetchone() if n_rows == 1 else None
        # Advance to the next result before closing the cursor.
        # NOTE(review): presumably this consumes the status result of the CALL itself -- confirm.
        next(results)
        cursor.close()

        if n_rows not in (0, 1):
            raise ResultException('0 or 1', n_rows, sql)

        return row
295
296
    # ------------------------------------------------------------------------------------------------------------------
297 1 View Code Duplication
    def execute_sp_row1(self, sql: str, *params) -> Dict[str, Any]:
        """
        Executes a stored procedure that selects 1 row. Returns the selected row.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: dict[str,*]
        :raises ResultException: When the number of selected rows is not exactly 1.
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        n_rows = first_result.rowcount
        row = first_result.fetchone() if n_rows == 1 else None
        # Advance to the next result before closing the cursor.
        # NOTE(review): presumably this consumes the status result of the CALL itself -- confirm.
        next(results)
        cursor.close()

        if n_rows == 1:
            return row

        raise ResultException('1', n_rows, sql)
323
324
    # ------------------------------------------------------------------------------------------------------------------
325 1
    def execute_sp_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
        """
        Executes a stored procedure that selects 0 or more rows. Returns the selected rows (an empty list if no rows
        are selected).

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: list[dict[str,*]]
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        rows = first_result.fetchall()
        # Advance to the next result before closing the cursor.
        # NOTE(review): presumably this consumes the status result of the CALL itself -- confirm.
        next(results)
        cursor.close()

        return rows
344
345
    # ------------------------------------------------------------------------------------------------------------------
346 1 View Code Duplication
    def execute_sp_singleton0(self, sql: str, *params) -> Any:
        """
        Executes a stored procedure that selects 0 or 1 row with 1 column. Returns the value of the selected column
        or None when no row is selected.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: *
        :raises ResultException: When more than 1 row is selected.
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        n_rows = first_result.rowcount
        value = first_result.fetchone()[0] if n_rows == 1 else None
        # Advance to the next result before closing the cursor.
        # NOTE(review): presumably this consumes the status result of the CALL itself -- confirm.
        next(results)
        cursor.close()

        if n_rows not in (0, 1):
            raise ResultException('0 or 1', n_rows, sql)

        return value
372
373
    # ------------------------------------------------------------------------------------------------------------------
374 1 View Code Duplication
    def execute_sp_singleton1(self, sql: str, *params) -> Any:
        """
        Executes a stored procedure that is expected to select 1 row with 1 column. Returns the value of the
        selected column.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: *
        :raises ResultException: When the number of selected rows is not exactly 1.
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        results = cursor.execute(sql, params, multi=True)
        first_result = next(results)
        n_rows = first_result.rowcount
        value = first_result.fetchone()[0] if n_rows == 1 else None
        # Advance to the next result before closing the cursor.
        # NOTE(review): presumably this consumes the status result of the CALL itself -- confirm.
        next(results)
        cursor.close()

        if n_rows == 1:
            return value

        raise ResultException('1', n_rows, sql)
401
402
    # ------------------------------------------------------------------------------------------------------------------
403 1
    def is_alive(self) -> bool:
404
        """
405
        Returns whether Python is (still) connected to a MySQL or MariaDB instance.
406
407
        :rtype: bool
408
        """
409
        return self.__connector.is_alive()
410
411
    # ------------------------------------------------------------------------------------------------------------------
412 1
    def last_sql(self) -> str:
413
        """
414
        Returns the last execute SQL statement.
415
        """
416
        return self._last_sql
417
418
    # ------------------------------------------------------------------------------------------------------------------
419 1
    def rollback(self) -> None:
420
        """
421
        Rolls back the current transaction.
422
        See https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-rollback.html
423
        """
424
        self._connection.rollback()
425
426
    # ------------------------------------------------------------------------------------------------------------------
427 1
    def start_transaction(self,
428
                          consistent_snapshot: bool = False,
429
                          isolation_level: str = 'READ-COMMITTED',
430
                          readonly: Optional[bool] = None) -> None:
431
        """
432
        Starts a transaction.
433
        See https://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-start-transaction.html
434
435
        :param bool consistent_snapshot:
436
        :param str isolation_level:
437
        :param bool readonly:
438
        """
439
        self._connection.start_transaction(consistent_snapshot, isolation_level, readonly)
440
441
# ----------------------------------------------------------------------------------------------------------------------
442