Passed
Push — master ( 269135...eec213 )
by
unknown
01:33
created

bot.utils.datahandler.delete()   A

Complexity

Conditions 2

Size

Total Lines 31
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 3
dl 0
loc 31
rs 9.8
c 0
b 0
f 0
1
"""Handles all postgresql data and tables"""
2
import os
3
import typing
4
5
import psycopg2.errors
6
from psycopg2 import pool
7
from psycopg2.extensions import AsIs
8
from .tables import tables
9
from .logger import make_logger
10
11
# Module-level logger for database activity; its level is read from the
# LOG_LEVEL environment variable and defaults to "INFO".
12
log = make_logger("database", os.getenv("LOG_LEVEL", "INFO"))
13
14
# Creates the shared threaded connection pool (1 to 15 connections) used by every
# function in this module. The DSN is read from the DATABASE_URL environment variable.
# NOTE(review): os.getenv returns None when DATABASE_URL is unset, so .strip() raises
# AttributeError at import time - confirm the variable is always set in deployment.
15
16
db_pool = pool.ThreadedConnectionPool(minconn=1, maxconn=15, dsn=os.getenv("DATABASE_URL").strip())
17
# Exception class that psycopg2 maps to SQLSTATE 23505 (unique_violation in PostgreSQL);
# insert() uses it to report duplicates separately from other errors.
DuplicateError = psycopg2.errors.lookup("23505")
18
19
20
def table_create() -> None:
    """Run the table-creation statements at startup.

    Every entry of ``tables`` (imported from tables.py) is executed as one SQL
    statement. All statements run in a single transaction: it is committed
    once every statement succeeded, and rolled back (after logging the error)
    if any of them raises a ``psycopg2.Error``.

    NOTE(review): the statements are presumably ``CREATE TABLE IF NOT EXISTS``
    so that repeated startups are harmless - confirm in tables.py.

    :return: None
    """
    con = db_pool.getconn()
    try:
        with con.cursor() as pg_cursor:
            for table in tables:
                pg_cursor.execute(table)
        con.commit()
    except psycopg2.Error as pge:
        log.error(pge)
        con.rollback()
    finally:
        # Using the connection as a context manager only ends the transaction;
        # the connection has to be handed back explicitly, otherwise the pool
        # is exhausted once ``maxconn`` connections have been taken.
        db_pool.putconn(con)
34
35
36
def _format_step(table: str) -> str:
37
    """
38
    Returns the format string to be used in insert. This was split from insert to make it less
39
    complex and easier to read.
40
41
    :param table: Name of the table that is being used
42
    :type table: str
43
    :return: String that will be used for cursor execution
44
    :rtype: str
45
    """
46
    match table:
47
        case "schools":
48
            query_str = (
49
                "INSERT INTO schools"
50
                "(school, region, color, id, added_by, added_by_id) "
51
                "VALUES (%s, %s, %s, %s, %s, %s);"
52
            )
53
        case "errors":
54
            query_str = (
55
                "INSERT INTO errors"
56
                "(id, command, message, error, time, ack) "
57
                "VALUES (%s, %s, %s, %s, %s, %s);"
58
            )
59
        case "reports":
60
            query_str = (
61
                "INSERT INTO reports"
62
                "(id, name, name_id, message, time) "
63
                "VALUES (%s, %s, %s, %s, %s);"
64
            )
65
        case "admin_channels":
66
            query_str = (
67
                "INSERT INTO admin_channels (name, id, log) "
68
                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING;"
69
            )
70
        case "bot_admins":
71
            query_str = "INSERT INTO bot_admins (name, id) VALUES (%s, %s) ON CONFLICT DO NOTHING;"
72
        case "regions":
73
            query_str = "INSERT INTO regions (name, id) VALUES (%s, %s);"
74
        case "keys":
75
            query_str = "INSERT INTO keys (key, value) VALUES (%s, %s);"
76
        case _:
77
            log.error(f"Table {table} not found.")
78
            return "error"
79
    return query_str
