pystratum_mysql.MySqlMetadataDataLayer   A
last analyzed

Complexity

Total Complexity 20

Size/Duplication

Total Lines 304
Duplicated Lines 0 %

Test Coverage

Coverage 48.28%

Importance

Changes 0
Metric Value
wmc 20
eloc 71
dl 0
loc 304
ccs 28
cts 58
cp 0.4828
rs 10
c 0
b 0
f 0

20 Methods

Rating   Name   Duplication   Size   Complexity  
A MySqlMetadataDataLayer.__init__() 0 10 1
A MySqlMetadataDataLayer.set_sql_mode() 0 9 1
A MySqlMetadataDataLayer.disconnect() 0 5 1
A MySqlMetadataDataLayer.call_stored_routine() 0 9 1
A MySqlMetadataDataLayer.execute_none() 0 9 1
A MySqlMetadataDataLayer.get_label_tables() 0 18 1
A MySqlMetadataDataLayer.connect() 0 5 1
A MySqlMetadataDataLayer.execute_rows() 0 9 1
A MySqlMetadataDataLayer.execute_singleton1() 0 9 1
A MySqlMetadataDataLayer.check_table_exists() 0 13 1
A MySqlMetadataDataLayer.get_routines() 0 15 1
A MySqlMetadataDataLayer.get_all_table_columns() 0 43 1
A MySqlMetadataDataLayer.get_routine_parameters() 0 22 1
A MySqlMetadataDataLayer.describe_table() 0 9 1
A MySqlMetadataDataLayer.drop_temporary_table() 0 9 1
A MySqlMetadataDataLayer.drop_stored_routine() 0 10 1
A MySqlMetadataDataLayer.get_labels_from_table() 0 18 1
A MySqlMetadataDataLayer.get_correct_sql_mode() 0 11 1
A MySqlMetadataDataLayer.last_sql() 0 5 1
A MySqlMetadataDataLayer.set_character_set() 0 10 1
1 1
from typing import Any, Dict, List, Union
2
3 1
from pystratum_backend.StratumIO import StratumIO
4 1
from pystratum_common.MetadataDataLayer import MetadataDataLayer
5
6 1
from pystratum_mysql.MySqlConnector import MySqlConnector
7 1
from pystratum_mysql.MySqlDataLayer import MySqlDataLayer
8
9
10 1
class MySqlMetadataDataLayer(MetadataDataLayer):
11
    """
12
    Data layer for retrieving metadata and loading stored routines.
13
    """
14
15
    # ------------------------------------------------------------------------------------------------------------------
16 1
    def __init__(self, io: StratumIO, connector: MySqlConnector):
17
        """
18
        Object constructor.
19
20
        :param io: The output decorator.
21
        """
22 1
        super().__init__(io)
23
24 1
        self.__dl: MySqlDataLayer = MySqlDataLayer(connector)
25 1
        """
26
        The connection to the MySQL instance.
27
        """
28
29
    # ------------------------------------------------------------------------------------------------------------------
30 1
    def call_stored_routine(self, routine_name: str) -> int:
31
        """
32
        Class a stored procedure without arguments.
33
34
        :param routine_name: The name of the procedure.
35
        """
36
        sql = 'call {0}()'.format(routine_name)
37
38
        return self.execute_none(sql)
39
40
    # ------------------------------------------------------------------------------------------------------------------
41
    def check_table_exists(self, table_name: str) -> int:
42
        """
43 1
        Checks if a table exists in the current schema.
44
45
        :param table_name: The name of the table.
46
        """
47
        sql = """
48
select 1 from
49
information_schema.TABLES
50
where TABLE_SCHEMA = database()
51
and   TABLE_NAME   = '{0}'""".format(table_name)
52
53
        return self.execute_none(sql)
54
55
    # ------------------------------------------------------------------------------------------------------------------
56
    def connect(self) -> None:
57
        """
58
        Connects to a MySQL instance.
59
        """
60 1
        self.__dl.connect()
61
62
    # ------------------------------------------------------------------------------------------------------------------
63
    def describe_table(self, table_name: str) -> List[Dict[str, Any]]:
64 1
        """
65
        Describes a table.
66
67 1
        :param table_name: The name of the table.
68
        """
69
        sql = 'describe `{0}`'.format(table_name)
70
71
        return self.execute_rows(sql)
