Passed
Push — master ( dcd1eb...4f7f97 )
by Cyb3r
02:51 queued 01:44
created

bot.utils.datahandler   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 292
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 124
dl 0
loc 292
rs 10
c 0
b 0
f 0
wmc 25

8 Functions

Rating   Name   Duplication   Size   Complexity  
A insert() 0 37 4
A _result_parser() 0 18 3
A table_create() 0 11 2
B _format_step() 0 41 7
A update() 0 44 3
A select() 0 47 2
A fetch() 0 30 2
A delete() 0 31 2
1
"""Handles all postgresql data and tables"""
2
import os
3
import typing
4
5
import psycopg2.errors
6
from psycopg2 import pool
7
from psycopg2.extensions import AsIs
8
from .tables import tables
9
from .logger import make_logger
10
11
# Imports the main logger
12
log = make_logger("database", os.getenv("LOG_LEVEL", "INFO"))
13
14
# Creates the connection to the database
15
db_pool = pool.ThreadedConnectionPool(minconn=1, maxconn=15, dsn=os.getenv("DATABASE_URL"))
16
DuplicateError = psycopg2.errors.lookup("23505")
17
18
19
def table_create() -> None:
20
    """Table_create
21
22
    Creates tables if they do not exist at startup. All tables are pulled from tables.py
23
    :return:
24
    """
25
    con = db_pool.getconn()
26
    pg_cursor = con.cursor()
27
    for table in tables:
28
        pg_cursor.execute(table)
29
    db_pool.putconn(con)
30
31
32
def _format_step(table: str) -> str:
33
    """
34
    Returns the format string to be used in insert. This was split from insert to make it less
35
    complex and easier to read.
36
37
    :param table: Name of the table that is being used
38
    :type table: str
39
    :return: String that will be used for cursor execution
40
    :rtype: str
41
    """
42
    if table == "schools":
43
        query_str = (
44
            "INSERT INTO schools"
45
            "(school, region, color, id, added_by, added_by_id) "
46
            "VALUES (%s, %s, %s, %s, %s, %s);"
47
        )
48
    elif table == "errors":
49
        query_str = (
50
            "INSERT INTO errors"
51
            "(id, command, message, error, time, ack) "
52
            "VALUES (%s, %s, %s, %s, %s, %s);"
53
        )
54
    elif table == "reports":
55
        query_str = (
56
            "INSERT INTO reports"
57
            "(id, name, name_id, message, time) "
58
            "VALUES (%s, %s, %s, %s, %s);"
59
        )
60
    elif table == "admin_channels":
61
        query_str = (
62
            "INSERT INTO admin_channels (name, id, log) "
63
            "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING;"
64
        )
65
    elif table == "bot_admins":
66
        query_str = "INSERT INTO bot_admins (name, id) VALUES (%s, %s) ON CONFLICT DO NOTHING;"
67
    elif table == "regions":
68
        query_str = "INSERT INTO regions (name, id) VALUES (%s, %s)"
69
    else:
70
        log.error("Table {} not found.".format(table))
71
        return "error"
72
    return query_str
73
74
75
def _result_parser(column: str, fetched: list) -> typing.Union[list, typing.List[typing.Tuple]]:
76
    """Parse the query results
77
78
    :param column: The name of the column(s) only used to determine how to parse the
79
            results. Multiple commas need ','
80
    :type column: str
81
    :param fetched: Results from SQL query
82
    :type fetched: list
83
    :return: A normal list or a list of tuples.
84
    :rtype: list
85
    """
86
    # fetched is a list so it does not need breaking up
87
    if column == "*" or column.find(",") != -1:
88
        result = fetched
89
    # Breaks up the tuples to a standard list.
90
    else:
91
        result = [x[0] for x in fetched]
92
    return result
93
94
95
async def insert(table: str, data: list) -> typing.Union[None, str]:
96
    """Insert into a table
97
98
    **Asynchronous Function**
99
100
    Inserts a new row to an existing table. Get the string to execute with from _format_step()
101
102
    **PostgreSQL Equivalent:**
103
104
    INSERT into **table** VALUE (**data**);
105
106
    :param table: Table to perform the insert on
107
    :type table: str
108
    :param data: Data that gets placed into the format_str
109
    :type data: list
110
    :return: In the event of an error inserting into the table the string 'error' will be
111
            returned. If there is no error then 'None' will be returned.
112
    :rtype: str
113
    """
114
    format_str = _format_step(table)
115
    if format_str == "error":
116
        return "error"
117
    log.debug("String: {} Data {}".format(format_str, " ".join(map(str, data))))
118
    con = db_pool.getconn()
119
    pg_cursor = con.cursor()
120
    try:
121
        pg_cursor.execute(format_str, data)
122
        con.commit()
123
        return None
124
    except psycopg2.Error as pge:
125
        log.error(pge)
126
        pg_cursor.rollback()
127
        if isinstance(pge, DuplicateError):
128
            return "duplicate"
129
        return "error"
130
    finally:
131
        db_pool.putconn(con)
