Test Failed
Push — master ( 7c9493...8a6195 )
by P.R.
09:18
created

MySqlMetadataDataLayer.drop_temporary_table()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 9
ccs 1
cts 3
cp 0.3333
rs 10
c 0
b 0
f 0
cc 1
nop 2
crap 1.2963
1 1
from typing import Any, Dict, List, Union
2
3 1
from pystratum_backend.StratumStyle import StratumStyle
4 1
from pystratum_common.MetadataDataLayer import MetadataDataLayer
5
6 1
from pystratum_mysql.MySqlConnector import MySqlConnector
7 1
from pystratum_mysql.MySqlDataLayer import MySqlDataLayer
8
9
10 1
class MySqlMetadataDataLayer(MetadataDataLayer):
11
    """
12
    Data layer for retrieving metadata and loading stored routines.
13
    """
14
15
    # ------------------------------------------------------------------------------------------------------------------
16 1
    def __init__(self, io: StratumStyle, connector: MySqlConnector):
17
        """
18
        Object constructor.
19
20
        :param PyStratumStyle io: The output decorator.
21
        """
22 1
        super().__init__(io)
23
24 1
        self.__dl: MySqlDataLayer = MySqlDataLayer(connector)
25 1
        """
26
        The connection to the MySQL instance.
27
        """
28
29
    # ------------------------------------------------------------------------------------------------------------------
30 1
    def call_stored_routine(self, routine_name: str) -> int:
31
        """
32
        Class a stored procedure without arguments.
33
34
        :param str routine_name: The name of the procedure.
35
36
        :rtype: int
37
        """
38
        sql = 'call {0}()'.format(routine_name)
39
40
        return self.execute_none(sql)
41
42
    # ------------------------------------------------------------------------------------------------------------------
43 1
    def check_table_exists(self, table_name: str) -> int:
44
        """
45
        Checks if a table exists in the current schema.
46
47
        :param str table_name: The name of the table.
48
49
        :rtype: int
50
        """
51
        sql = """
52
select 1 from
53
information_schema.TABLES
54
where TABLE_SCHEMA = database()
55
and   TABLE_NAME   = '{0}'""".format(table_name)
56
57
        return self.execute_none(sql)
58
59
    # ------------------------------------------------------------------------------------------------------------------
60 1
    def connect(self) -> None:
61
        """
62
        Connects to a MySQL instance.
63
        """
64 1
        self.__dl.connect()
65
66
    # ------------------------------------------------------------------------------------------------------------------
67 1
    def describe_table(self, table_name: str) -> List[Dict[str, Any]]:
68
        """
69
        Describes a table.
70
71
        :param str table_name: The name of the table.
72
73
        :rtype: list[dict[str,*]]
74
        """
75
        sql = 'describe `{0}`'.format(table_name)
76
77
        return self.execute_rows(sql)
78
79
    # ------------------------------------------------------------------------------------------------------------------
80 1
    def disconnect(self) -> None:
81
        """
82
        Disconnects from the MySQL instance.
83
        """
84
        self.__dl.disconnect()
85
86
    # ------------------------------------------------------------------------------------------------------------------
87 1
    def drop_stored_routine(self, routine_type: str, routine_name: str) -> None:
88
        """
89
        Drops a stored routine if it exists.
90
91
        :param str routine_type: The type of the routine (i.e. PROCEDURE or FUNCTION).
92
        :param str routine_name: The name of the routine.
93
        """
94
        sql = 'drop {0} if exists `{1}`'.format(routine_type, routine_name)
95
96
        self.execute_none(sql)
97
98
    # ------------------------------------------------------------------------------------------------------------------
99 1
    def drop_temporary_table(self, table_name: str) -> None:
100
        """
101
        Drops a temporary table.
102
103
        :param str table_name: The name of the table.
104
        """
105
        sql = 'drop temporary table `{0}`'.format(table_name)
106
107
        self.execute_none(sql)
108
109
    # ------------------------------------------------------------------------------------------------------------------
