pystratum_mssql.MsSqlMetadataDataLayer   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 238
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 13
eloc 56
dl 0
loc 238
rs 10
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
A MsSqlMetadataDataLayer.get_label_tables() 0 22 1
A MsSqlMetadataDataLayer.connect() 0 5 1
A MsSqlMetadataDataLayer.execute_rows() 0 11 1
A MsSqlMetadataDataLayer.drop_temporary_table() 0 9 1
A MsSqlMetadataDataLayer.get_all_table_columns() 0 32 1
A MsSqlMetadataDataLayer.get_routines() 0 17 1
A MsSqlMetadataDataLayer.commit() 0 5 1
A MsSqlMetadataDataLayer.disconnect() 0 5 1
A MsSqlMetadataDataLayer.__init__() 0 10 1
A MsSqlMetadataDataLayer.get_routine_parameters() 0 24 1
A MsSqlMetadataDataLayer.drop_stored_routine() 0 11 1
A MsSqlMetadataDataLayer.get_labels_from_table() 0 28 1
A MsSqlMetadataDataLayer.execute_none() 0 11 1
1
from typing import Any, Dict, List
2
3
from pystratum_backend.StratumStyle import StratumStyle
4
from pystratum_common.MetadataDataLayer import MetadataDataLayer
5
6
from pystratum_mssql.MsSqlDataLayer import MsSqlDataLayer
7
from pystratum_mssql.MsSqlConnector import MsSqlConnector
8
9
10
class MsSqlMetadataDataLayer(MetadataDataLayer):
11
    """
12
    Data layer for retrieving metadata and loading stored routines.
13
    """
14
    __dl = None
15
    """
16
    The connection to the SQL Server instance.
17
18
    :type: pystratum_mssql.MsSqlDataLayer.MsSqlDataLayer|None
19
    """
20
21
    # ------------------------------------------------------------------------------------------------------------------
22
    def __init__(self, io: StratumStyle, connector: MsSqlConnector):
23
        """
24
        Object constructor.
25
26
        :param PyStratumStyle io: The output decorator.
27
        """
28
        super().__init__(io)
29
30
        self.__dl: MsSqlDataLayer = MsSqlDataLayer(connector)
31
        """
32
        The connection to the MySQL instance.
33
        """
34
35
    # ------------------------------------------------------------------------------------------------------------------
36
    def commit(self) -> None:
37
        """
38
        Connects to a SQL Server instance.
39
        """
40
        self.__dl.commit()
41
42
    # ------------------------------------------------------------------------------------------------------------------
43
    def connect(self) -> None:
44
        """
45
        Connects to a SQL Server instance.
46
        """
47
        self.__dl.connect()
48
49
    # ------------------------------------------------------------------------------------------------------------------
50
    def disconnect(self) -> None:
51
        """
52
        Disconnects from the SQL Server instance.
53
        """
54
        self.__dl.disconnect()
55
56
    # ------------------------------------------------------------------------------------------------------------------
57
    def drop_stored_routine(self, routine_type: str, schema_name: str, routine_name: str) -> None:
58
        """
59
        Drops a stored routine if it exists.
60
61
        :param str routine_type: The type of the routine (i.e. procedure or function).
62
        :param str schema_name: The name of the schema.
63
        :param str routine_name: The name of the routine.
64
        """
65
        sql = "drop {0} [{1}].[{2}]".format(routine_type, schema_name, routine_name)
66
67
        self.execute_none(sql)
68
69
    # ------------------------------------------------------------------------------------------------------------------
70
    def drop_temporary_table(self, table_name: str) -> None:
71
        """
72
        Drops a temporary table.
73
74
        :param str table_name: The name of the table.
75
        """
76
        sql = 'drop temporary table `{0}`'.format(table_name)
77
78
        self.execute_none(sql)
79
80
    # ------------------------------------------------------------------------------------------------------------------
81
    def execute_none(self, query: str) -> None:
82
        """
83
        Executes a query that does not select any rows.
84
85
        :param str query: The query.
86
87
        :rtype: int
88
        """
89
        self._log_query(query)
90
91
        return self.__dl.execute_none(query)
92
93
    # ------------------------------------------------------------------------------------------------------------------
94
    def execute_rows(self, query: str) -> List[Dict[str, Any]]:
95
        """
96
        Executes a query that selects 0 or more rows. Returns the selected rows (an empty list if no rows are selected).
97
98
        :param str query: The query.
99
100
        :rtype: list[dict[str,*]]
101
        """
102
        self._log_query(query)
103
104
        return self.__dl.execute_rows(query)
105
106
    # ------------------------------------------------------------------------------------------------------------------
107
    def get_all_table_columns(self) -> List[Dict[str, Any]]:
108
        """
109
        Selects metadata of all columns of all tables.
110
111
        :rtype: list[dict[str,*]]
112
        """
