Completed
Push — master ( de86bb...677151 )
by P.R.
01:20
created

PgSqlDataLayer.commit()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 5
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1 1
from typing import Any, Dict, Iterable, List, Union
2
3 1
from psycopg2.extras import RealDictCursor
4 1
from pystratum_middle.exception.ResultException import ResultException
5
6 1
from pystratum_pgsql.PgSqlConnector import PgSqlConnector
7
8
9 1
class PgSqlDataLayer:
10
    """
11
    Class for connecting to a PostgreSQL instance and executing SQL statements. Also, a parent class for classes with
12
    static wrapper methods for executing stored procedures and functions.
13
    """
14
    # ------------------------------------------------------------------------------------------------------------------
15 1
    line_buffered: bool = True
16
    """
17
    If True log messages from stored procedures with designation type 'log' are line buffered (Note: In python
18
    sys.stdout is buffered by default).
19
    """
20
21 1
    sp_log_init: str = 'stratum_log_init'
22
    """
23
    The name of the stored routine that must be run before a store routine with designation type "log".
24
    """
25
26 1
    sp_log_fetch: str = 'stratum_log_fetch'
27
    """
28
    The name of the stored routine that must be run after a store routine with designation type "log".
29
    """
30
31
    # ------------------------------------------------------------------------------------------------------------------
32 1
    def __init__(self, connector: PgSqlConnector):
33
        """
34
        Object constructor.
35
        """
36
37 1
        self.__connector: PgSqlConnector = connector
38
        """
39
        The object for connecting to a PostgreSQL instance.
40
        """
41
42 1
        self._connection: Any = None
43 1
        """
44
        The connection between Python and the PostgreSQL instance.
45
    
46
        :type: psycopg2.extensions.connection
47
        """
48
49
    # ------------------------------------------------------------------------------------------------------------------
50 1
    def _get_column_names(self, cursor) -> List:
51
        """
52
        Returns a list with column names retrieved from the description of a cursor.
53
54
        :param psycopg2.extensions.cursor cursor: The cursor.
55
56
        :return: list
57
        """
58 1
        column_names = []
59 1
        for column in cursor.description:
60 1
            column_names.append(column.name)
61
62 1
        return column_names
63
64
    # ------------------------------------------------------------------------------------------------------------------
65 1
    def start_transaction(self, isolation_level: str = 'READ-COMMITTED', readonly: bool = False) -> None:
66
        """
67
        Starts a transaction.
68
69
        :param str isolation_level: The isolation level.
70
        :param bool readonly:
71
        """
72
        self._connection.set_session(isolation_level, readonly)
73
74
    # ------------------------------------------------------------------------------------------------------------------
75 1
    def commit(self):
76
        """
77
        Commits the current transaction.
78
        """
79 1
        self._connection.commit()
80
81
    # ------------------------------------------------------------------------------------------------------------------
82 1
    def rollback(self) -> None:
83
        """
84
        Rolls back the current transaction.
85
        """
86
        self._connection.rollback()
87
88
    # ------------------------------------------------------------------------------------------------------------------
89 1
    def connect(self) -> None:
90
        """
91
        Connects to a PostgreSQL instance.
92
        """
93 1
        self._connection = self.__connector.connect()
94
95
    # ------------------------------------------------------------------------------------------------------------------
96 1
    def disconnect(self) -> None:
97
        """
98
        Disconnects from the PostgreSQL instance.
99
        """
100 1
        self.__connector.disconnect()
101
102
    # ------------------------------------------------------------------------------------------------------------------
103 1
    def execute_none(self, sql: str, *params) -> int:
104
        """
105
        Executes a query that does not select any rows.
106
107
        :param string sql: The SQL statement.
108
        :param tuple params: The values for the statement.
109
110
        :rtype: int
111
        """
112 1
        cursor = self._connection.cursor()
113 1
        if params:
114
            cursor.execute(sql, params)
115
        else:
116 1
            cursor.execute(sql)
117 1
        row_count = cursor.rowcount
118 1
        cursor.close()
119
120 1
        return row_count
121
122
    # ------------------------------------------------------------------------------------------------------------------
