Test Failed
Push — master ( 849aa5...0715ed )
by P.R.
01:26
created

MySqlDataLayer.execute_sp_singleton1()   A

Complexity

Conditions 3

Size

Total Lines 27
Code Lines 14

Duplication

Lines 27
Ratio 100 %

Code Coverage

Tests 14
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 14
dl 27
loc 27
ccs 14
cts 14
cp 1
rs 9.7
c 0
b 0
f 0
cc 3
nop 3
crap 3
1 1
from time import gmtime, strftime
2 1
from typing import Any, Dict, List, Optional
3
4 1
from mysql.connector import InterfaceError, MySQLConnection
5 1
from mysql.connector.cursor import MySQLCursor, MySQLCursorBuffered, MySQLCursorBufferedDict, MySQLCursorDict
6 1
from pystratum_middle.BulkHandler import BulkHandler
7 1
from pystratum_middle.exception.ResultException import ResultException
8
9 1
from pystratum_mysql.MySqlConnector import MySqlConnector
10
11
12 1
class MySqlDataLayer:
13
    """
14
    Class for connecting to a MySQL instance and executing SQL statements. Also, a parent class for classes with
15
    static wrapper methods for executing stored procedures and functions.
16
    """
17
18
    # ------------------------------------------------------------------------------------------------------------------
19 1
    def __init__(self, connector: MySqlConnector):
20
        """
21
        Object constructor.
22
        """
23
24 1
        self.__connector: MySqlConnector = connector
25
        """
26
        The object for connecting to the MySql instance.
27
        """
28
29 1
        self._connection: Optional[MySQLConnection] = None
30
        """
31
        The connection between Python and the MySQL instance.
32
        """
33
34 1
        self.line_buffered: bool = True
35
        """
36
        If True log messages from stored procedures with designation type 'log' are line buffered (Note: In python
37
        sys.stdout is buffered by default).
38
        """
39
40 1
        self._last_sql: str = ''
41 1
        """
42
        The last executed SQL statement.
43
        """
44
45
    # ------------------------------------------------------------------------------------------------------------------
46 1
    def commit(self) -> None:
47
        """
48
        Commits the current transaction.
49
        See http://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-commit.html
50
        """
51
        self._connection.commit()
52
53
    # ------------------------------------------------------------------------------------------------------------------
54 1
    def connect(self) -> None:
55
        """
56
        Connects to a MySQL instance. See http://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
57
        for a complete overview of all possible keys in config.
58
        """
59 1
        self._connection = self.__connector.connect()
60
61
    # ------------------------------------------------------------------------------------------------------------------
62 1
    def disconnect(self) -> None:
63
        """
64
        Disconnects from the MySQL instance.
65
        See http://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-disconnect.html.
66
        """
67 1
        self._connection = None
68 1
        self.__connector.disconnect()
69
70
    # ------------------------------------------------------------------------------------------------------------------
71 1
    def execute_multi(self, sql: str) -> None:
        """
        Executes a multi query that does not select any rows.

        :param str sql: The SQL statements.
        """
        self._last_sql = sql

        cur = MySQLCursor(self._connection)
        statements = cur.execute(sql, multi=True)
        # The results are of no interest; iterating is only done to walk through all statements.
        for _unused in statements:
            continue
        cur.close()
83
84
    # ------------------------------------------------------------------------------------------------------------------
85 1
    def execute_none(self, sql: str, *params) -> int:
        """
        Executes a query that does not select any rows. Returns the number of affected rows.

        :param str sql: The SQL statement.
        :param iterable params: The values for the statement.

        :rtype: int
        """
        self._last_sql = sql

        cur = MySQLCursor(self._connection)
        cur.execute(sql, params)
        # Read the row count before closing the cursor.
        affected = cur.rowcount
        cur.close()

        return affected
102
103
    # ------------------------------------------------------------------------------------------------------------------
104 1
    def execute_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
        """
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows are selected).

        The values are passed to the cursor as one sequence, like the other execute_* methods of this class do. For
        backward compatibility a single list, tuple, or dict argument is passed on to the cursor unchanged.

        :param str sql: The SQL statement.
        :param iterable params: The arguments for the statement.

        :rtype: list[dict[str,*]]
        """
        self._last_sql = sql

        # The values must not be unpacked into cursor.execute(): its second positional parameter is the sequence (or
        # dict) of values and its third is the multi flag, so unpacking would pass the second value as the multi flag.
        if len(params) == 1 and isinstance(params[0], (list, tuple, dict)):
            # Callers that already pass their values as one sequence or dict keep working as before.
            values = params[0]
        else:
            values = params

        cursor = MySQLCursorBufferedDict(self._connection)
        cursor.execute(sql, values)
        rows = cursor.fetchall()
        cursor.close()

        return rows
121
122
    # ------------------------------------------------------------------------------------------------------------------
