Test Failed
Push — master ( 7c9493...8a6195 )
by P.R.
09:18
created

MySqlDataLayer.execute_rows()   A

Complexity

Conditions 1

Size

Total Lines 17
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.6296

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 17
ccs 1
cts 7
cp 0.1429
rs 10
c 0
b 0
f 0
cc 1
nop 3
crap 1.6296
1 1
from time import gmtime, strftime
2 1
from typing import Any, Dict, List, Optional
3
4 1
from mysql.connector import InterfaceError, MySQLConnection
5 1
from mysql.connector.cursor import MySQLCursor, MySQLCursorBuffered, MySQLCursorBufferedDict, MySQLCursorDict
6 1
from pystratum_middle.BulkHandler import BulkHandler
7 1
from pystratum_middle.exception.ResultException import ResultException
8
9 1
from pystratum_mysql.MySqlConnector import MySqlConnector
10
11
12 1
class MySqlDataLayer:
    """
    Class for connecting to a MySQL instance and executing SQL statements. Also, a parent class for classes with
    static wrapper methods for executing stored procedures and functions.

    Every execute_* method records the SQL statement it was given, see last_sql().
    """

    # ------------------------------------------------------------------------------------------------------------------
    def __init__(self, connector: MySqlConnector):
        """
        Object constructor.

        :param MySqlConnector connector: The object for connecting to a MySQL instance.
        """

        self.__connector: MySqlConnector = connector
        """
        The object for connecting to a MySQL instance.
        """

        self._connection: Optional[MySQLConnection] = None
        """
        The connection between Python and the MySQL instance. None while not connected.
        """

        self.line_buffered: bool = True
        """
        If True log messages from stored procedures with designation type 'log' are line buffered (Note: In python
        sys.stdout is buffered by default).
        """

        self._last_sql: str = ''
        """
        The last executed SQL statement.
        """

    # ------------------------------------------------------------------------------------------------------------------
    def commit(self) -> None:
        """
        Commits the current transaction.
        See http://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-commit.html
        """
        self._connection.commit()

    # ------------------------------------------------------------------------------------------------------------------
    def connect(self) -> None:
        """
        Connects to a MySQL instance. See http://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
        for a complete overview of all possible keys in config.
        """
        self._connection = self.__connector.connect()

    # ------------------------------------------------------------------------------------------------------------------
    def disconnect(self) -> None:
        """
        Disconnects from the MySQL instance.
        See http://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-disconnect.html.
        """
        # Drop our reference first so this object never holds a connection that the connector has already closed.
        self._connection = None
        self.__connector.disconnect()

    # ------------------------------------------------------------------------------------------------------------------
    def execute_multi(self, sql: str) -> None:
        """
        Executes a multi query that does not select any rows.

        :param str sql: The SQL statements.
        """
        self._last_sql = sql

        cursor = MySQLCursor(self._connection)
        try:
            # With multi=True the statements are executed lazily: the iterator must be exhausted to run them all.
            for _ in cursor.execute(sql, multi=True):
                pass
        finally:
            cursor.close()

    # ------------------------------------------------------------------------------------------------------------------
    def execute_none(self, sql: str, *params) -> int:
        """
        Executes a query that does not select any rows. Returns the number of affected rows.

        :param str sql: The SQL statement.
        :param iterable params: The values for the statement.

        :rtype: int
        """
        self._last_sql = sql

        cursor = MySQLCursor(self._connection)
        try:
            cursor.execute(sql, params)
            rowcount = cursor.rowcount
        finally:
            cursor.close()

        return rowcount

    # ------------------------------------------------------------------------------------------------------------------
    def execute_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
        """
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows are selected).

        :param str sql: The SQL statement.
        :param iterable params: The arguments for the statement.

        :rtype: list[dict[str,*]]
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        try:
            # The values must be passed as one sequence. Unpacking them (execute(sql, *params)) would spill the second
            # and following values into the other positional parameters of execute().
            cursor.execute(sql, params)
            ret = cursor.fetchall()
        finally:
            cursor.close()

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def execute_singleton1(self, sql: str, *params) -> Any:
        """
        Executes SQL statement that selects 1 row with 1 column. Returns the value of the selected column.

        :param str sql: The SQL statement.
        :param iterable params: The arguments for the statement.

        :rtype: *
        :raises ResultException: When the number of selected rows is not 1.
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        try:
            cursor.execute(sql, params)
            rowcount = cursor.rowcount
            ret = cursor.fetchone()[0] if rowcount == 1 else None
        finally:
            cursor.close()

        if rowcount != 1:
            raise ResultException('1', rowcount, sql)

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def execute_sp_bulk(self, bulk_handler: BulkHandler, sql: str, *params) -> int:
        """
        Executes a stored routine with designation type "bulk". Returns the number of rows processed.

        The bulk handler is started, receives each selected row through its row() method, and is stopped after the
        last row.

        :param BulkHandler bulk_handler: The bulk handler for processing the selected rows.
        :param str sql: The SQL statement for calling the stored routine.
        :param iterable params: The arguments for calling the stored routine.

        :rtype: int
        """
        self._last_sql = sql

        # NOTE(review): this cursor is not buffered, so rows are streamed to the bulk handler. The cursor is closed on
        # the success path only: closing it while rows are still unread presumably raises and would then mask an
        # exception thrown by the bulk handler -- confirm against the connector before adding a finally clause.
        cursor = MySQLCursorDict(self._connection)
        itr = cursor.execute(sql, params, multi=True)
        bulk_handler.start()

        rowcount = 0
        for result in itr:
            for row in result:
                rowcount += 1
                bulk_handler.row(row)

        cursor.close()
        bulk_handler.stop()

        return rowcount

    # ------------------------------------------------------------------------------------------------------------------
    def execute_sp_log(self, sql: str, *params) -> int:
        """
        Executes a stored routine with designation type "log". Returns the number of log messages.

        Each selected row is printed to stdout as one line: a UTC timestamp followed by the fields of the row
        separated by spaces. Whether each line is flushed is controlled by line_buffered.

        :param str sql: The SQL statement for calling the stored routine.
        :param iterable params: The arguments for calling the stored routine.

        :rtype: int
        """
        self._last_sql = sql

        cursor = MySQLCursorBuffered(self._connection)
        rowcount = 0
        try:
            itr = cursor.execute(sql, params, multi=True)
            try:
                for result in itr:
                    rows = result.fetchall()
                    if rows is not None:
                        # One timestamp per result set: all rows of a result set arrive together.
                        stamp = strftime('%Y-%m-%d %H:%M:%S', gmtime())
                        for row in rows:
                            print(stamp, end='')
                            for field in row:
                                print(' %s' % field, end='')
                            print('', flush=self.line_buffered)
                            rowcount += 1
            except InterfaceError:
                # Deliberate best effort: presumably raised by fetchall() on the result without rows that ends the
                # stored routine call. It marks the end of the log messages.
                pass
        finally:
            cursor.close()

        return rowcount

    # ------------------------------------------------------------------------------------------------------------------
    def execute_sp_multi(self, sql: str, *params) -> List[List[Dict[str, Any]]]:
        """
        Executes a stored routine with designation type "multi". Returns a list of the result sets.

        :param str sql: The SQL statement for calling the stored routine.
        :param iterable params: The arguments for calling the stored routine.

        :rtype: list[list[dict[str,*]]]
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        results = []
        try:
            itr = cursor.execute(sql, params, multi=True)
            try:
                for result in itr:
                    results.append(result.fetchall())
            except InterfaceError:
                # Deliberate best effort: presumably raised by fetchall() on the result without rows that ends the
                # stored routine call. The result sets collected so far are returned.
                pass
        finally:
            cursor.close()

        return results

    # ------------------------------------------------------------------------------------------------------------------
    def execute_sp_none(self, sql: str, *params) -> int:
        """
        Executes a stored routine that does not select any rows. Returns the number of affected rows.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: int
        """
        self._last_sql = sql

        cursor = MySQLCursor(self._connection)
        try:
            itr = cursor.execute(sql, params, multi=True)
            # Only the first result is of interest: it carries the number of affected rows.
            rowcount = next(itr).rowcount
        finally:
            cursor.close()

        return rowcount

    # ------------------------------------------------------------------------------------------------------------------
    def _execute_sp_single_row(self, cursor_class, sql: str, params) -> tuple:
        """
        Executes a stored procedure that is expected to select at most 1 row. Returns a tuple with the number of
        selected rows and the selected row (None unless exactly 1 row has been selected).

        Shared implementation of execute_sp_row0, execute_sp_row1, execute_sp_singleton0, and execute_sp_singleton1;
        validating the number of rows is left to the caller.

        :param type cursor_class: The (buffered) cursor class to use; determines the type of the returned row.
        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: tuple[int,*]
        """
        self._last_sql = sql

        cursor = cursor_class(self._connection)
        try:
            itr = cursor.execute(sql, params, multi=True)
            result = next(itr)
            rowcount = result.rowcount
            row = result.fetchone() if rowcount == 1 else None
            # Advance past the result that follows the result set (presumably the status of the CALL statement) before
            # the cursor is closed.
            next(itr)
        finally:
            cursor.close()

        return rowcount, row

    # ------------------------------------------------------------------------------------------------------------------
    def execute_sp_row0(self, sql: str, *params) -> Optional[Dict[str, Any]]:
        """
        Executes a stored procedure that selects 0 or 1 row. Returns the selected row or None.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: None|dict[str,*]
        :raises ResultException: When the number of selected rows is not 0 or 1.
        """
        rowcount, row = self._execute_sp_single_row(MySQLCursorBufferedDict, sql, params)

        if rowcount not in (0, 1):
            raise ResultException('0 or 1', rowcount, sql)

        return row

    # ------------------------------------------------------------------------------------------------------------------
    def execute_sp_row1(self, sql: str, *params) -> Dict[str, Any]:
        """
        Executes a stored procedure that selects 1 row. Returns the selected row.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: dict[str,*]
        :raises ResultException: When the number of selected rows is not 1.
        """
        rowcount, row = self._execute_sp_single_row(MySQLCursorBufferedDict, sql, params)

        if rowcount != 1:
            raise ResultException('1', rowcount, sql)

        return row

    # ------------------------------------------------------------------------------------------------------------------
    def execute_sp_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
        """
        Executes a stored procedure that selects 0 or more rows. Returns the selected rows (an empty list if no rows
        are selected).

        :param str sql: The SQL statement.
        :param iterable params: The arguments for the statement.

        :rtype: list[dict[str,*]]
        """
        self._last_sql = sql

        cursor = MySQLCursorBufferedDict(self._connection)
        try:
            itr = cursor.execute(sql, params, multi=True)
            ret = next(itr).fetchall()
            # Advance past the result that follows the result set (presumably the status of the CALL statement) before
            # the cursor is closed.
            next(itr)
        finally:
            cursor.close()

        return ret

    # ------------------------------------------------------------------------------------------------------------------
    def execute_sp_singleton0(self, sql: str, *params) -> Any:
        """
        Executes a stored procedure that selects 0 or 1 row with 1 column. Returns the value of selected column or None.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: *
        :raises ResultException: When the number of selected rows is not 0 or 1.
        """
        rowcount, row = self._execute_sp_single_row(MySQLCursorBuffered, sql, params)

        if rowcount not in (0, 1):
            raise ResultException('0 or 1', rowcount, sql)

        return row[0] if rowcount == 1 else None

    # ------------------------------------------------------------------------------------------------------------------
    def execute_sp_singleton1(self, sql: str, *params) -> Any:
        """
        Executes a stored procedure that is expected to select 1 row with 1 column. Returns the value of the selected
        column.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: *
        :raises ResultException: When the number of selected rows is not 1.
        """
        rowcount, row = self._execute_sp_single_row(MySQLCursorBuffered, sql, params)

        if rowcount != 1:
            raise ResultException('1', rowcount, sql)

        return row[0]

    # ------------------------------------------------------------------------------------------------------------------
    def last_sql(self) -> str:
        """
        Returns the last executed SQL statement (an empty string if no statement has been executed yet).

        :rtype: str
        """
        return self._last_sql

    # ------------------------------------------------------------------------------------------------------------------
    def rollback(self) -> None:
        """
        Rolls back the current transaction.
        See http://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-rollback.html
        """
        self._connection.rollback()

    # ------------------------------------------------------------------------------------------------------------------
    def start_transaction(self,
                          consistent_snapshot: bool = False,
                          isolation_level: str = 'READ-COMMITTED',
                          readonly: Optional[bool] = None) -> None:
        """
        Starts a transaction. The arguments are passed unchanged to the connection.
        See http://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-start-transaction.html

        :param bool consistent_snapshot: Passed to MySQLConnection.start_transaction().
        :param str isolation_level: Passed to MySQLConnection.start_transaction().
        :param bool readonly: Passed to MySQLConnection.start_transaction().
        """
        self._connection.start_transaction(consistent_snapshot, isolation_level, readonly)
419
420
# ----------------------------------------------------------------------------------------------------------------------
421