123 1
    def execute_rows(self, sql: str, *params) -> List:
124
        """
125
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows are selected).
126
127
        :param str sql: The SQL statement.
128
        :param iterable params: The arguments for the statement.
129
130
        :rtype: list[dict]
131
        """
132 1
        cursor = self._connection.cursor(cursor_factory=RealDictCursor)
133 1
        if params:
134
            cursor.execute(sql, params)
135
        else:
136 1
            cursor.execute(sql)
137 1
        rows = cursor.fetchall()
138 1
        cursor.close()
139
140 1
        return rows
141
142
    # ------------------------------------------------------------------------------------------------------------------
143 1
    def execute_sp_none(self, sql: str, *params) -> None:
144
        """
145
        Executes a stored procedure that does not select any rows.
146
147
        Unfortunately, it is not possible to retrieve the number of affected rows of the SQL statement in the stored
148
        function as with execute_none (cursor.rowcount is always 1 using cursor.execute and cursor.callproc).
149
150
        :param str sql: The SQL statement.
151
        :param iterable params: The arguments for the statement.
152
        """
153
        cursor = self._connection.cursor()
154
        if params:
155
            cursor.execute(sql, params)
156
        else:
157
            cursor.execute(sql)
158
        cursor.close()
159
160
    # ------------------------------------------------------------------------------------------------------------------
161 1 View Code Duplication
    def execute_sp_row0(self, sql: str, *params) -> Union[None, Dict]:
1 ignored issue
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
162
        """
163
        Executes a stored procedure that selects 0 or 1 row. Returns the selected row or None.
164
165
        :param str sql: The SQL statement.
166
        :param iterable params: The arguments for the statement.
167
168
        :rtype: None|dict[str,object]
169
        """
170 1
        cursor = self._connection.cursor()
171
172 1
        if params:
173 1
            cursor.execute(sql, params)
174
        else:
175
            cursor.execute(sql)
176 1
        portal = self._connection.cursor(cursor.fetchone()[0])
177 1
        rows = portal.fetchall()
178 1
        row_count = len(rows)
179 1
        if row_count == 1:
180 1
            column_names = self._get_column_names(portal)
181 1
            ret = dict(zip(column_names, rows[0]))
182
        else:
183 1
            ret = None
184 1
        portal.close()
185 1
        cursor.close()
186
187 1
        if not (row_count == 0 or row_count == 1):
188 1
            raise ResultException('0 or 1', row_count, sql)
189
190 1
        return ret
191
192
    # ------------------------------------------------------------------------------------------------------------------
193 1 View Code Duplication
    def execute_sp_row1(self, sql: str, *params) -> Dict:
1 ignored issue
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
194
        """
195
        Executes a stored procedure that selects 1 row. Returns the selected row.
196
197
        :param str sql: The SQL call the the stored procedure.
198
        :param iterable params: The arguments for the stored procedure.
199
200
        :rtype: dict[str,object]
201
        """
202 1
        cursor = self._connection.cursor()
203 1
        if params:
204 1
            cursor.execute(sql, params)
205
        else:
206 1
            cursor.execute(sql)
207 1
        portal = self._connection.cursor(cursor.fetchone()[0])
208 1
        rows = portal.fetchall()
209 1
        self._get_column_names(portal)
210 1
        row_count = len(rows)
211 1
        if row_count == 1:
212 1
            column_names = self._get_column_names(portal)
213 1
            ret = dict(zip(column_names, rows[0]))
214
        else:
215 1
            ret = None  # Keep our IDE happy.
216 1
        portal.close()
217 1
        cursor.close()
218
219 1
        if row_count != 1:
220 1
            raise ResultException('1', row_count, sql)
221
222 1
        return ret
223
224
    # ------------------------------------------------------------------------------------------------------------------
225 1
    def execute_sp_rows(self, sql: str, *params) -> List:
226
        """
227
        Executes a stored procedure that selects 0 or more rows. Returns the selected rows (an empty list if no rows
228
        are selected).
229
230
        :param str sql: The SQL call the the stored procedure.
231
        :param iterable params: The arguments for the stored procedure.
232
233
        :rtype: list[dict[str,object]]
234
        """