113
        sql = """
114
select scm.name                   as schema_name
115
,      tab.name                   as table_name
116
,      col.name                   as column_name
117
,      isnull(stp.name,utp.name)  as data_type
118
,      col.max_length
119
,      col.precision
120
,      col.scale
121
,      col.column_id
122
from            sys.columns col
123
inner join      sys.types   utp  on  utp.user_type_id = col.user_type_id and
124
                                     utp.system_type_id = col.system_type_id
125
left outer join sys.types   stp  on  utp.is_user_defined = 1 and
126
                                     stp.is_user_defined = 0 and
127
                                     utp.system_type_id = stp.system_type_id and
128
                                     utp.user_type_id <> stp.user_type_id  and
129
                                     stp.user_type_id = stp.system_type_id
130
inner join      sys.tables  tab  on  col.object_id = tab.object_id
131
inner join      sys.schemas scm  on  tab.schema_id = scm.schema_id
132
where tab.type in ('U','S','V')
133
order by  scm.name
134
,         tab.name
135
,         col.column_id
136
"""
137
138
        return self.execute_rows(sql)
139
140
    # ------------------------------------------------------------------------------------------------------------------
141
    def get_label_tables(self, regex: str) -> List[Dict[str, Any]]:
142
        """
143
        Selects metadata of tables with a label column.
144
145
        :param str regex: The regular expression for columns which we want to use.
146
147
        :rtype: list[dict[str,*]]
148
        """
149
        sql = """
150
select db_name() [database]
151
,      scm.name  [schema_name]
152
,      tab.name  [table_name]
153
,      cl1.name  [label]
154
,      cl2.name  [id]
155
from       sys.schemas     scm 
156
inner join sys.tables      tab  on  tab.[schema_id] = scm.[schema_id]
157
inner join sys.all_columns cl1  on  cl1.[object_id] = tab.[object_id]
158
inner join sys.all_columns cl2  on  cl2.[object_id] = tab.[object_id]
159
where cl1.name like '{0}'
160
and   cl2.is_identity = 1""".format(regex)
161
162
        return self.execute_rows(sql)
163
164
    # ------------------------------------------------------------------------------------------------------------------
165
    def get_labels_from_table(self,
166
                              database_name: str,
167
                              schema_name: str,
168
                              table_name: str,
169
                              id_column_name: str,
170
                              label_column_name: str) -> List[Dict[str, Any]]:
171
        """
172
        Selects all labels from a table with labels.
173
174
        :param str database_name: The name of the database.
175
        :param str schema_name: The name of the schema.
176
        :param str table_name: The name of the table.
177
        :param str id_column_name: The name of the auto increment column.
178
        :param str label_column_name: The name of the column with labels.
179
180
        :rtype: list[dict[str,*]]
181
        """
182
        sql = """
183
select tab.[{0!s}] id
184
,      tab.[{1!s}] label
185
from   [{2!s}].[{3!s}].[{4!s}] tab
186
where  nullif(tab.[{1!s}],'') is not null""".format(id_column_name,
187
                                                    label_column_name,
188
                                                    database_name,
189
                                                    schema_name,
190
                                                    table_name)
191
192
        return self.execute_rows(sql)
193
194
    # ------------------------------------------------------------------------------------------------------------------
195
    def get_routine_parameters(self, schema_name: str, routine_name: str) -> List[Dict[str, Any]]:
196
        """
197
        Selects metadata of the parameters of a stored routine.
198
199
        :param str schema_name: The name of the schema.
200
        :param str routine_name: The name of the routine.
201
202
        :rtype: list[dict[str,*]]
203
        """
204
        sql = """
205
select par.name      parameter_name
206
,      typ.name      type_name
207
,      typ.max_length
208
,      typ.precision
209
,      typ.scale
210
from       sys.schemas        scm
211
inner join sys.all_objects    prc  on  prc.[schema_id] = scm.[schema_id]
212
inner join sys.all_parameters par  on  par.[object_id] = prc.[object_id]
213
inner join sys.types          typ  on  typ.user_type_id = par.system_type_id
214
where scm.name = '%s'
215
and   prc.name = '%s'
216
order by par.parameter_id""" % (schema_name, routine_name)
217
218
        return self.execute_rows(sql)
219
220
    # ------------------------------------------------------------------------------------------------------------------
221
    def get_routines(self) -> List[Dict[str, Any]]:
222
        """
223
        Selects metadata of all routines.
224
225
        :rtype: list[dict[str,*]]
226
        """
227
        sql = """
228
select scm.name as schema_name
229
,      prc.name as routine_name
230
,      prc.type as routine_type
231
from       sys.all_objects  prc
232
inner join sys.schemas     scm  on   scm.schema_id = prc.schema_id
233
where prc.type in ('P','FN','TF')
234
and   scm.name <> 'sys'
235
and   prc.is_ms_shipped=0"""
236
237
        return self.execute_rows(sql)
238
239
# ----------------------------------------------------------------------------------------------------------------------
240