123 1
    def execute_singleton1(self, sql: str, *params) -> Any:
        """
        Executes SQL statement that selects 1 row with 1 column. Returns the value of the selected column.

        :param str sql: The SQL statement.
        :param iterable params: The arguments for the statement.

        :rtype: *
        :raises ResultException: When the number of selected rows is not exactly 1.
        """
        self._last_sql = sql

        cur = MySQLCursorBuffered(self._connection)
        cur.execute(sql, params)
        n_rows = cur.rowcount
        # Only fetch when exactly one row is available; the cursor is closed before a wrong row count is reported.
        value = cur.fetchone()[0] if n_rows == 1 else None
        cur.close()

        if n_rows != 1:
            raise ResultException('1', n_rows, sql)

        return value
147
148
    # ------------------------------------------------------------------------------------------------------------------
149 1
    def execute_sp_bulk(self, bulk_handler: BulkHandler, sql: str, *params) -> int:
        """
        Executes a stored routine with designation type "bulk". Returns the number of rows processed.

        The bulk handler is started before the first row is read, receives every row of every result through its
        row() method, and is stopped after the cursor has been closed.

        :param BulkHandler bulk_handler: The bulk handler for processing the selected rows.
        :param str sql: The SQL statement for calling the stored routine.
        :param iterable params: The arguments for calling the stored routine.

        :rtype: int
        """
        self._last_sql = sql

        cur = MySQLCursorDict(self._connection)
        result_sets = cur.execute(sql, params, multi=True)
        bulk_handler.start()

        processed = 0
        for result_set in result_sets:
            for record in result_set:
                processed += 1
                bulk_handler.row(record)

        cur.close()
        bulk_handler.stop()

        return processed
175
176
    # ------------------------------------------------------------------------------------------------------------------
177 1
    def execute_sp_log(self, sql: str, *params) -> int:
        """
        Executes a stored routine with designation type "log". Returns the number of log messages.

        Every selected row is printed as one line: a UTC timestamp followed by the fields of the row, each field
        preceded by a space. The line_buffered attribute is used as flush argument when printing a line.

        :param str sql: The SQL statement for calling the stored routine.
        :param iterable params: The arguments for calling the stored routine.

        :rtype: int
        """
        self._last_sql = sql

        cur = MySQLCursorBuffered(self._connection)
        result_sets = cur.execute(sql, params, multi=True)

        messages = 0
        try:
            for result_set in result_sets:
                rows = result_set.fetchall()
                if rows is None:
                    continue

                # All rows of one result share the same timestamp.
                stamp = strftime('%Y-%m-%d %H:%M:%S', gmtime())
                for row in rows:
                    print(stamp + ''.join(' %s' % field for field in row), flush=self.line_buffered)
                    messages += 1
        except InterfaceError:
            # NOTE(review): presumably raised for a result without a result set to fetch from -- TODO confirm.
            pass

        cur.close()

        return messages
209
210
    # ------------------------------------------------------------------------------------------------------------------
211 1
    def execute_sp_multi(self, sql: str, *params) -> List[List[Dict[str, Any]]]:
        """
        Executes a stored routine with designation type "multi". Returns a list of the result sets.

        :param str sql: The SQL statement for calling the stored routine.
        :param iterable params: The arguments for calling the stored routine.

        :rtype: list[list[dict[str,*]]]
        """
        self._last_sql = sql

        cur = MySQLCursorBufferedDict(self._connection)
        result_sets = cur.execute(sql, params, multi=True)

        collected = []
        try:
            for result_set in result_sets:
                rows = result_set.fetchall()
                collected.append(rows)
        except InterfaceError:
            # The result sets collected so far are kept.
            # NOTE(review): presumably raised for a result without a result set to fetch from -- TODO confirm.
            pass

        cur.close()

        return collected
235
236
    # ------------------------------------------------------------------------------------------------------------------
237 1
    def execute_sp_none(self, sql: str, *params) -> int:
        """
        Executes a stored routine that does not select any rows. Returns the number of affected rows.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: int
        """
        self._last_sql = sql

        cur = MySQLCursor(self._connection)
        results = cur.execute(sql, params, multi=True)
        # Only the first result is inspected.
        affected = next(results).rowcount
        cur.close()

        return affected
255
256
    # ------------------------------------------------------------------------------------------------------------------
257 1 View Code Duplication
    def execute_sp_row0(self, sql: str, *params) -> Optional[Dict[str, Any]]:
        """
        Executes a stored procedure that selects 0 or 1 row. Returns the selected row or None.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: None|dict[str,*]
        :raises ResultException: When more than 1 row is selected.
        """
        self._last_sql = sql

        cur = MySQLCursorBufferedDict(self._connection)
        results = cur.execute(sql, params, multi=True)
        first = next(results)
        n_rows = first.rowcount
        row = first.fetchone() if n_rows == 1 else None
        # NOTE(review): presumably steps over the trailing result of the CALL statement itself -- TODO confirm.
        next(results)
        cur.close()

        if n_rows not in (0, 1):
            raise ResultException('0 or 1', n_rows, sql)

        return row