80
81
82
def _result_parser(column: str, fetched: list) -> typing.Union[list, typing.List[typing.Tuple]]:
83
    """Parse the query results
84
85
    :param column: The name of the column(s) only used to determine how to parse the
86
            results. Multiple commas need ','
87
    :type column: str
88
    :param fetched: Results from SQL query
89
    :type fetched: list
90
    :return: A normal list or a list of tuples.
91
    :rtype: list
92
    """
93
    # fetched is a list so it does not need breaking up
94
    if column == "*" or column.find(",") != -1:
95
        result = fetched
96
    # Breaks up the tuples to a standard list.
97
    else:
98
        result = [x[0] for x in fetched]
99
    return result
100
101
102
async def insert(table: str, data: list) -> typing.Union[None, str]:
    """Insert into a table

    **Asynchronous Function**

    Inserts a new row into an existing table. The statement to execute is
    obtained from _format_step().

    **PostgreSQL Equivalent:**

    INSERT INTO **table** VALUES (**data**);

    :param table: Table to perform the insert on
    :type table: str
    :param data: Values bound to the placeholders of the statement, in column order
    :type data: list
    :return: ``None`` on success, ``'duplicate'`` when the insert violated a
            unique constraint, ``'error'`` for an unknown table or any other
            database error.
    :rtype: str
    """
    format_str = _format_step(table)
    if format_str == "error":
        return "error"
    log.debug(f'String: {format_str} Data {" ".join(map(str, data))}')
    con = db_pool.getconn()
    try:
        with con.cursor() as pg_cursor:
            pg_cursor.execute(format_str, data)
        con.commit()
        return None
    except psycopg2.Error as pge:
        log.error(pge)
        con.rollback()
        return "duplicate" if isinstance(pge, DuplicateError) else "error"
    finally:
        # Using the connection as a context manager only ends the transaction;
        # without an explicit putconn() the pool is exhausted after
        # ``maxconn`` calls.
        db_pool.putconn(con)
136
137
138
async def fetch(table: str, column: str) -> list:
    """Fetch column values from a table

    **Asynchronous Function**

    Retrieves the values of **column** for every row of **table**.

    **Postgresql Equivalent:**

    SELECT **column** FROM **table**;

    :param table: Table that data is being fetched from.
    :type table: str
    :param column: Column(s) to fetch. A comma separated list or '*' returns
            whole rows (tuples); a single column returns a flat list.
    :type column: str
    :return: List of values, or an empty list when the query fails.
    :rtype: list
    """
    con = db_pool.getconn()
    try:
        with con.cursor() as pg_cursor:
            # NOTE: table and column are interpolated verbatim (AsIs), so they
            # must come from trusted code, never from user input.
            pg_cursor.execute("SELECT %s FROM %s;", (AsIs(column), AsIs(table)))
            fetched = pg_cursor.fetchall()
        # End the read transaction before the connection goes back to the pool.
        con.commit()
        return _result_parser(column, fetched)
    except psycopg2.Error as pge:
        log.error(pge)
        con.rollback()
        return []
    finally:
        # Using the connection as a context manager only ends the transaction;
        # without an explicit putconn() the pool is exhausted after
        # ``maxconn`` calls.
        db_pool.putconn(con)