110 1
    def execute_none(self, query: str) -> int:
111
        """
112
        Executes a query that does not select any rows.
113
114
        :param str query: The query.
115
116
        :rtype: int
117
        """
118
        self._log_query(query)
119
120
        return self.__dl.execute_none(query)
121
122
    # ------------------------------------------------------------------------------------------------------------------
123 1
    def execute_rows(self, query: str) -> List[Dict[str, Any]]:
124
        """
125
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows are selected).
126
127
        :param str query: The query.
128
129
        :rtype: list[dict[str,*]]
130
        """
131
        self._log_query(query)
132
133
        return self.__dl.execute_rows(query)
134
135
    # ------------------------------------------------------------------------------------------------------------------
136 1
    def execute_singleton1(self, query: str) -> Any:
137
        """
138
        Executes SQL statement that selects 1 row with 1 column. Returns the value of the selected column.
139
140
        :param str query: The query.
141
142
        :rtype: *
143
        """
144
        self._log_query(query)
145
146
        return self.__dl.execute_singleton1(query)
147
148
    # ------------------------------------------------------------------------------------------------------------------
149 1
    def get_all_table_columns(self) -> List[Dict[str, Union[str, int, None]]]:
150
        """
151
        Selects metadata of all columns of all tables.
152
153
        :rtype: list[dict[str,*]]
154
        """
155
        sql = """
156
(
157
  select table_name
158
  ,      column_name
159
  ,      column_type
160
  ,      character_set_name
161
  ,      data_type
162
  ,      character_maximum_length
163
  ,      numeric_precision
164
  ,      ordinal_position
165
  from   information_schema.COLUMNS
166
  where  table_schema = database()
167
  and    table_name  rlike '^[a-zA-Z0-9_]*$'
168
  and    column_name rlike '^[a-zA-Z0-9_]*$'
169
  order by table_name
170
  ,        ordinal_position
171
)
172
173
union all
174
175
(
176
  select concat(table_schema,'.',table_name) table_name
177
  ,      column_name
178
  ,      column_type
179
  ,      character_set_name
180
  ,      data_type
181
  ,      character_maximum_length
182
  ,      numeric_precision
183
  ,      ordinal_position
184
  from   information_schema.COLUMNS
185
  where  table_name  rlike '^[a-zA-Z0-9_]*$'
186
  and    column_name rlike '^[a-zA-Z0-9_]*$'
187
  order by table_schema
188
  ,        table_name
189
  ,        ordinal_position
190
)
191
"""
192
193
        return self.execute_rows(sql)
194
195
    # ------------------------------------------------------------------------------------------------------------------
196 1
    def get_correct_sql_mode(self, sql_mode: str) -> str:
197
        """
198
        Selects the SQL mode in the order as preferred by MySQL.
199
200
        :param str sql_mode: The SQL mode.
201
202
        :rtype: str
203
        """
204
        self.set_sql_mode(sql_mode)
205
206
        sql = 'select @@sql_mode'
207
208
        return self.execute_singleton1(sql)
209
210
    # ------------------------------------------------------------------------------------------------------------------
211 1
    def get_label_tables(self, regex: str) -> List[Dict[str, Any]]:
212
        """
213
        Selects metadata of tables with a label column.
214
215
        :param str regex: The regular expression for columns which we want to use.
216
217
        :rtype: list[dict[str,*]]
218
        """
219
        sql = """
220
select t1.TABLE_NAME  table_name
221
,      t1.COLUMN_NAME id
222
,      t2.COLUMN_NAME label
223
from       information_schema.COLUMNS t1
224
inner join information_schema.COLUMNS t2 on t1.TABLE_NAME = t2.TABLE_NAME
225
where t1.TABLE_SCHEMA = database()
226
and   t1.EXTRA        = 'auto_increment'
227
and   t2.TABLE_SCHEMA = database()
228
and   t2.COLUMN_NAME rlike '{0}'""".format(regex)
229
230
        return self.execute_rows(sql)