283
284
    # ------------------------------------------------------------------------------------------------------------------
285 1 View Code Duplication
    def execute_sp_row1(self, sql: str, *params) -> Dict[str, Any]:
        """
        Executes a stored procedure that selects 1 row. Returns the selected row.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: dict[str,*]
        :raises ResultException: When the number of selected rows is not exactly 1.
        """
        self._last_sql = sql

        cur = MySQLCursorBufferedDict(self._connection)
        results = cur.execute(sql, params, multi=True)
        first = next(results)
        n_rows = first.rowcount
        # The row is None only when the row count is wrong, in which case an exception is raised below.
        row = first.fetchone() if n_rows == 1 else None
        # NOTE(review): presumably steps over the trailing result of the CALL statement itself -- TODO confirm.
        next(results)
        cur.close()

        if n_rows != 1:
            raise ResultException('1', n_rows, sql)

        return row
311
312
    # ------------------------------------------------------------------------------------------------------------------
313 1
    def execute_sp_rows(self, sql: str, *params) -> List[Dict[str, Any]]:
        """
        Executes a stored procedure that selects 0 or more rows. Returns the selected rows (an empty list if no rows
        are selected).

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: list[dict[str,*]]
        """
        self._last_sql = sql

        cur = MySQLCursorBufferedDict(self._connection)
        results = cur.execute(sql, params, multi=True)
        first = next(results)
        rows = first.fetchall()
        # NOTE(review): presumably steps over the trailing result of the CALL statement itself -- TODO confirm.
        next(results)
        cur.close()

        return rows
332
333
    # ------------------------------------------------------------------------------------------------------------------
334 1 View Code Duplication
    def execute_sp_singleton0(self, sql: str, *params) -> Any:
        """
        Executes a stored procedure that selects 0 or 1 row with 1 column. Returns the value of selected column or None.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: *
        :raises ResultException: When more than 1 row is selected.
        """
        self._last_sql = sql

        cur = MySQLCursorBuffered(self._connection)
        results = cur.execute(sql, params, multi=True)
        first = next(results)
        n_rows = first.rowcount
        value = first.fetchone()[0] if n_rows == 1 else None
        # NOTE(review): presumably steps over the trailing result of the CALL statement itself -- TODO confirm.
        next(results)
        cur.close()

        if n_rows not in (0, 1):
            raise ResultException('0 or 1', n_rows, sql)

        return value
360
361
    # ------------------------------------------------------------------------------------------------------------------
362 1 View Code Duplication
    def execute_sp_singleton1(self, sql: str, *params) -> Any:
        """
        Executes a stored procedure that is expected to select 1 row with 1 column. Returns the value of the selected
        column.

        :param str sql: The SQL calling the stored procedure.
        :param iterable params: The arguments for the stored procedure.

        :rtype: *
        :raises ResultException: When the number of selected rows is not exactly 1.
        """
        self._last_sql = sql

        cur = MySQLCursorBuffered(self._connection)
        results = cur.execute(sql, params, multi=True)
        first = next(results)
        n_rows = first.rowcount
        # The value is None only when the row count is wrong, in which case an exception is raised below.
        value = first.fetchone()[0] if n_rows == 1 else None
        # NOTE(review): presumably steps over the trailing result of the CALL statement itself -- TODO confirm.
        next(results)
        cur.close()

        if n_rows != 1:
            raise ResultException('1', n_rows, sql)

        return value
389
390
    # ------------------------------------------------------------------------------------------------------------------
391 1
    def last_sql(self) -> str:
392
        """
393
        Returns the last execute SQL statement.
394
        """
395
        return self._last_sql
396
397
    # ------------------------------------------------------------------------------------------------------------------
398 1
    def rollback(self) -> None:
399
        """
400
        Rolls back the current transaction.
401
        See http://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-rollback.html
402
        """
403
        self._connection.rollback()
404
405
    # ------------------------------------------------------------------------------------------------------------------
406 1
    def start_transaction(self,
407
                          consistent_snapshot: bool = False,
408
                          isolation_level: str = 'READ-COMMITTED',
409
                          readonly: Optional[bool] = None) -> None:
410
        """
411
        Starts a transaction.
412
        See http://dev.mysql.com/doc/connector-python/en/connector-python-api-mysqlconnection-start-transaction.html
413
414
        :param bool consistent_snapshot:
415
        :param str isolation_level:
416
        :param bool readonly:
417
        """
418
        self._connection.start_transaction(consistent_snapshot, isolation_level, readonly)
419
420
# ----------------------------------------------------------------------------------------------------------------------
421