167
168
169
async def select(
    table: str,
    column: str,
    where_column: str,
    where_value: typing.Union[str, bool, int],
    symbol: str = "=",
) -> list:
    """Select specific rows to return

    **Asynchronous Function**

    **Postgresql Equivalent:**

    SELECT **column** FROM **table**
    WHERE **where_column** **symbol** **where_value**;

    :param table: Table that data is being fetched from.
    :type table: str
    :param column: Column(s) that is being fetched. Multiple columns need to be
            comma separated, if all columns are wanted then use '*'.
    :type column: str
    :param where_column: Column that is compared against **where_value**.
    :type where_column: str
    :param where_value: Value to match. Bound as a query parameter, so it is
            quoted and escaped by psycopg2.
    :type where_value: str
    :param symbol: Comparison operator placed between **where_column** and
            **where_value**. Default is '='
    :type symbol: str
    :return: List of values that are the results, or an empty list when the
            query fails.
    :rtype: list
    """
    con = db_pool.getconn()
    try:
        with con.cursor() as pg_cursor:
            # The operator has to be spliced in verbatim: bound as a normal
            # parameter it is rendered as a quoted literal (WHERE col '=' value),
            # which is a syntax error.
            # NOTE: table, column, where_column and symbol are interpolated
            # verbatim (AsIs), so they must come from trusted code, never from
            # user input. Only where_value is escaped.
            pg_cursor.execute(
                "SELECT %s FROM %s WHERE %s %s %s;",
                (AsIs(column), AsIs(table), AsIs(where_column), AsIs(symbol), where_value),
            )
            fetched = pg_cursor.fetchall()
        # End the read transaction before the connection goes back to the pool.
        con.commit()
        return _result_parser(column, fetched)
    except psycopg2.Error as pge:
        log.error(pge)
        con.rollback()
        return []
    finally:
        # Using the connection as a context manager only ends the transaction;
        # without an explicit putconn() the pool is exhausted after
        # ``maxconn`` calls.
        db_pool.putconn(con)
213
214
215
async def update(
    table: str,
    column: str,
    where_value: str,
    new_value: typing.Union[str, bool],
    where_column: str = "",
) -> None:
    """Update

    **Asynchronous Function**

    **Postgresql Equivalent:**

    UPDATE **table** SET **column** = **new_value**
    WHERE **where_column** = **where_value**;

    :param table: Table that the data is being updated on.
    :type table: str
    :param column: Column that is being updated. Multiple columns are not supported.
    :type column: str
    :param where_value: Value that selects the row(s) to update.
    :type where_value: str
    :param new_value: Value that is the replacement
    :type new_value: str
    :param where_column: Column that **where_value** is matched against.
            Defaults to **column** when left empty. Multiple columns are not
            supported.
    :type where_column: str
    :return: None. Database errors are logged and the transaction is rolled back.
    """
    if not where_column:
        where_column = column

    con = db_pool.getconn()
    try:
        with con.cursor() as pg_cursor:
            # NOTE: table, column and where_column are interpolated verbatim
            # (AsIs), so they must come from trusted code; the two values are
            # bound as parameters and escaped by psycopg2.
            pg_cursor.execute(
                "UPDATE %s SET %s = %s WHERE %s = %s;",
                (AsIs(table), AsIs(column), new_value, AsIs(where_column), where_value),
            )
        con.commit()
    except psycopg2.Error as pge:
        log.error(pge)
        con.rollback()
    finally:
        # Using the connection as a context manager only ends the transaction;
        # without an explicit putconn() the pool is exhausted after
        # ``maxconn`` calls.
        db_pool.putconn(con)
257
258
259
async def delete(table: str, column: str, value: str) -> None:
    """Delete

    **Asynchronous Function**

    Delete the row(s) of **table** whose **column** equals **value**.

    **Postgresql Equivalent:**

    DELETE FROM **table** WHERE **column** = **value**;

    :param table: Table that the data is being deleted from.
    :type table: str
    :param column: Column that **value** is matched against.
    :type column: str
    :param value: Value identifying the row(s) to delete.
    :type value: str
    :return: None. Database errors are logged and the transaction is rolled back.
    """
    con = db_pool.getconn()
    try:
        with con.cursor() as pg_cursor:
            # NOTE: table and column are interpolated verbatim (AsIs), so they
            # must come from trusted code; value is bound as a parameter and
            # escaped by psycopg2.
            pg_cursor.execute(
                "DELETE FROM %s WHERE %s = %s;", (AsIs(table), AsIs(column), value)
            )
        con.commit()
    except psycopg2.Error as pge:
        log.error(pge)
        con.rollback()
    finally:
        # Using the connection as a context manager only ends the transaction;
        # without an explicit putconn() the pool is exhausted after
        # ``maxconn`` calls.
        db_pool.putconn(con)
286