Passed
Pull Request — master (#119)
by Cyb3r
01:20
created

bot.utils.datahandler.select()   A

Complexity

Conditions 2

Size

Total Lines 47
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 20
nop 5
dl 0
loc 47
rs 9.4
c 0
b 0
f 0
1
"""Handles all postgresql data and tables"""
2
import os
3
import typing
4
5
import psycopg2.errors
6
from psycopg2 import pool
7
from psycopg2.extensions import AsIs
8
from .tables import tables
9
from .logger import make_logger
10
11
# Database logger; its level is read from the LOG_LEVEL environment variable
# and falls back to "INFO" when that variable is not set.
log = make_logger("database", os.getenv("LOG_LEVEL", "INFO"))

# Fail with an explicit message when DATABASE_URL is missing, instead of the
# opaque AttributeError that calling .strip() on None would produce.
_database_url = os.getenv("DATABASE_URL")
if _database_url is None:
    raise RuntimeError("DATABASE_URL environment variable is not set.")

# Connection pool shared by every function in this module (1 to 15 connections).
db_pool = pool.ThreadedConnectionPool(minconn=1, maxconn=15, dsn=_database_url.strip())
# Exception class psycopg2 maps to SQLSTATE 23505 (unique_violation); insert()
# checks for it to report duplicates.
DuplicateError = psycopg2.errors.lookup("23505")
18
19
20
def table_create() -> None:
    """Table_create

    Runs every statement found in ``tables`` (imported from tables.py) on a
    single pooled connection and commits once all of them have executed. The
    statements are presumably ``CREATE TABLE IF NOT EXISTS`` definitions so the
    call is safe at every startup -- verify against tables.py.

    If any statement raises a ``psycopg2.Error`` the error is logged and the
    transaction is rolled back.

    :return: None
    """
    con = db_pool.getconn()
    try:
        with con, con.cursor() as pg_cursor:
            try:
                for table in tables:
                    pg_cursor.execute(table)
                con.commit()
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
    finally:
        # ``with con`` only ends the transaction, it does not release the
        # connection; without putconn() the pool eventually runs dry.
        db_pool.putconn(con)
35
36
37
def _format_step(table: str) -> str:
38
    """
39
    Returns the format string to be used in insert. This was split from insert to make it less
40
    complex and easier to read.
41
42
    :param table: Name of the table that is being used
43
    :type table: str
44
    :return: String that will be used for cursor execution
45
    :rtype: str
46
    """
47
    match table:
48
        case "schools":
49
            query_str = (
50
                "INSERT INTO schools"
51
                "(school, region, color, id, added_by, added_by_id) "
52
                "VALUES (%s, %s, %s, %s, %s, %s);"
53
            )
54
        case "errors":
55
            query_str = (
56
                "INSERT INTO errors"
57
                "(id, command, message, error, time, ack) "
58
                "VALUES (%s, %s, %s, %s, %s, %s);"
59
            )
60
        case "reports":
61
            query_str = (
62
                "INSERT INTO reports"
63
                "(id, name, name_id, message, time) "
64
                "VALUES (%s, %s, %s, %s, %s);"
65
            )
66
        case "admin_channels":
67
            query_str = (
68
                "INSERT INTO admin_channels (name, id, log) "
69
                "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING;"
70
            )
71
        case "bot_admins":
72
            query_str = "INSERT INTO bot_admins (name, id) VALUES (%s, %s) ON CONFLICT DO NOTHING;"
73
        case "regions":
74
            query_str = "INSERT INTO regions (name, id) VALUES (%s, %s);"
75
        case "keys":
76
            query_str = "INSERT INTO keys (key, value) VALUES (%s, %s);"
77
        case _:
78
            log.error(f"Table {table} not found.")
79
            return "error"
80
    return query_str
81
82
83
def _result_parser(column: str, fetched: list) -> typing.Union[list, typing.List[typing.Tuple]]:
84
    """Parse the query results
85
86
    :param column: The name of the column(s) only used to determine how to parse the
87
            results. Multiple commas need ','
88
    :type column: str
89
    :param fetched: Results from SQL query
90
    :type fetched: list
91
    :return: A normal list or a list of tuples.
92
    :rtype: list
93
    """
94
    # fetched is a list so it does not need breaking up
95
    if column == "*" or column.find(",") != -1:
96
        result = fetched
97
    # Breaks up the tuples to a standard list.
98
    else:
99
        result = [x[0] for x in fetched]
100
    return result
101
102
103
async def insert(table: str, data: list) -> typing.Union[None, str]:
    """Insert into a table

    **Asynchronous Function**

    Inserts a new row to an existing table. Get the string to execute with from _format_step()

    Although declared ``async``, the body contains no ``await``; the psycopg2
    calls run synchronously.

    **PostgreSQL Equivalent:**

    INSERT INTO **table** VALUES (**data**);

    :param table: Table to perform the insert on
    :type table: str
    :param data: Values bound to the placeholders of the statement, in column order
    :type data: list
    :return: ``None`` when the row was inserted and committed. ``'duplicate'`` when the
            database raised the error class stored in ``DuplicateError``. ``'error'``
            for an unknown table or any other ``psycopg2.Error``.
    :rtype: str
    """
    format_str = _format_step(table)
    if format_str == "error":
        return "error"
    log.debug(f'String: {format_str} Data {" ".join(map(str, data))}')
    con = db_pool.getconn()
    try:
        with con, con.cursor() as pg_cursor:
            try:
                pg_cursor.execute(format_str, data)
                con.commit()
                return None
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
                if isinstance(pge, DuplicateError):
                    return "duplicate"
                return "error"
    finally:
        # ``with con`` only ends the transaction, it does not release the
        # connection; without putconn() the pool eventually runs dry.
        db_pool.putconn(con)
137
138
139
async def fetch(table: str, column: str) -> list:
    """Fetch a single column from a table

    **Asynchronous Function**

    Retrieves the full column values for **column** from the **table**.

    Although declared ``async``, the body contains no ``await``; the psycopg2
    calls run synchronously.

    **Postgresql Equivalent:**

    SELECT **column** FROM **table**;

    :param table: Table that data is being fetched from. Inserted into the statement
            unquoted (``AsIs``), so it must never come from untrusted input.
    :type table: str
    :param column: Column(s) to fetch. Inserted into the statement unquoted (``AsIs``),
            so it must never come from untrusted input. ``'*'`` or a comma separated
            list makes the result a list of row tuples.
    :type column: str
    :return: List of values, or an empty list when the query raised a ``psycopg2.Error``
    :rtype: list
    """
    con = db_pool.getconn()
    try:
        with con, con.cursor() as pg_cursor:
            try:
                format_str = "SELECT %s FROM %s;"
                pg_cursor.execute(format_str, (AsIs(column), AsIs(table)))
                fetched = pg_cursor.fetchall()
                return _result_parser(column, fetched)
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
                return []
    finally:
        # ``with con`` only ends the transaction, it does not release the
        # connection; without putconn() the pool eventually runs dry.
        db_pool.putconn(con)
168
169
170
async def select(
    table: str,
    column: str,
    where_column: str,
    where_value: typing.Union[str, bool, int],
    symbol: typing.Union[str, bool] = "=",
) -> list:
    """Choose specific rows to return

    **Asynchronous Function**

    Although declared ``async``, the body contains no ``await``; the psycopg2
    calls run synchronously.

    **Postgresql Equivalent:**

    SELECT :ref:`column` FROM :ref:`table`
    WHERE :ref:`where_column` :ref:`symbol` :ref:`where_value`;

    :param table: Table that data is being fetched from. Inserted unquoted (``AsIs``),
            so it must never come from untrusted input.
    :type table: str
    :param column: Column(s) that is being fetched. Multiple columns need to be comma
            separated, if all columns are wanted then use '*'. Inserted unquoted.
    :type column: str
    :param where_column: Column that is going to be compared with :ref:`where_value`.
            Inserted unquoted.
    :type where_column: str
    :param where_value: Value to match. Passed as a bound query parameter.
    :type where_value: str
    :param symbol: Comparison operator to use. Default is '='. Must be one of the
            operators in the whitelist inside this function (case-insensitive); anything
            else is logged and an empty list is returned.
    :type symbol: str
    :return: List of values that are the results, or an empty list on error
    :rtype: list
    """
    # The operator is SQL syntax and cannot be sent as a bound value (it would be
    # rendered as the quoted literal '=' and break the statement). It is spliced
    # in with AsIs instead, so it is restricted to a fixed set of operators.
    allowed_symbols = {
        "=",
        "!=",
        "<>",
        "<",
        "<=",
        ">",
        ">=",
        "LIKE",
        "NOT LIKE",
        "ILIKE",
        "NOT ILIKE",
        "IS",
        "IS NOT",
        "IS DISTINCT FROM",
        "IS NOT DISTINCT FROM",
    }
    operator = " ".join(str(symbol).split()).upper()
    if operator not in allowed_symbols:
        log.error(f"Unsupported comparison symbol: {symbol!r}")
        return []

    con = db_pool.getconn()
    try:
        with con, con.cursor() as pg_cursor:
            try:
                format_str = "SELECT %s FROM %s WHERE %s %s %s;"
                pg_cursor.execute(
                    format_str,
                    (
                        AsIs(column),
                        AsIs(table),
                        AsIs(where_column),
                        AsIs(operator),
                        where_value,
                    ),
                )
                fetched = pg_cursor.fetchall()
                return _result_parser(column, fetched)
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
                return []
    finally:
        # ``with con`` only ends the transaction, it does not release the
        # connection; without putconn() the pool eventually runs dry.
        db_pool.putconn(con)
214
215
216
async def update(
    table: str,
    column: str,
    where_value: str,
    new_value: typing.Union[str, bool],
    where_column: str = "",
) -> None:
    """Update

    **Asynchronous Function**

    Although declared ``async``, the body contains no ``await``; the psycopg2
    calls run synchronously.

    **Postgresql Equivalent:**

    UPDATE **table** SET :ref:`column` = :ref:`new_value`
    WHERE :ref:`where_column` = :ref:`where_value`;

    A ``psycopg2.Error`` raised by the statement is logged and the transaction is
    rolled back; nothing is reported to the caller.

    :param table: Table that the data is being updated on. Inserted unquoted (``AsIs``),
            so it must never come from untrusted input.
    :type table: str
    :param column: Column that is being updated. Multiple columns are not supported.
            Inserted unquoted.
    :type column: str
    :param where_value: Value that selects the rows to update. Passed as a bound
            query parameter.
    :type where_value: str
    :param new_value: Value that is the replacement. Passed as a bound query parameter.
    :type new_value: str
    :param where_column: Column to select from. Multiple columns are not supported.
            When empty, :ref:`column` is used. Inserted unquoted.
    :type where_column: str
    :return: None
    """
    if not where_column:
        where_column = column

    con = db_pool.getconn()
    try:
        with con, con.cursor() as pg_cursor:
            try:
                format_str = "UPDATE %s SET %s = %s WHERE %s = %s;"
                pg_cursor.execute(
                    format_str,
                    (AsIs(table), AsIs(column), new_value, AsIs(where_column), where_value),
                )
                con.commit()
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
    finally:
        # ``with con`` only ends the transaction, it does not release the
        # connection; without putconn() the pool eventually runs dry.
        db_pool.putconn(con)
258
259
260
async def delete(table: str, column: str, value: str) -> None:
    """Delete

    **Asynchronous Function**

    Delete value from table

    Although declared ``async``, the body contains no ``await``; the psycopg2
    calls run synchronously. A ``psycopg2.Error`` raised by the statement is
    logged and the transaction is rolled back; nothing is reported to the caller.

    **Postgresql Equivalent:**

    DELETE FROM :ref:`table` WHERE :ref:`column` = :ref:`value`;

    :param table: Table that the data is being deleted from. Inserted unquoted
            (``AsIs``), so it must never come from untrusted input.
    :type table: str
    :param column: Column to which the :ref:`value` is going to match. Inserted unquoted.
    :type column: str
    :param value: Value in the row which is going to match to a value in :ref:`column`.
            Passed as a bound query parameter.
    :type value: str
    :return: None
    """
    con = db_pool.getconn()
    try:
        with con, con.cursor() as pg_cursor:
            try:
                format_str = "DELETE FROM %s WHERE %s = %s;"
                pg_cursor.execute(format_str, (AsIs(table), AsIs(column), value))
                con.commit()
            except psycopg2.Error as pge:
                log.error(pge)
                con.rollback()
    finally:
        # ``with con`` only ends the transaction, it does not release the
        # connection; without putconn() the pool eventually runs dry.
        db_pool.putconn(con)
287