Passed
Pull Request — master (#50)
by Cyb3r
01:15
created

bot.utils.datahandler   A

Complexity

Total Complexity 25

Size/Duplication

Total Lines 286
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 122
dl 0
loc 286
rs 10
c 0
b 0
f 0
wmc 25

8 Functions

Rating   Name   Duplication   Size   Complexity  
A insert() 0 37 4
A _result_parser() 0 18 3
A update() 0 44 3
A table_create() 0 11 2
B _format_step() 0 41 7
A select() 0 46 2
A fetch() 0 27 2
A delete() 0 29 2
1
"""Handles all postgresql data and tables"""
2
import os
3
import typing
4
5
import psycopg2.errors
6
from psycopg2 import pool
7
from psycopg2.extensions import AsIs
8
from .tables import tables
9
from .logger import make_logger
10
11
# Module-wide logger; verbosity is read from LOG_LEVEL and defaults to INFO.
log = make_logger("database", os.environ.get("LOG_LEVEL", "INFO"))

# Shared connection pool used by every function in this module. The DSN is
# read from DATABASE_URL when the module is imported.
db_pool = pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=15,
    dsn=os.environ.get("DATABASE_URL"),
)

# Exception class psycopg2 maps to SQLSTATE 23505 (unique_violation); insert()
# uses it to tell duplicate rows apart from other database errors.
DuplicateError = psycopg2.errors.lookup("23505")
17
18
19
def table_create() -> None:
    """Create the application's tables at startup.

    Executes every statement in ``tables`` (imported from tables.py) on one
    pooled connection and commits them as a single transaction.
    NOTE(review): the statements are presumably ``CREATE TABLE IF NOT EXISTS``
    so that reruns are harmless -- confirm against tables.py.

    :return: None
    :raises psycopg2.Error: If any statement fails. The transaction is rolled
            back before the error is re-raised.
    """
    con = db_pool.getconn()
    try:
        with con.cursor() as pg_cursor:
            for table in tables:
                pg_cursor.execute(table)
        # The pool rolls back any transaction still open when a connection is
        # returned, so the DDL must be committed explicitly to persist.
        con.commit()
    except psycopg2.Error as pge:
        log.error(pge)
        con.rollback()
        raise
    finally:
        # Always hand the connection back, even when a statement failed.
        db_pool.putconn(con)
30
31
32
def _format_step(table: str) -> str:
33
    """
34
    Returns the format string to be used in insert. This was split from insert to make it less
35
    complex and easier to read.
36
37
    :param table: Name of the table that is being used
38
    :type table: str
39
    :return: String that will be used for cursor execution
40
    :rtype: str
41
    """
42
    if table == "schools":
43
        query_str = (
44
            "INSERT INTO schools"
45
            "(school, region, color, id, added_by, added_by_id) "
46
            "VALUES (%s, %s, %s, %s, %s, %s);"
47
        )
48
    elif table == "errors":
49
        query_str = (
50
            "INSERT INTO errors"
51
            "(id, command, message, error, time, ack) "
52
            "VALUES (%s, %s, %s, %s, %s, %s);"
53
        )
54
    elif table == "reports":
55
        query_str = (
56
            "INSERT INTO reports"
57
            "(id, name, name_id, message, time) "
58
            "VALUES (%s, %s, %s, %s, %s);"
59
        )
60
    elif table == "admin_channels":
61
        query_str = (
62
            "INSERT INTO admin_channels (name, id, log) "
63
            "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING;"
64
        )
65
    elif table == "bot_admins":
66
        query_str = "INSERT INTO bot_admins (name, id) VALUES (%s, %s) ON CONFLICT DO NOTHING;"
67
    elif table == "regions":
68
        query_str = "INSERT INTO regions (name, id) VALUES (%s, %s)"
69
    else:
70
        log.error("Table {} not found.".format(table))
71
        return "error"
72
    return query_str
73
74
75
def _result_parser(column: str, fetched: list) -> typing.Union[list, typing.List[typing.Tuple]]:
76
    """Parse the query results
77
78
    :param column: The name of the column(s) only used to determine how to parse the
79
            results. Multiple commas need ','
80
    :type column: str
81
    :param fetched: Results from SQL query
82
    :type fetched: list
83
    :return: A normal list or a list of tuples.
84
    :rtype: list
85
    """
86
    # fetched is a list so it does not need breaking up
87
    if column == "*" or column.find(",") != -1:
88
        result = fetched
89
    # Breaks up the tuples to a standard list.
90
    else:
91
        result = [x[0] for x in fetched]
92
    return result
93
94
95
async def insert(table: str, data: list) -> typing.Union[None, str]:
    """Insert a new row into an existing table.

    **Asynchronous Function**

    The statement comes from _format_step() and is executed with **data**
    bound to its placeholders.

    **PostgreSQL Equivalent:**

    INSERT INTO **table** VALUES (**data**);

    :param table: Table to perform the insert on
    :type table: str
    :param data: Values bound to the statement's placeholders, in column order
    :type data: list
    :return: ``None`` on success, ``'duplicate'`` when the database reports a
            unique violation, and ``'error'`` for an unknown table or any other
            database error.
    :rtype: str
    """
    format_str = _format_step(table)
    if format_str == "error":
        return "error"
    log.debug("String: {} Data {}".format(format_str, " ".join(map(str, data))))
    con = db_pool.getconn()
    pg_cursor = con.cursor()
    try:
        pg_cursor.execute(format_str, data)
        con.commit()
        return None
    except psycopg2.Error as pge:
        log.error(pge)
        # Transactions belong to the connection; cursors have no rollback(),
        # so calling it on the cursor raised AttributeError and hid the result.
        con.rollback()
        if isinstance(pge, DuplicateError):
            return "duplicate"
        return "error"
    finally:
        db_pool.putconn(con)