235 1
        cursor = self._connection.cursor()
236 1
        if params:
237 1
            cursor.execute(sql, params)
238
        else:
239
            cursor.execute(sql)
240 1
        portal = self._connection.cursor(cursor.fetchone()[0])
241 1
        tmp = portal.fetchall()
242 1
        column_names = self._get_column_names(portal)
243 1
        portal.close()
244 1
        cursor.close()
245
246 1
        ret = []
247 1
        for row in tmp:
248 1
            ret.append(dict(zip(column_names, row)))
249
250 1
        return ret
251
252
    # ------------------------------------------------------------------------------------------------------------------
253 1 View Code Duplication
    def execute_sp_singleton0(self, sql: str, *params) -> object:
1 ignored issue
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
254
        """
255
        Executes a stored procedure that selects 0 or 1 row with 1 column. Returns the value of selected column or None.
256
257
        :param str sql: The SQL call the the stored procedure.
258
        :param iterable params: The arguments for the stored procedure.
259
260
        :rtype: object
261
        """
262 1
        cursor = self._connection.cursor()
263 1
        if params:
264 1
            cursor.execute(sql, params)
265
        else:
266
            cursor.execute(sql)
267 1
        portal = self._connection.cursor(cursor.fetchone()[0])
268 1
        rows = portal.fetchall()
269 1
        row_count = len(rows)
270 1
        if row_count == 1:
271 1
            ret = rows[0][0]
272
        else:
273 1
            ret = None  # Keep our IDE happy.
274 1
        portal.close()
275 1
        cursor.close()
276
277 1
        if not (row_count == 0 or row_count == 1):
278 1
            raise ResultException('0 or 1', row_count, sql)
279
280 1
        return ret
281
282
    # ------------------------------------------------------------------------------------------------------------------
283 1
    def execute_singleton1(self, sql: str, *params) -> object:
284
        """
285
        Executes query that selects 1 row with 1 column. Returns the value of the selected column.
286
287
        :param str sql: The SQL call the the stored procedure.
288
        :param iterable params: The arguments for the stored procedure.
289
290
        :rtype: object
291
        """
292 1
        cursor = self._connection.cursor()
293 1
        if params:
294 1
            cursor.execute(sql, params)
295
        else:
296
            cursor.execute(sql)
297 1
        row_count = cursor.rowcount
298 1
        if row_count == 1:
299 1
            ret = cursor.fetchone()[0]
300
        else:
301
            ret = None  # Keep our IDE happy.
302 1
        cursor.close()
303
304 1
        if row_count != 1:
305
            raise ResultException('1', row_count, sql)
306
307 1
        return ret
308
309
    # ------------------------------------------------------------------------------------------------------------------
310 1 View Code Duplication
    def execute_sp_singleton1(self, sql: str, *params) -> object:
1 ignored issue
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
311
        """
312
        Executes a stored procedure that selects 1 row with 1 column. Returns the value of the selected column.
313
314
        :param str sql: The SQL call the the stored procedure.
315
        :param iterable params: The arguments for the stored procedure.
316
317
        :rtype: object
318
        """
319 1
        cursor = self._connection.cursor()
320 1
        if params:
321 1
            cursor.execute(sql, params)
322
        else:
323 1
            cursor.execute(sql)
324 1
        portal = self._connection.cursor(cursor.fetchone()[0])
325 1
        rows = portal.fetchall()
326 1
        row_count = len(rows)
327 1
        if row_count == 1:
328 1
            ret = rows[0][0]
329
        else:
330 1
            ret = None  # Keep our IDE happy.
331 1
        portal.close()
332 1
        cursor.close()
333
334 1
        if len(rows) != 1:
335 1
            raise ResultException('1', row_count, sql)
336
337 1
        return ret
338
339
    # ------------------------------------------------------------------------------------------------------------------
340 1
    def execute_sp_table(self, sql: str, *params) -> int:
341
        """
342
        Executes a stored routine with designation type "table". Returns the number of rows.
343
344
        :param str sql: The SQL calling the the stored procedure.
345
        :param iterable params: The arguments for calling the stored routine.
346
347
        :rtype: int
348
        """