72
73
    # ------------------------------------------------------------------------------------------------------------------
74
    def disconnect(self) -> None:
75
        """
76
        Disconnects from the MySQL instance.
77
        """
78
        self.__dl.disconnect()
79
80 1
    # ------------------------------------------------------------------------------------------------------------------
81
    def drop_stored_routine(self, routine_type: str, routine_name: str) -> None:
82
        """
83
        Drops a stored routine if it exists.
84
85
        :param routine_type: The type of the routine (i.e. PROCEDURE or FUNCTION).
86
        :param routine_name: The name of the routine.
87 1
        """
88
        sql = 'drop {0} if exists `{1}`'.format(routine_type, routine_name)
89
90
        self.execute_none(sql)
91
92
    # ------------------------------------------------------------------------------------------------------------------
93
    def drop_temporary_table(self, table_name: str) -> None:
94
        """
95
        Drops a temporary table.
96
97
        :param table_name: The name of the table.
98
        """
99 1
        sql = 'drop temporary table `{0}`'.format(table_name)
100
101
        self.execute_none(sql)
102
103
    # ------------------------------------------------------------------------------------------------------------------
104
    def execute_none(self, query: str) -> int:
105
        """
106
        Executes a query that does not select any rows.
107
108
        :param query: The query.
109
        """
110 1
        self._log_query(query)
111
112
        return self.__dl.execute_none(query)
113
114
    # ------------------------------------------------------------------------------------------------------------------
115
    def execute_rows(self, query: str) -> List[Dict[str, Any]]:
116
        """
117
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows are selected).
118
119
        :param query: The query.
120
        """
121
        self._log_query(query)
122
123 1
        return self.__dl.execute_rows(query)
124
125
    # ------------------------------------------------------------------------------------------------------------------
126
    def execute_singleton1(self, query: str) -> Any:
127
        """
128
        Executes SQL statement that selects 1 row with 1 column. Returns the value of the selected column.
129
130
        :param query: The query.
131
        """
132
        self._log_query(query)
133
134
        return self.__dl.execute_singleton1(query)
135
136 1
    # ------------------------------------------------------------------------------------------------------------------
137
    def get_all_table_columns(self) -> List[Dict[str, Union[str, int, None]]]:
138
        """
139
        Selects metadata of all columns of all tables.
140
        """
141
        sql = """
142
(
143
  select table_name
144
  ,      column_name
145
  ,      column_type
146
  ,      character_set_name
147
  ,      data_type
148
  ,      character_maximum_length
149 1
  ,      numeric_precision
150
  ,      ordinal_position
151
  from   information_schema.COLUMNS
152
  where  table_schema = database()
153
  and    table_name  rlike '^[a-zA-Z0-9_]*$'
154
  and    column_name rlike '^[a-zA-Z0-9_]*$'
155
  order by table_name
156
  ,        ordinal_position
157
)
158
159
union all
160
161
(
162
  select concat(table_schema,'.',table_name) table_name
163
  ,      column_name
164
  ,      column_type
165
  ,      character_set_name
166
  ,      data_type
167
  ,      character_maximum_length
168
  ,      numeric_precision
169
  ,      ordinal_position
170
  from   information_schema.COLUMNS
171
  where  table_name  rlike '^[a-zA-Z0-9_]*$'
172
  and    column_name rlike '^[a-zA-Z0-9_]*$'
173
  order by table_schema
174
  ,        table_name
175
  ,        ordinal_position
176
)
177
"""
178
179
        return self.execute_rows(sql)
180
181
    # ------------------------------------------------------------------------------------------------------------------
182
    def get_correct_sql_mode(self, sql_mode: str) -> str:
183
        """
184
        Selects the SQL mode in the order as preferred by MySQL.
185
186
        :param sql_mode: The SQL mode.
187
        """
188
        self.set_sql_mode(sql_mode)
189
190
        sql = 'select @@sql_mode'
191
192
        return self.execute_singleton1(sql)
193
194
    # ------------------------------------------------------------------------------------------------------------------
195
    def get_label_tables(self, regex: str) -> List[Dict[str, Any]]:
196 1
        """
197
        Selects metadata of tables with a label column.
198
199
        :param regex: The regular expression for columns which we want to use.
200
        """