231
232
    # ------------------------------------------------------------------------------------------------------------------
233 1
    def get_labels_from_table(self, table_name: str, id_column_name: str, label_column_name: str) -> \
234
            List[Dict[str, Any]]:
235
        """
236
        Selects all labels from a table with labels.
237
238
        :param str table_name: The name of the table.
239
        :param str id_column_name: The name of the auto increment column.
240
        :param str label_column_name: The name of the column with labels.
241
242
        :rtype: list[dict[str,*]]
243
        """
244
        sql = """
245
select `{0}`  as `id`
246
,      `{1}`  as `label`
247
from   `{2}`
248
where   nullif(`{1}`,'') is not null""".format(id_column_name,
249
                                               label_column_name,
250
                                               table_name)
251
252
        return self.execute_rows(sql)
253
254
    # ------------------------------------------------------------------------------------------------------------------
255 1
    def last_sql(self) -> str:
256
        """
257
        The last executed SQL statement.
258
259
        :rtype: str
260
        """
261
        return self.__dl.last_sql()
262
263
    # ------------------------------------------------------------------------------------------------------------------
264 1
    def get_routine_parameters(self, routine_name: str) -> List[Dict[str, Any]]:
265
        """
266
        Selects metadata of the parameters of a stored routine.
267
268
        :param str routine_name: The name of the routine.
269
270
        :rtype: list[dict[str,*]]
271
        """
272
        sql = """
273
select t2.PARAMETER_NAME      parameter_name
274
,      t2.DATA_TYPE           parameter_type
275
,      t2.NUMERIC_PRECISION   numeric_precision
276
,      t2.NUMERIC_SCALE       numeric_scale
277
,      t2.DTD_IDENTIFIER      column_type
278
,      t2.CHARACTER_SET_NAME  character_set_name
279
,      t2.COLLATION_NAME      collation
280
from            information_schema.ROUTINES   t1
281
left outer join information_schema.PARAMETERS t2  on  t2.SPECIFIC_SCHEMA = t1.ROUTINE_SCHEMA and
282
                                                      t2.SPECIFIC_NAME   = t1.ROUTINE_NAME and
283
                                                      t2.PARAMETER_MODE   is not null
284
where t1.ROUTINE_SCHEMA = database()
285
and   t1.ROUTINE_NAME   = '{0}'""".format(routine_name)
286
287
        return self.execute_rows(sql)
288
289
    # ------------------------------------------------------------------------------------------------------------------
290 1
    def get_routines(self) -> List[Dict[str, Any]]:
291
        """
292
        Selects metadata of all routines in the current schema.
293
294
        :rtype: list[dict[str,*]]
295
        """
296
        sql = """
297
select ROUTINE_NAME           as routine_name
298
,      ROUTINE_TYPE           as routine_type
299
,      SQL_MODE               as sql_mode
300
,      CHARACTER_SET_CLIENT   as character_set_client
301
,      COLLATION_CONNECTION   as collation_connection
302
from  information_schema.ROUTINES
303
where ROUTINE_SCHEMA = database()
304
order by routine_name"""
305
306
        return self.execute_rows(sql)
307
308
    # ------------------------------------------------------------------------------------------------------------------
309 1
    def set_character_set(self, character_set: str, collate: str) -> None:
310
        """
311
        Sets the default character set and collate.
312
313
        :param str character_set: The name of the character set.
314
        :param str collate: The name of the collate
315
        """
316
        sql = "set names '{0}' collate '{1}'".format(character_set, collate)
317
318
        self.execute_none(sql)
319
320
    # ------------------------------------------------------------------------------------------------------------------
321 1
    def set_sql_mode(self, sql_mode: str) -> None:
322
        """
323
        Sets the SQL mode.
324
325
        :param str sql_mode: The SQL mode.
326
        """
327
        sql = "set sql_mode = '{0}'".format(sql_mode)
328
329
        self.execute_none(sql)
330
331
# ----------------------------------------------------------------------------------------------------------------------
332