349
        # todo methods for showing table
350
        raise NotImplementedError()
351
352
    # ------------------------------------------------------------------------------------------------------------------
353 1
    def execute_sp_log(self, sql: str, *params) -> int:
354
        """
355
        Executes a stored procedure with log statements. Returns the number of lines in the log.
356
357
        :param str sql: The SQL call the the stored procedure.
358
        :param iterable params: The arguments for the stored procedure.
359
360
        :rtype: int
361
        """
362 1
        cursor = self._connection.cursor()
363
364
        # Create temporary table for logging.
365 1
        cursor.callproc(self.sp_log_init)
366
367
        # Execute the stored procedure.
368 1
        if params:
369
            cursor.execute(sql, params)
370
        else:
371 1
            cursor.execute(sql)
372
373
        # Fetch the log messages.
374 1
        cursor.callproc(self.sp_log_fetch)
375 1
        portal = self._connection.cursor(cursor.fetchone()[0])
376 1
        messages = portal.fetchall()
377 1
        portal.close()
378 1
        cursor.close()
379
380
        # Log the log messages.
381 1
        for message in messages:
382 1
            print('{0!s} {1!s}'.format(*message), flush=self.line_buffered)
383
384 1
        return len(messages)
385
386
    # ------------------------------------------------------------------------------------------------------------------
387 1
    def copy_from(self,
388
                  file: object,
389
                  table: str,
390
                  sep: str = '\t',
391
                  null: str = '\\N',
392
                  size: int = 8192,
393
                  columns: Union[None, Iterable] = None) -> int:
394
        """
395
        Read data from the file-like object file appending them to the table named table. Returns the number of rows
396
        copied.
397
398
        :param T file: File-like object to read data from. It must have both read() and readline() methods.
399
        :param str table: Name of the table to copy data into.
400
        :param str sep: Columns separator expected in the file. Defaults to a tab.
401
        :param str null: Textual representation of NULL in the file. The default is the two characters string \\N.
402
        :param int size: Size of the buffer used to read from the file.
403
        :param iterable columns: Iterable with name of the columns to import. The length and types should match the
404
                                 content of the file to read. If not specified, it is assumed that the entire table
405
                                 matches the file structure.
406
407
        :rtype: int
408
        """
409
        cursor = self._connection.cursor()
410
        cursor.copy_from(file, table, sep, null, size, columns)
411
        row_count = cursor.rowcount
412
        cursor.close()
413
414
        return row_count
415
416
    # ------------------------------------------------------------------------------------------------------------------
417 1
    def copy_to(self,
418
                file: object,
419
                table: str,
420
                sep: str = '\t',
421
                null: str = '\\N',
422
                columns: Union[None, Iterable] = None) -> int:
423
        """
424
        Write the content of the table named table to the file-like object file. Returns the number of rows copied.
425
426
        :param T file: File-like object to write data into. It must have a write() method.
427
        :param str table: Name of the table to copy data from.
428
        :param str sep: Columns separator expected in the file. Defaults to a tab.
429
        :param str null: Textual representation of NULL in the file. The default is the two characters string \\N.
430
        :param iterable columns: Iterable with name of the columns to export. If not specified, export all the columns.
431
432
        :rtype: int
433
        """
434
        cursor = self._connection.cursor()
435
        cursor.copy_to(file, table, sep, null, columns)
436
        row_count = cursor.rowcount
437
        cursor.close()
438
439
        return row_count
440
441
    # ------------------------------------------------------------------------------------------------------------------
442 1
    def copy_expert(self, sql: str, file: object, size: int = 8192) -> int:
443
        """
444
        Submit a user-composed COPY statement. Returns the number of rows copied.
445
446
        :param str sql: The COPY statement to execute.
447
        :param T file: A file-like object to read or write (according to sql).
448
        :param int size: Size of the read buffer to be used in COPY FROM.
449
450
        :rtype: int
451
        """
452
        cursor = self._connection.cursor()
453
        cursor.copy_expert(sql, file, size)
454
        row_count = cursor.rowcount
455
        cursor.close()
456
457
        return row_count
458
459
# ----------------------------------------------------------------------------------------------------------------------
460