Completed
Push — master ( fb4f1c...e82363 )
by Roy
01:19
created

BaseDB._insert()   A

Complexity

Conditions 4

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 15
rs 9.2
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2012-08-30 17:43:49
7
8
from __future__ import unicode_literals, division, absolute_import
9
10
import logging
11
logger = logging.getLogger('database.basedb')
12
13
from six import itervalues
14
15
16
class BaseDB:

    '''
    Minimal SQL helper base class.

    Builds SELECT / REPLACE / INSERT / UPDATE / DELETE statements from the
    given arguments and runs them on the cursor returned by ``dbcur``.

    ``dbcur`` must be overridden by subclasses. ``__tablename__``,
    ``placeholder`` and ``escape`` can be overridden to match the driver.
    '''
    # Table used when a method is called without an explicit `tablename`.
    __tablename__ = None
    # Parameter marker inserted into generated statements for bound values.
    placeholder = '%s'

    @staticmethod
    def escape(string):
        '''
        Return `string` wrapped in backticks, for use as an identifier.

        Backticks already contained in `string` are not escaped.
        '''
        return '`%s`' % string

    @property
    def dbcur(self):
        '''Cursor used to run statements; subclasses must provide it.'''
        raise NotImplementedError

    def _execute(self, sql_query, values=None):
        '''
        Run `sql_query` with the bound `values` and return the cursor.

        `values` defaults to no parameters (an empty list is passed on).
        '''
        dbcur = self.dbcur
        dbcur.execute(sql_query, [] if values is None else values)
        return dbcur

    def _build_select(self, tablename, what, where, order, offset, limit):
        '''Assemble the SELECT statement shared by _select and _select2dic.'''
        tablename = self.escape(tablename or self.__tablename__)
        if what is None or isinstance(what, (list, tuple)):
            # A sequence of column names is escaped and joined; None or an
            # empty sequence selects every column. A plain string is used
            # verbatim.
            what = ','.join(self.escape(f) for f in what) if what else '*'

        sql_query = "SELECT %s FROM %s" % (what, tablename)
        if where:
            sql_query += " WHERE %s" % where
        if order:
            sql_query += ' ORDER BY %s' % order
        if limit:
            sql_query += " LIMIT %d, %d" % (offset, limit)
        elif offset:
            # Offset without a limit: -1 is emitted as the row count.
            sql_query += " LIMIT %d, %d" % (offset, -1)
        logger.debug("<sql: %s>", sql_query)
        return sql_query

    def _select(self, tablename=None, what="*", where="", where_values=None,
                offset=0, limit=None):
        '''
        Yield the rows of a SELECT as returned by the cursor.

        `what` is either a raw column expression (string) or a list/tuple of
        column names; `where` is a raw condition whose parameters are bound
        from `where_values`. This is a generator: the query is only built
        and executed when iteration starts.
        '''
        sql_query = self._build_select(tablename, what, where, None, offset, limit)
        bound = [] if where_values is None else where_values

        for row in self._execute(sql_query, bound):
            yield row

    def _select2dic(self, tablename=None, what="*", where="", where_values=None,
                    order=None, offset=0, limit=None):
        '''
        Like `_select`, but yield each row as a dict keyed by column name.

        Column names are taken from the first item of every entry in the
        cursor's `description`. `order` is a raw ORDER BY expression.
        '''
        sql_query = self._build_select(tablename, what, where, order, offset, limit)
        bound = [] if where_values is None else where_values

        dbcur = self._execute(sql_query, bound)
        fields = [f[0] for f in dbcur.description]

        for row in dbcur:
            yield dict(zip(fields, row))

    def _insert_or_replace(self, command, tablename, values):
        '''Run "<command> INTO ..." for the `values` mapping; return lastrowid.'''
        tablename = self.escape(tablename or self.__tablename__)
        if values:
            _keys = ", ".join(self.escape(k) for k in values)
            _values = ", ".join([self.placeholder] * len(values))
            sql_query = "%s INTO %s (%s) VALUES (%s)" % (command, tablename, _keys, _values)
            logger.debug("<sql: %s>", sql_query)
            # Keys and values are read from the same dict, so their order
            # matches the column list built above.
            dbcur = self._execute(sql_query, list(values.values()))
        else:
            sql_query = "%s INTO %s DEFAULT VALUES" % (command, tablename)
            logger.debug("<sql: %s>", sql_query)
            dbcur = self._execute(sql_query)
        return dbcur.lastrowid

    def _replace(self, tablename=None, **values):
        '''
        REPLACE a row built from the `values` keyword arguments.

        Without any values, "REPLACE INTO <table> DEFAULT VALUES" is run.
        Returns the cursor's `lastrowid`.
        '''
        return self._insert_or_replace("REPLACE", tablename, values)

    def _insert(self, tablename=None, **values):
        '''
        INSERT a row built from the `values` keyword arguments.

        Without any values, "INSERT INTO <table> DEFAULT VALUES" is run.
        Returns the cursor's `lastrowid`.
        '''
        return self._insert_or_replace("INSERT", tablename, values)

    def _update(self, tablename=None, where="1=0", where_values=None, **values):
        '''
        UPDATE the columns given as keyword arguments and return the cursor.

        `where` defaults to "1=0", so no row is touched unless a condition
        is passed. At least one column is needed; otherwise the generated
        statement has an empty SET clause.
        '''
        tablename = self.escape(tablename or self.__tablename__)
        _key_values = ", ".join(
            "%s = %s" % (self.escape(k), self.placeholder) for k in values
        )
        sql_query = "UPDATE %s SET %s WHERE %s" % (tablename, _key_values, where)
        logger.debug("<sql: %s>", sql_query)

        # SET parameters come first, then the WHERE parameters.
        bound = list(values.values())
        if where_values is not None:
            bound += list(where_values)
        return self._execute(sql_query, bound)

    def _delete(self, tablename=None, where="1=0", where_values=None):
        '''
        DELETE rows matching `where` and return the cursor.

        `where` defaults to "1=0" (delete nothing); passing an empty
        condition omits the WHERE clause altogether.
        '''
        tablename = self.escape(tablename or self.__tablename__)
        sql_query = "DELETE FROM %s" % tablename
        if where:
            sql_query += " WHERE %s" % where
        logger.debug("<sql: %s>", sql_query)

        return self._execute(sql_query, [] if where_values is None else where_values)