201
        sql = """
202
select t1.TABLE_NAME  table_name
203
,      t1.COLUMN_NAME id
204
,      t2.COLUMN_NAME label
205
from       information_schema.COLUMNS t1
206
inner join information_schema.COLUMNS t2 on t1.TABLE_NAME = t2.TABLE_NAME
207
where t1.TABLE_SCHEMA = database()
208
and   t1.EXTRA        = 'auto_increment'
209
and   t2.TABLE_SCHEMA = database()
210
and   t2.COLUMN_NAME rlike '{0}'""".format(regex)
211 1
212
        return self.execute_rows(sql)
213
214
    # ------------------------------------------------------------------------------------------------------------------
215
    def get_labels_from_table(self, table_name: str, id_column_name: str, label_column_name: str) -> \
216
            List[Dict[str, Any]]:
217
        """
218
        Selects all labels from a table with labels.
219
220
        :param table_name: The name of the table.
221
        :param id_column_name: The name of the auto increment column.
222
        :param label_column_name: The name of the column with labels.
223
        """
224
        sql = """
225
select `{0}`  as `id`
226
,      `{1}`  as `label`
227
from   `{2}`
228
where   nullif(`{1}`,'') is not null""".format(id_column_name,
229
                                               label_column_name,
230
                                               table_name)
231
232
        return self.execute_rows(sql)
233 1
234
    # ------------------------------------------------------------------------------------------------------------------
235
    def last_sql(self) -> str:
236
        """
237
        The last executed SQL statement.
238
        """
239
        return self.__dl.last_sql()
240
241
    # ------------------------------------------------------------------------------------------------------------------
242
    def get_routine_parameters(self, routine_name: str) -> List[Dict[str, Any]]:
243
        """
244
        Selects metadata of the parameters of a stored routine.
245
246
        :param routine_name: The name of the routine.
247
        """
248
        sql = """
249
select t2.PARAMETER_NAME      parameter_name
250
,      t2.DATA_TYPE           parameter_type
251
,      t2.NUMERIC_PRECISION   numeric_precision
252
,      t2.NUMERIC_SCALE       numeric_scale
253
,      t2.DTD_IDENTIFIER      column_type
254
,      t2.CHARACTER_SET_NAME  character_set_name
255 1
,      t2.COLLATION_NAME      collation
256
from            information_schema.ROUTINES   t1
257
left outer join information_schema.PARAMETERS t2  on  t2.SPECIFIC_SCHEMA = t1.ROUTINE_SCHEMA and
258
                                                      t2.SPECIFIC_NAME   = t1.ROUTINE_NAME and
259
                                                      t2.PARAMETER_MODE   is not null
260
where t1.ROUTINE_SCHEMA = database()
261
and   t1.ROUTINE_NAME   = '{0}'""".format(routine_name)
262
263
        return self.execute_rows(sql)
264 1
265
    # ------------------------------------------------------------------------------------------------------------------
266
    def get_routines(self) -> List[Dict[str, Any]]:
267
        """
268
        Selects metadata of all routines in the current schema.
269
        """
270
        sql = """
271
select ROUTINE_NAME           as routine_name
272
,      ROUTINE_TYPE           as routine_type
273
,      SQL_MODE               as sql_mode
274
,      CHARACTER_SET_CLIENT   as character_set_client
275
,      COLLATION_CONNECTION   as collation_connection
276
from  information_schema.ROUTINES
277
where ROUTINE_SCHEMA = database()
278
order by routine_name"""
279
280
        return self.execute_rows(sql)
281
282
    # ------------------------------------------------------------------------------------------------------------------
283
    def set_character_set(self, character_set: str, collate: str) -> None:
284
        """
285
        Sets the default character set and collate.
286
287
        :param character_set: The name of the character set.
288
        :param collate: The name of collate.
289
        """
290 1
        sql = "set names '{0}' collate '{1}'".format(character_set, collate)
291
292
        self.execute_none(sql)
293
294
    # ------------------------------------------------------------------------------------------------------------------
295
    def set_sql_mode(self, sql_mode: str) -> None:
296
        """
297
        Sets the SQL mode.
298
299
        :param sql_mode: The SQL mode.
300
        """
301
        sql = "set sql_mode = '{0}'".format(sql_mode)
302
303
        self.execute_none(sql)
304
305
# ----------------------------------------------------------------------------------------------------------------------
306