Passed
Pull Request — master (#119)
by Cyb3r
01:34
created

bot.utils.datahandler.table_create()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 0
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
"""Handles all postgresql data and tables"""
2
import os
3
import typing
4
5
import psycopg2.errors
6
from psycopg2 import pool
7
from psycopg2.extensions import AsIs
8
from .tables import tables
9
from .logger import make_logger
10
11
# Module-wide logger named "database"; the LOG_LEVEL environment variable
# (default "INFO") is passed through to make_logger.
log = make_logger("database", os.getenv("LOG_LEVEL", "INFO"))

# Shared connection pool (1 to 15 connections) built at import time from DATABASE_URL.
# os.environ[...] is used instead of os.getenv(...) so that a missing variable fails
# with a KeyError naming it, rather than an AttributeError from None.strip().
db_pool = pool.ThreadedConnectionPool(minconn=1, maxconn=15, dsn=os.environ["DATABASE_URL"].strip())
# Exception class psycopg2 associates with SQLSTATE 23505 (unique violation).
DuplicateError = psycopg2.errors.lookup("23505")
18
19
20
def table_create() -> None:
    """Table_create

    Executes every statement in ``tables`` (imported from tables.py) inside a single
    transaction. Presumably these are the CREATE TABLE statements run at startup;
    verify against tables.py.

    A psycopg2 error is logged and the transaction rolled back; nothing is raised to
    the caller in that case.

    :return: None
    """
    con = db_pool.getconn()
    try:
        # "with con" only ends the transaction on exit; it does not give the
        # connection back, so the finally clause is what releases it to the pool.
        with con, con.cursor() as pg_cursor:
            try:
                for table in tables:
                    pg_cursor.execute(table)
                con.commit()
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
    finally:
        db_pool.putconn(con)
35
36
37
def _format_step(table: str) -> str:
38
    """
39
    Returns the format string to be used in insert. This was split from insert to make it less
40
    complex and easier to read.
41
42
    :param table: Name of the table that is being used
43
    :type table: str
44
    :return: String that will be used for cursor execution
45
    :rtype: str
46
    """
47
    match table:
48
        case "schools":
49
            query_str = (
50
                "INSERT INTO schools"
51
                "(school, region, color, id, added_by, added_by_id) "
52
                "VALUES (%s, %s, %s, %s, %s, %s);"
53
            )
54
        case "errors":
55
            query_str = (
56
                "INSERT INTO errors"
57
                "(id, command, message, error, time, ack) "
58
                "VALUES (%s, %s, %s, %s, %s, %s);"
59
            )
60
        case "reports":
61
            query_str = (
62
                "INSERT INTO reports"
63
                "(id, name, name_id, message, time) "
64
                "VALUES (%s, %s, %s, %s, %s);"
65
            )
66
        case "admin_channels":
67
            query_str = (
68
                "INSERT INTO admin_channels (name, id, log) "
69
                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING;"
70
            )
71
        case "bot_admins":
72
            query_str = "INSERT INTO bot_admins (name, id) VALUES (%s, %s) ON CONFLICT DO NOTHING;"
73
        case "regions":
74
            query_str = "INSERT INTO regions (name, id) VALUES (%s, %s);"
75
        case "keys":
76
            query_str = "INSERT INTO keys (key, value) VALUES (%s, %s);"
77
        case _:
78
            log.error(f"Table {table} not found.")
79
            return "error"
80
    return query_str
81
82
83
def _result_parser(column: str, fetched: list) -> typing.Union[list, typing.List[typing.Tuple]]:
84
    """Parse the query results
85
86
    :param column: The name of the column(s) only used to determine how to parse the
87
            results. Multiple commas need ','
88
    :type column: str
89
    :param fetched: Results from SQL query
90
    :type fetched: list
91
    :return: A normal list or a list of tuples.
92
    :rtype: list
93
    """
94
    # fetched is a list so it does not need breaking up
95
    if column == "*" or column.find(",") != -1:
96
        result = fetched
97
    # Breaks up the tuples to a standard list.
98
    else:
99
        result = [x[0] for x in fetched]
100
    return result
101
102
103
async def insert(table: str, data: list) -> typing.Union[None, str]:
    """Insert into a table

    **Asynchronous Function**

    Inserts a new row into an existing table. The statement to execute comes from
    _format_step(). Note that the body contains no ``await``, so the database call
    itself runs synchronously.

    **PostgreSQL Equivalent:**

    INSERT INTO **table** VALUES (**data**);

    :param table: Table to perform the insert on
    :type table: str
    :param data: Values bound, in order, to the placeholders of the statement
    :type data: list
    :return: 'None' when the row was inserted, the string 'duplicate' when the
            database reports a DuplicateError, or the string 'error' for an unknown
            table or any other psycopg2 error.
    :rtype: typing.Union[None, str]
    """
    format_str = _format_step(table)
    if format_str == "error":
        return "error"
    log.debug(f'String: {format_str} Data {" ".join(map(str, data))}')
    con = db_pool.getconn()
    try:
        # "with con" only ends the transaction on exit; it does not give the
        # connection back, so the finally clause is what releases it to the pool.
        with con, con.cursor() as pg_cursor:
            try:
                pg_cursor.execute(format_str, data)
                con.commit()
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
                if isinstance(pge, DuplicateError):
                    return "duplicate"
                return "error"
    finally:
        db_pool.putconn(con)
    return None
151
152
153
async def fetch(table: str, column: str) -> list:
    """Fetch column values from a table

    **Asynchronous Function**

    Retrieves every row's value(s) for **column** from the **table**. Note that the
    body contains no ``await``, so the database call itself runs synchronously.

    **Postgresql Equivalent:**

    SELECT **column** FROM **table**;

    :param table: Table that data is being fetched from. Inserted into the query
            verbatim (AsIs), so it must never come from untrusted input.
    :type table: str
    :param column: Column(s) to fetch. '*' or a comma-separated list returns whole
            rows; a single column returns a flat list. Inserted into the query
            verbatim (AsIs), so it must never come from untrusted input.
    :type column: str
    :return: List of values, or an empty list if a psycopg2 error occurred (the error
            is logged).
    :rtype: list
    """
    con = db_pool.getconn()
    try:
        # "with con" only ends the transaction on exit; it does not give the
        # connection back, so the finally clause is what releases it to the pool.
        with con, con.cursor() as pg_cursor:
            try:
                format_str = "SELECT %s FROM %s;"
                pg_cursor.execute(format_str, (AsIs(column), AsIs(table)))
                fetched = pg_cursor.fetchall()
                return _result_parser(column, fetched)
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
                return []
    finally:
        db_pool.putconn(con)
195
196
197
async def select(
    table: str,
    column: str,
    where_column: str,
    where_value: typing.Union[str, bool, int],
    symbol: str = "=",
) -> list:
    """Select specific rows to return

    **Asynchronous Function**

    Note that the body contains no ``await``, so the database call itself runs
    synchronously.

    **Postgresql Equivalent:**

    SELECT :ref:`column` FROM :ref:`table`
    WHERE :ref:`where_column` :ref:`symbol` :ref:`where_value`;

    :param table: Table that data is being fetched from.
    :type table: str
    :param column: Column(s) that is being fetched. Multiple columns need to be comma
            separated, if all columns are wanted then use '*'.
    :type column: str
    :param where_column: Column that is compared against :ref:`where_value`.
    :type where_column: str
    :param where_value: Value to match. This is the only argument passed as a bound
            (escaped) query parameter.
    :type where_value: str, bool or int
    :param symbol: Comparison operator to use. Default is '='
    :type symbol: str
    :return: List of values that are the results, or an empty list if a psycopg2 error
            occurred (the error is logged).
    :rtype: list

    .. warning:: table, column, where_column and symbol are inserted into the query
        verbatim (AsIs) and must never come from untrusted input.
    """
    con = db_pool.getconn()
    try:
        # "with con" only ends the transaction on exit; it does not give the
        # connection back, so the finally clause is what releases it to the pool.
        with con, con.cursor() as pg_cursor:
            try:
                format_str = "SELECT %s FROM %s WHERE %s %s %s;"
                # The operator must go through AsIs: bound as a plain parameter it
                # would be rendered as the quoted literal '=' and break the query.
                pg_cursor.execute(
                    format_str,
                    (AsIs(column), AsIs(table), AsIs(where_column), AsIs(symbol), where_value),
                )
                fetched = pg_cursor.fetchall()
                return _result_parser(column, fetched)
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
                return []
    finally:
        db_pool.putconn(con)
258
259
260
async def update(
    table: str,
    column: str,
    where_value: str,
    new_value: typing.Union[str, bool],
    where_column: str = "",
) -> None:
    """Update

    **Asynchronous Function**

    Note that the body contains no ``await``, so the database call itself runs
    synchronously. A psycopg2 error is logged and rolled back, not raised.

    **Postgresql Equivalent:**

    UPDATE **table** SET :ref:`column` = :ref:`new_value`
    WHERE :ref:`where_column` = :ref:`where_value`;

    :param table: Table that the data is being updated on.
    :type table: str
    :param column: Column that is being updated. Multiple columns are not supported.
    :type column: str
    :param where_value: Value in :ref:`where_column` identifying the row(s) to update.
    :type where_value: str
    :param new_value: Value that is the replacement
    :type new_value: str or bool
    :param where_column: Column to match :ref:`where_value` against. Multiple columns
            are not supported. When left empty, :ref:`column` is used.
    :type where_column: str
    :return: None

    .. warning:: table, column and where_column are inserted into the query verbatim
        (AsIs) and must never come from untrusted input.
    """
    if not where_column:
        where_column = column
    con = db_pool.getconn()
    try:
        # "with con" only ends the transaction on exit; it does not give the
        # connection back, so the finally clause is what releases it to the pool.
        with con, con.cursor() as pg_cursor:
            try:
                format_str = "UPDATE %s SET %s = %s WHERE %s = %s;"
                pg_cursor.execute(
                    format_str,
                    (AsIs(table), AsIs(column), new_value, AsIs(where_column), where_value),
                )
                con.commit()
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
    finally:
        db_pool.putconn(con)
316
317
318
async def delete(table: str, column: str, value: str) -> None:
    """Delete

    **Asynchronous Function**

    Delete the rows whose :ref:`column` equals :ref:`value`. Note that the body
    contains no ``await``, so the database call itself runs synchronously. A psycopg2
    error is logged and rolled back, not raised.

    **Postgresql Equivalent:**

    DELETE FROM :ref:`table` WHERE :ref:`column` = :ref:`value`;

    :param table: Table that the data is being deleted from. Inserted into the query
            verbatim (AsIs), so it must never come from untrusted input.
    :type table: str
    :param column: Column to which the :ref:`value` is going to match. Inserted into
            the query verbatim (AsIs), so it must never come from untrusted input.
    :type column: str
    :param value: Value in the row which is going to match to a value in :ref:`column`
    :type value: str
    :return: None
    """
    con = db_pool.getconn()
    try:
        # "with con" only ends the transaction on exit; it does not give the
        # connection back, so the finally clause is what releases it to the pool.
        with con, con.cursor() as pg_cursor:
            try:
                format_str = "DELETE FROM %s WHERE %s = %s;"
                pg_cursor.execute(format_str, (AsIs(table), AsIs(column), value))
                con.commit()
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
    finally:
        db_pool.putconn(con)
358