129
130
if __name__ == "__main__":
    # Smoke test: exercise every BaseDB helper on an in-memory SQLite table.
    import sqlite3

    class DB(BaseDB):
        '''BaseDB bound to an in-memory SQLite database.'''
        __tablename__ = "test"
        # sqlite3 binds parameters with '?', not the '%s' default.
        placeholder = "?"

        def __init__(self):
            self.conn = sqlite3.connect(":memory:")
            cursor = self.conn.cursor()
            cursor.execute(
                '''CREATE TABLE `%s` (id INTEGER PRIMARY KEY AUTOINCREMENT, name, age)'''
                % self.__tablename__
            )

        @property
        def dbcur(self):
            return self.conn.cursor()

    db = DB()
    assert db._insert(db.__tablename__, name="binux", age=23) == 1
    # _select and _select2dic are generators, so rows are pulled with
    # next() / list() instead of cursor methods or indexing.
    assert next(db._select(db.__tablename__, "name, age")) == ("binux", 23)
    assert next(db._select2dic(db.__tablename__, "name, age"))["name"] == "binux"
    assert next(db._select2dic(db.__tablename__, "name, age"))["age"] == 23
    # REPLACE rewrites the whole row, so the unspecified name becomes NULL.
    db._replace(db.__tablename__, id=1, age=24)
    assert next(db._select(db.__tablename__, "name, age")) == (None, 24)
    db._update(db.__tablename__, "id = 1", age=16)
    assert next(db._select(db.__tablename__, "name, age")) == (None, 16)
    db._delete(db.__tablename__, "id = 1")
    assert list(db._select(db.__tablename__)) == []
159