132
133
134
async def fetch(table: str, column: str) -> typing.Optional[list]:
    """Fetch a column from every row of a table.

    Retrieves the values of **column** for all rows in **table**.

    **Postgresql Equivalent:**

    SELECT **column** FROM **table**;

    Both arguments are spliced into the statement verbatim (``AsIs``), so they
    must never be built from untrusted input.

    :param table: Table that data is being fetched from.
    :type table: str
    :param column: Column(s) to read. A single name yields a flat list of
            values; '*' or a comma-separated list yields a list of row tuples.
    :type column: str
    :return: List of values, or ``None`` if the query failed (the error is
            logged).
    :rtype: list
    """
    con = db_pool.getconn()
    pg_cursor = con.cursor()
    try:
        format_str = "SELECT %s FROM %s;"
        pg_cursor.execute(format_str, (AsIs(column), AsIs(table)))
        fetched = pg_cursor.fetchall()
        return _result_parser(column, fetched)
    except psycopg2.Error as pge:
        log.error(pge)
        # Clear the failed transaction, as select()/update()/delete() do.
        con.rollback()
        return None
    finally:
        db_pool.putconn(con)
161
162
163
async def select(
    table: str,
    column: str,
    where_column: str,
    where_value: typing.Union[str, bool, int],
    symbol: str = "=",
) -> typing.Optional[list]:
    """Select specific rows from a table.

    **Asynchronous Function**

    **Postgresql Equivalent:**

    SELECT :ref:`column` FROM :ref:`table`
    WHERE :ref:`where_column` :ref:`symbol` :ref:`where_value`;

    Only **where_value** is passed as a bound parameter. **table**,
    **column**, **where_column** and **symbol** are spliced into the statement
    verbatim (``AsIs``) and must never be built from untrusted input.

    :param table: Table that data is being fetched from.
    :type table: str
    :param column: Column(s) being fetched. Multiple columns need to be comma
            separated; if all columns are wanted then use '*'.
    :type column: str
    :param where_column: Column that is compared against :ref:`where_value`.
    :type where_column: str
    :param where_value: Value to match.
    :type where_value: str, bool or int
    :param symbol: Comparison operator placed between :ref:`where_column` and
            :ref:`where_value`. Default is '='
    :type symbol: str
    :return: List of matching values (row tuples when several columns were
            requested), or ``None`` if the query failed (the error is logged).
    :rtype: list
    """
    con = db_pool.getconn()
    pg_cursor = con.cursor()
    try:
        format_str = "SELECT %s FROM %s WHERE %s %s %s;"
        pg_cursor.execute(
            format_str,
            (AsIs(column), AsIs(table), AsIs(where_column), AsIs(symbol), where_value),
        )
        fetched = pg_cursor.fetchall()
        return _result_parser(column, fetched)
    except psycopg2.Error as pge:
        log.error(pge)
        con.rollback()
        return None
    finally:
        db_pool.putconn(con)
209
210
211
async def update(
    table: str,
    column: str,
    where_value: str,
    new_value: typing.Union[str, bool],
    where_column: str = "",
) -> None:
    """Set one column to a new value on the rows matching a condition.

    **Asynchronous Function**

    **Postgresql Equivalent:**

    UPDATE **table** SET :ref:`column` = :ref:`new_value`
    WHERE :ref:`where_column` = :ref:`where_value`;

    Database errors are logged and the transaction is rolled back; nothing is
    raised to the caller.

    :param table: Table that the data is being updated on.
    :type table: str
    :param column: Column that is being updated. Multiple columns are not supported.
    :type column: str
    :param where_value: Value a row must hold in :ref:`where_column` to be updated.
    :type where_value: str
    :param new_value: Replacement value written to :ref:`column`.
    :type new_value: str
    :param where_column: Column to match on. Multiple columns are not
            supported. When left empty, :ref:`column` is used.
    :type where_column: str
    :return: None
    """
    # An empty where_column means "match on the column being updated".
    where_column = where_column or column
    conn = db_pool.getconn()
    cursor = conn.cursor()
    try:
        statement = "UPDATE %s SET %s = %s where %s = %s"
        # Identifiers are spliced in with AsIs; only the two values are bound.
        params = (AsIs(table), AsIs(column), new_value, AsIs(where_column), where_value)
        cursor.execute(statement, params)
        conn.commit()
    except psycopg2.Error as db_error:
        log.error(db_error)
        conn.rollback()
    finally:
        db_pool.putconn(conn)
255
256
257
async def delete(table: str, column: str, value: str) -> None:
    """Remove the rows whose column equals a given value.

    **Postgresql Equivalent:**

    DELETE FROM :ref:`table` WHERE :ref:`column` = :ref:`value`;

    Database errors are logged and the transaction is rolled back; nothing is
    raised to the caller.

    :param table: Table that the data is being deleted from.
    :type table: str
    :param column: Column that :ref:`value` is compared against.
    :type column: str
    :param value: Value a row must hold in :ref:`column` to be deleted.
    :type value: str
    :return: None
    """
    conn = db_pool.getconn()
    cursor = conn.cursor()
    try:
        log.info("Deleting {} where {} from {}".format(column, value, table))
        statement = "DELETE FROM %s WHERE %s = %s"
        # Identifiers are spliced in with AsIs; only the value is bound.
        params = (AsIs(table), AsIs(column), value)
        cursor.execute(statement, params)
        conn.commit()
    except psycopg2.Error as db_error:
        log.error(db_error)
        conn.rollback()
    finally:
        db_pool.putconn(conn)
286