Passed
Pull Request — master (#50)
by Cyb3r
01:18
created

bot.utils.datahandler   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 288
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 124
dl 0
loc 288
rs 10
c 0
b 0
f 0
wmc 25

8 Functions

Rating   Name   Duplication   Size   Complexity  
A insert() 0 37 4
A _result_parser() 0 18 3
A table_create() 0 11 2
B _format_step() 0 41 7
A update() 0 44 3
A select() 0 47 2
A fetch() 0 28 2
A delete() 0 29 2
1
"""Handles all postgresql data and tables"""
2
import os
3
import typing
4
5
import psycopg2.errors
6
from psycopg2 import pool
7
from psycopg2.extensions import AsIs
8
from .tables import tables
9
from .logger import make_logger
10
11
# Imports the main logger
12
log = make_logger("database", os.getenv("LOG_LEVEL", "INFO"))
13
14
# Creates the connection to the database
15
db_pool = pool.ThreadedConnectionPool(minconn=1, maxconn=15, dsn=os.getenv("DATABASE_URL"))
16
DuplicateError = psycopg2.errors.lookup("23505")
17
18
19
def table_create() -> None:
20
    """Table_create
21
22
    Creates tables if they do not exist at startup. All tables are pulled from tables.py
23
    :return:
24
    """
25
    con = db_pool.getconn()
26
    pg_cursor = con.cursor()
27
    for table in tables:
28
        pg_cursor.execute(table)
29
    db_pool.putconn(con)
30
31
32
def _format_step(table: str) -> str:
33
    """
34
    Returns the format string to be used in insert. This was split from insert to make it less
35
    complex and easier to read.
36
37
    :param table: Name of the table that is being used
38
    :type table: str
39
    :return: String that will be used for cursor execution
40
    :rtype: str
41
    """
42
    if table == "schools":
43
        query_str = (
44
            "INSERT INTO schools"
45
            "(school, region, color, id, added_by, added_by_id) "
46
            "VALUES (%s, %s, %s, %s, %s, %s);"
47
        )
48
    elif table == "errors":
49
        query_str = (
50
            "INSERT INTO errors"
51
            "(id, command, message, error, time, ack) "
52
            "VALUES (%s, %s, %s, %s, %s, %s);"
53
        )
54
    elif table == "reports":
55
        query_str = (
56
            "INSERT INTO reports"
57
            "(id, name, name_id, message, time) "
58
            "VALUES (%s, %s, %s, %s, %s);"
59
        )
60
    elif table == "admin_channels":
61
        query_str = (
62
            "INSERT INTO admin_channels (name, id, log) "
63
            "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING;"
64
        )
65
    elif table == "bot_admins":
66
        query_str = "INSERT INTO bot_admins (name, id) VALUES (%s, %s) ON CONFLICT DO NOTHING;"
67
    elif table == "regions":
68
        query_str = "INSERT INTO regions (name, id) VALUES (%s, %s)"
69
    else:
70
        log.error("Table {} not found.".format(table))
71
        return "error"
72
    return query_str
73
74
75
def _result_parser(column: str, fetched: list) -> typing.Union[list, typing.List[typing.Tuple]]:
76
    """Parse the query results
77
78
    :param column: The name of the column(s) only used to determine how to parse the
79
            results. Multiple commas need ','
80
    :type column: str
81
    :param fetched: Results from SQL query
82
    :type fetched: list
83
    :return: A normal list or a list of tuples.
84
    :rtype: list
85
    """
86
    # fetched is a list so it does not need breaking up
87
    if column == "*" or column.find(",") != -1:
88
        result = fetched
89
    # Breaks up the tuples to a standard list.
90
    else:
91
        result = [x[0] for x in fetched]
92
    return result
93
94
95
async def insert(table: str, data: list) -> typing.Union[None, str]:
96
    """Insert into a table
97
98
    **Asynchronous Function**
99
100
    Inserts a new row to an existing table. Get the string to execute with from _format_step()
101
102
    **PostgreSQL Equivalent:**
103
104
    INSERT into **table** VALUE (**data**);
105
106
    :param table: Table to perform the insert on
107
    :type table: str
108
    :param data: Data that gets placed into the format_str
109
    :type data: list
110
    :return: In the event of an error inserting into the table the string 'error' will be
111
            returned. If there is no error then 'None' will be returned.
112
    :rtype: str
113
    """
114
    format_str = _format_step(table)
115
    if format_str == "error":
116
        return "error"
117
    log.debug("String: {} Data {}".format(format_str, " ".join(map(str, data))))
118
    con = db_pool.getconn()
119
    pg_cursor = con.cursor()
120
    try:
121
        pg_cursor.execute(format_str, data)
122
        con.commit()
123
        return None
124
    except psycopg2.Error as pge:
125
        log.error(pge)
126
        pg_cursor.rollback()
127
        if isinstance(pge, DuplicateError):
128
            return "duplicate"
129
        return "error"
130
    finally:
131
        db_pool.putconn(con)
132
133
134
async def fetch(table: str, column: str) -> list:
135
    """Fetch a single column from a table
136
137
    Retrieves the full column values for **column** from the **table**.
138
139
    **Postgresql Equivalent:**
140
141
    SELECT **column** FROM **table**;
142
143
    :param table: Table that data is being fetched from.
144
    :type table: str
145
    :param column:
146
    :type column: str
147
    :return: List of values
148
    :rtype: list
149
    """
150
    con = db_pool.getconn()
151
    pg_cursor = con.cursor()
152
    try:
153
        format_str = "SELECT %s FROM %s;"
154
        pg_cursor.execute(format_str, (AsIs(column), AsIs(table)))
155
        fetched = pg_cursor.fetchall()
156
        return _result_parser(column, fetched)
157
    except psycopg2.Error as pge:
158
        log.error(pge)
159
        return []
160
    finally:
161
        db_pool.putconn(con)
162
163
164
async def select(
165
    table: str,
166
    column: str,
167
    where_column: str,
168
    where_value: typing.Union[str, bool, int],
169
    symbol: [str, bool] = "=",
170
) -> list:
171
    """Choice specific roles to return
172
173
    **Asynchronous Function**
174
175
176
    **Postgresql Equivalent:**
177
178
    SELECT :ref:`column` FROM :ref:`table`
179
    WHERE :ref:`where_column` :ref:`symbol` :ref:`where_value`;
180
181
    :param table: Table that data is being fetched from.
182
    :type table: str
183
    :param column: Column(s) that is being fetched. Multiple columns need to comma
184
            separated, if all columns are wanted then use '*'.
185
    :type column: str
186
    :param where_column: Column that is going have the value of :ref:`where_value`.
187
    :type where_column: str
188
    :param where_value: Value to match.
189
    :type where_value: str
190
    :param symbol: Symbol to use in comparison. Default is '='
191
    :type symbol: str
192
    :return: List of values that are the results
193
    :rtype: list
194
    """
195
    con = db_pool.getconn()
196
    pg_cursor = con.cursor()
197
    try:
198
        format_str = "SELECT %s FROM %s WHERE %s %s %s;"
199
        pg_cursor.execute(
200
            format_str,
201
            (AsIs(column), AsIs(table), AsIs(where_column), AsIs(symbol), where_value),
202
        )
203
        fetched = pg_cursor.fetchall()
204
        return _result_parser(column, fetched)
205
    except psycopg2.Error as pge:
206
        log.error(pge)
207
        con.rollback()
208
        return []
209
    finally:
210
        db_pool.putconn(con)
211
212
213
async def update(
214
    table: str,
215
    column: str,
216
    where_value: str,
217
    new_value: typing.Union[str, bool],
218
    where_column: str = "",
219
) -> None:
220
    """Update
221
222
    **Asynchronous Function**
223
224
    **Postgresql Equivalent:**
225
226
    UPDATE **table** SET :ref:`column` = :ref:`new_value`
227
    WHERE :ref:`check_column` = :ref:`where_value`;
228
229
    :param table: Table that the data is being updated on.
230
    :type table: str
231
    :param column: Column that is being updated. Multiple columns are not supported.
232
    :type column: str
233
    :param where_value: Value that is going to be updated.
234
    :type where_value: str
235
    :param new_value: Value that is the replacement
236
    :type new_value: str
237
    :param where_column: Column to select from. Multiple columns are not supported.
238
    :type where_column: str
239
    :return: None
240
    """
241
    if not where_column:
242
        where_column = column
243
    con = db_pool.getconn()
244
    pg_cursor = con.cursor()
245
    try:
246
        format_str = "UPDATE %s SET %s = %s where %s = %s"
247
        pg_cursor.execute(
248
            format_str,
249
            (AsIs(table), AsIs(column), new_value, AsIs(where_column), where_value),
250
        )
251
        con.commit()
252
    except psycopg2.Error as pge:
253
        log.error(pge)
254
        con.rollback()
255
    finally:
256
        db_pool.putconn(con)
257
258
259
async def delete(table: str, column: str, value: str) -> None:
260
    """Delete
261
262
    Delete value from table
263
264
    **Postgresql Equivalent:**
265
266
    DELETE FROM :ref:`table` WHERE :ref:`column` = :ref:`value`;
267
268
    :param table: Table that the data is being deleted from.
269
    :type table: str
270
    :param column: Column to which the :ref:`value` is going to match.
271
    :type column: str
272
    :param value: Value in the row which is going to match to a value in :ref:`column`
273
    :type value: str
274
    :return: None
275
    """
276
    con = db_pool.getconn()
277
    pg_cursor = con.cursor()
278
    try:
279
        log.info("Deleting {} where {} from {}".format(column, value, table))
280
        format_str = "DELETE FROM %s WHERE %s = %s"
281
        pg_cursor.execute(format_str, (AsIs(table), AsIs(column), value))
282
        con.commit()
283
    except psycopg2.Error as pge:
284
        log.error(pge)
285
        con.rollback()
286
    finally:
287
        db_pool.putconn(con)
288