Completed
Push — master ( de86bb...677151 )
by P.R.
01:20
created

PgSqlMetadataDataLayer.connect()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 5
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1 1
from typing import List, Union
2
3 1
from pystratum_backend.StratumStyle import StratumStyle
4 1
from pystratum_common.MetadataDataLayer import MetadataDataLayer
5
6 1
from pystratum_pgsql.PgSqlConnector import PgSqlConnector
7 1
from pystratum_pgsql.PgSqlDataLayer import PgSqlDataLayer
8
9
10 1
class PgSqlMetadataDataLayer(MetadataDataLayer):
11
    """
12
    Data layer for retrieving metadata and loading stored routines.
13
    """
14
15
    # ------------------------------------------------------------------------------------------------------------------
16 1
    def __init__(self, io: StratumStyle, connector: PgSqlConnector):
17
        """
18
        Object constructor.
19
20
        :param PyStratumStyle io: The output decorator.
21
        """
22 1
        super().__init__(io)
23
24 1
        self.__dl: PgSqlDataLayer = PgSqlDataLayer(connector)
25 1
        """
26
        The connection to a PostgreSQL instance.
27
        """
28
29
    # ------------------------------------------------------------------------------------------------------------------
30 1
    def call_stored_routine(self, routine_name: str) -> Union[int, None]:
31
        """
32
        Class a stored procedure without arguments.
33
34
        :param str routine_name: The name of the procedure.
35
36
        :rtype: int|None
37
        """
38
        sql = 'call {0}()'.format(routine_name)
39
40
        return self.execute_none(sql)
41
42
    # ------------------------------------------------------------------------------------------------------------------
43 1
    def check_table_exists(self, table_name: str) -> Union[int, None]:
44
        """
45
        Checks if a table exists in the current schema.
46
47
        :param str table_name: The name of the table.
48
49
        :rtype: int|None
50
        """
51
        sql = """
52
select 1 from
53
information_schema.tables
54
where TABLE_SCHEMA = database()
55
and   TABLE_NAME   = '{0}'""" % table_name
56
57
        return self.execute_none(sql)
58
59
    # ------------------------------------------------------------------------------------------------------------------
60 1
    def commit(self) -> None:
61
        """
62
        Commits the current transaction.
63
        """
64 1
        self.__dl.commit()
65
66
    # ------------------------------------------------------------------------------------------------------------------
67 1
    def connect(self) -> None:
68
        """
69
        Connects to a PostgreSQL instance.
70
        """
71 1
        self.__dl.connect()
72
73
    # ------------------------------------------------------------------------------------------------------------------
74 1
    def describe_table(self, table_name: str) -> List:
75
        """
76
        Describes a table.
77
78
        :param str table_name: The name of the table.
79
80
        :rtype: list[dict[str,Object]]
81
        """
82
        sql = 'describe `{0}`'.format(table_name)
83
84
        return self.execute_rows(sql)
85
86
    # ------------------------------------------------------------------------------------------------------------------
87 1
    def disconnect(self) -> None:
88
        """
89
        Disconnects from the PostgreSQL instance.
90
        """
91 1
        self.__dl.disconnect()
92
93
    # ------------------------------------------------------------------------------------------------------------------
94 1
    def drop_stored_routine(self, routine_type: str, routine_name: str, routine_args: str) -> None:
95
        """
96
        Drops a stored routine if it exists.
97
98
        :param str routine_type: The type of the routine (i.e. PROCEDURE or FUNCTION).
99
        :param str routine_name: The name of the routine.
100
        :param str routine_args: The routine arguments types as comma separated list.
101
        """
102
        sql = 'drop {0} if exists {1}({2})'.format(routine_type, routine_name, routine_args)
103
104
        self.execute_none(sql)
105
106
    # ------------------------------------------------------------------------------------------------------------------
107 1
    def drop_temporary_table(self, table_name: str) -> None:
108
        """
109
        Drops a temporary table.
110
111
        :param str table_name: The name of the table.
112
        """
113
        sql = 'drop temporary table `{0}`'.format(table_name)
114
115
        self.execute_none(sql)
116
117
    # ------------------------------------------------------------------------------------------------------------------
118 1
    def execute_none(self, query: str) -> int:
119
        """
120
        Executes a query that does not select any rows.
121
122
        :param str query: The query.
123
124
        :rtype: int
125
        """
126 1
        self._log_query(query)
127
128 1
        return self.__dl.execute_none(query)
129
130
    # ------------------------------------------------------------------------------------------------------------------
131 1
    def execute_rows(self, query: str) -> List:
132
        """
133
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows are selected).
134
135
        :param str query: The query.
136
137
        :rtype: list[dict[str,Object]]
138
        """
139 1
        self._log_query(query)
140
141 1
        return self.__dl.execute_rows(query)
142
143
    # ------------------------------------------------------------------------------------------------------------------
144 1
    def execute_singleton1(self, query: str) -> object:
145
        """
146
        Executes SQL statement that selects 1 row with 1 column. Returns the value of the selected column.
147
148
        :param str query: The query.
149
150
        :rtype: Object
151
        """
152
        self._log_query(query)
153
154
        return self.__dl.execute_singleton1(query)
155
156
    # ------------------------------------------------------------------------------------------------------------------
157 1
    def get_all_table_columns(self) -> List:
158
        """
159
        Selects metadata of all columns of all tables.
160
161
        :rtype: list[dict[str,Object]]
162
        """