132
133
134
async def fetch(table: str, column: str) -> list:
135
    """Fetch a single column from a table
136
137
    **Asynchronous Function**
138
139
    Retrieves the full column values for **column** from the **table**.
140
141
    **Postgresql Equivalent:**
142
143
    SELECT **column** FROM **table**;
144
145
    :param table: Table that data is being fetched from.
146
    :type table: str
147
    :param column:
148
    :type column: str
149
    :return: List of values
150
    :rtype: list
151
    """
152
    con = db_pool.getconn()
153
    pg_cursor = con.cursor()
154
    try:
155
        format_str = "SELECT %s FROM %s;"
156
        pg_cursor.execute(format_str, (AsIs(column), AsIs(table)))
157
        fetched = pg_cursor.fetchall()
158
        return _result_parser(column, fetched)
159
    except psycopg2.Error as pge:
160
        log.error(pge)
161
        return []
162
    finally:
163
        db_pool.putconn(con)
164
165
166
async def select(
167
    table: str,
168
    column: str,
169
    where_column: str,
170
    where_value: typing.Union[str, bool, int],
171
    symbol: [str, bool] = "=",
172
) -> list:
173
    """Choice specific roles to return
174
175
    **Asynchronous Function**
176
177
178
    **Postgresql Equivalent:**
179
180
    SELECT :ref:`column` FROM :ref:`table`
181
    WHERE :ref:`where_column` :ref:`symbol` :ref:`where_value`;
182
183
    :param table: Table that data is being fetched from.
184
    :type table: str
185
    :param column: Column(s) that is being fetched. Multiple columns need to comma
186
            separated, if all columns are wanted then use '*'.
187
    :type column: str
188
    :param where_column: Column that is going have the value of :ref:`where_value`.
189
    :type where_column: str
190
    :param where_value: Value to match.
191
    :type where_value: str
192
    :param symbol: Symbol to use in comparison. Default is '='
193
    :type symbol: str
194
    :return: List of values that are the results
195
    :rtype: list
196
    """
197
    con = db_pool.getconn()
198
    pg_cursor = con.cursor()
199
    try:
200
        format_str = "SELECT %s FROM %s WHERE %s %s %s;"
201
        pg_cursor.execute(
202
            format_str,
203
            (AsIs(column), AsIs(table), AsIs(where_column), AsIs(symbol), where_value),
204
        )
205
        fetched = pg_cursor.fetchall()
206
        return _result_parser(column, fetched)
207
    except psycopg2.Error as pge:
208
        log.error(pge)
209
        con.rollback()
210
        return []
211
    finally:
212
        db_pool.putconn(con)
213
214
215
async def update(
216
    table: str,
217
    column: str,
218
    where_value: str,
219
    new_value: typing.Union[str, bool],
220
    where_column: str = "",
221
) -> None:
222
    """Update
223
224
    **Asynchronous Function**
225
226
    **Postgresql Equivalent:**
227
228
    UPDATE **table** SET :ref:`column` = :ref:`new_value`
229
    WHERE :ref:`check_column` = :ref:`where_value`;
230
231
    :param table: Table that the data is being updated on.
232
    :type table: str
233
    :param column: Column that is being updated. Multiple columns are not supported.
234
    :type column: str
235
    :param where_value: Value that is going to be updated.
236
    :type where_value: str
237
    :param new_value: Value that is the replacement
238
    :type new_value: str
239
    :param where_column: Column to select from. Multiple columns are not supported.
240
    :type where_column: str
241
    :return: None
242
    """
243
    if not where_column:
244
        where_column = column
245
    con = db_pool.getconn()
246
    pg_cursor = con.cursor()
247
    try:
248
        format_str = "UPDATE %s SET %s = %s where %s = %s"
249
        pg_cursor.execute(
250
            format_str,
251
            (AsIs(table), AsIs(column), new_value, AsIs(where_column), where_value),
252
        )
253
        con.commit()
254
    except psycopg2.Error as pge:
255
        log.error(pge)
256
        con.rollback()
257
    finally:
258
        db_pool.putconn(con)
259
260
261
async def delete(table: str, column: str, value: str) -> None:
262
    """Delete
263
264
    **Asynchronous Function**
265
266
    Delete value from table
267
268
    **Postgresql Equivalent:**
269
270
    DELETE FROM :ref:`table` WHERE :ref:`column` = :ref:`value`;
271
272
    :param table: Table that the data is being deleted from.
273
    :type table: str
274
    :param column: Column to which the :ref:`value` is going to match.
275
    :type column: str
276
    :param value: Value in the row which is going to match to a value in :ref:`column`
277
    :type value: str
278
    :return: None
279
    """
280
    con = db_pool.getconn()
281
    pg_cursor = con.cursor()
282
    try:
283
        log.info("Deleting {} where {} from {}".format(column, value, table))
284
        format_str = "DELETE FROM %s WHERE %s = %s"
285
        pg_cursor.execute(format_str, (AsIs(table), AsIs(column), value))
286
        con.commit()
287
    except psycopg2.Error as pge:
288
        log.error(pge)
289
        con.rollback()
290
    finally:
291
        db_pool.putconn(con)
292