163 1
        sql = """
164
(
165
  select table_name
166
  ,      column_name
167
  ,      data_type
168
  ,      character_maximum_length
169
  ,      numeric_precision
170
  ,      ordinal_position
171
  from   information_schema.COLUMNS
172
  where  table_catalog = current_database()
173
  and    table_schema  = current_schema()
174
  and    table_name  similar to '[a-zA-Z0-9_]*'
175
  and    column_name similar to '[a-zA-Z0-9_]*'
176
  order by table_name
177
  ,        ordinal_position
178
)
179
180
union all
181
182
(
183
  select concat(table_schema,'.',table_name) table_name
184
  ,      column_name
185
  ,      data_type
186
  ,      character_maximum_length
187
  ,      numeric_precision
188
  ,      ordinal_position
189
  from   information_schema.COLUMNS
190
  where  1=0 and table_catalog = current_database()
191
  and    table_name  similar to '[a-zA-Z0-9_]*'
192
  and    column_name similar to '[a-zA-Z0-9_]*'
193
  order by table_schema
194
  ,        table_name
195
  ,        ordinal_position
196
)
197
"""
198
199 1
        return self.execute_rows(sql)
200
201
    # ------------------------------------------------------------------------------------------------------------------
202 1
    def get_label_tables(self, regex: str) -> List:
203
        """
204
        Selects metadata of tables with a label column.
205
206
        :param str regex: The regular expression for columns which we want to use.
207
208
        :rtype: list[dict[str,Object]]
209
        """
210 1
        sql = """
211
select t1.table_name  "table_name"
212
,      t1.column_name "id"
213
,      t2.column_name "label"
214
from       information_schema.columns t1
215
inner join information_schema.columns t2 on t1.table_name = t2.table_name
216
where t1.table_catalog = current_database()
217
and   t1.table_schema = current_schema()
218
and   t1.column_default like 'nextval(%%)'
219
and   t2.table_catalog = current_database()
220
and   t2.table_schema  = current_schema()
221
and   t2.column_name ~ '{0}'
222
""".format(regex)
223
224 1
        return self.execute_rows(sql)
225
226
    # ------------------------------------------------------------------------------------------------------------------
227 1
    def get_labels_from_table(self, table_name: str, id_column_name: str, label_column_name: str) -> List:
228
        """
229
        Selects all labels from a table with labels.
230
231
        :param str table_name: The name of the table.
232
        :param str id_column_name: The name of the auto increment column.
233
        :param str label_column_name: The name of the column with labels.
234
235
        :rtype: list[dict[str,Object]]
236
        """
237 1
        sql = """
238
select \"{0}\"  as id
239
,      \"{1}\"  as label
240
from   \"{2}\"
241
where   nullif(\"{1}\",'') is not null""".format(id_column_name,
242
                                                 label_column_name,
243
                                                 table_name)
244
245 1
        return self.execute_rows(sql)
246
247
    # ------------------------------------------------------------------------------------------------------------------
248 1
    def get_routine_parameters(self, routine_name: str) -> List:
249
        """
250
        Selects metadata of the parameters of a stored routine.
251
252
        :param str routine_name: The name of the routine.
253
254
        :rtype: list[dict[str,Object]]
255
        """
256 1
        sql = """
257
select t2.parameter_name      parameter_name
258
,      t2.data_type           parameter_type
259
,      t2.udt_name            column_type
260
from            information_schema.routines   t1
261
left outer join information_schema.parameters t2  on  t2.specific_catalog = t1.specific_catalog and
262
                                                      t2.specific_schema  = t1.specific_schema and
263
                                                      t2.specific_name    = t1.specific_name and
264
                                                      t2.parameter_name   is not null
265
where t1.routine_catalog = current_database()
266
and   t1.routine_schema  = current_schema()
267
and   t1.routine_name    = '%s'
268
order by t2.ordinal_position""" % routine_name
269
270 1
        return self.execute_rows(sql)
271
272
    # ------------------------------------------------------------------------------------------------------------------
273 1
    def get_routines(self) -> List:
274
        """
275
        Selects metadata of all routines in the current schema.
276
277
        :rtype: list[dict[str,Object]]
278
        """
279 1
        sql = """
280
select t1.routine_name                                                                        as routine_name
281
,      t1.routine_type                                                                        as routine_type
282
,      array_to_string(array_agg(case when (parameter_name is not null) then
283
                                   concat(t2.parameter_mode, ' ',
284
                                          t2.parameter_name, ' ',
285
                                          t2.udt_name)
286
                                 end order by t2.ordinal_position), ',')                      as routine_args
287
from            information_schema.routines   t1
288
left outer join information_schema.parameters t2  on  t2.specific_catalog = t1.specific_catalog and
289
                                                      t2.specific_schema  = t1.specific_schema and
290
                                                      t2.specific_name    = t1.specific_name and
291
                                                      t2.parameter_name   is not null
292
where routine_catalog = current_database()
293
and   routine_schema  = current_schema()
294
group by t1.routine_name
295
,        t1.routine_type
296
order by routine_name
297
"""
298
299 1
        return self.execute_rows(sql)
300
301
    # ------------------------------------------------------------------------------------------------------------------
302 1
    def rollback(self) -> None:
303
        """
304
        Rollbacks the current transaction.
305
        """
306
        self.__dl.rollback()
307
308
# ----------------------------------------------------------------------------------------------------------------------
309