Completed
Push — master ( 39eece...c8d455 )
by Roy
01:11
created

BaseDB._select2dic()   C

Complexity

Conditions 11

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 11
dl 0
loc 20
rs 5.5714

How to fix   Complexity   

Complexity

Complex code units like BaseDB._select2dic() (here, a method of the BaseDB class) often do a lot of different things. To break such a unit down, we need to identify a cohesive component within its class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- encoding: utf-8 -*-
3
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
4
# Author: Binux<[email protected]>
5
#         http://binux.me
6
# Created on 2012-08-30 17:43:49
7
8
from __future__ import unicode_literals, division, absolute_import
9
10
import logging
11
logger = logging.getLogger('database.basedb')
12
13
from six import itervalues
14
15
16
class BaseDB:
    '''
    Thin helper for building and running simple SQL statements.

    Subclasses must override the ``dbcur`` property so that it returns a
    DB-API cursor; every statement built here is executed on that cursor.
    ``__tablename__`` is the table used when a method gets no ``tablename``
    and ``placeholder`` is the parameter marker put into generated SQL.

    NOTE: ``what`` (when given as a string), ``where`` and ``order`` are
    interpolated into the SQL text verbatim.  Only the values passed through
    ``where_values`` / ``**values`` are bound as parameters, so never build
    those fragments from untrusted input.
    '''
    __tablename__ = None
    placeholder = '%s'

    @staticmethod
    def escape(string):
        '''Quote an identifier with backticks (embedded backticks are not escaped).'''
        return '`%s`' % string

    @property
    def dbcur(self):
        '''Cursor used to run statements; must be provided by the subclass.'''
        raise NotImplementedError

    def _execute(self, sql_query, values=None):
        '''
        Run ``sql_query`` with ``values`` bound and return the cursor used.

        ``values`` defaults to an empty parameter list.
        '''
        # None instead of a shared mutable [] default; the driver still
        # receives an empty list, exactly as before.
        if values is None:
            values = []
        dbcur = self.dbcur
        dbcur.execute(sql_query, values)
        return dbcur

    def _build_select(self, tablename, what, where, order, offset, limit):
        '''Build (and debug-log) the SELECT statement shared by the select helpers.'''
        tablename = self.escape(tablename or self.__tablename__)
        if what is None or isinstance(what, (list, tuple)):
            # A sequence of column names is escaped one by one; an empty
            # sequence or None selects every column.
            what = ','.join(self.escape(f) for f in what) if what else '*'

        sql_query = "SELECT %s FROM %s" % (what, tablename)
        if where:
            sql_query += " WHERE %s" % where
        if order:
            sql_query += ' ORDER BY %s' % order
        if limit:
            # offset is only emitted together with a truthy limit.
            sql_query += " LIMIT %d, %d" % (offset, limit)
        logger.debug("<sql: %s>", sql_query)
        return sql_query

    def _select(self, tablename=None, what="*", where="", where_values=None, offset=0, limit=None):
        '''
        Lazily yield the rows of a SELECT as the driver returns them.

        This is a generator: nothing is built or executed until the first
        row is requested.
        '''
        sql_query = self._build_select(tablename, what, where, None, offset, limit)
        for row in self._execute(sql_query, where_values):
            yield row

    def _select2dic(self, tablename=None, what="*", where="", where_values=None,
                    order=None, offset=0, limit=None):
        '''
        Lazily yield the rows of a SELECT as ``{column name: value}`` dicts.

        Column names are taken from ``cursor.description`` after the query
        ran.  Like ``_select`` this is a generator and runs on first use.
        '''
        sql_query = self._build_select(tablename, what, where, order, offset, limit)
        dbcur = self._execute(sql_query, where_values)
        fields = [f[0] for f in dbcur.description]

        for row in dbcur:
            yield dict(zip(fields, row))

    def _insert_like(self, verb, tablename, values):
        '''Run ``<verb> INTO`` for the column/value dict and return ``lastrowid``.'''
        tablename = self.escape(tablename or self.__tablename__)
        if values:
            _keys = ", ".join(self.escape(k) for k in values)
            _values = ", ".join([self.placeholder, ] * len(values))
            sql_query = "%s INTO %s (%s) VALUES (%s)" % (verb, tablename, _keys, _values)
            # Keys and values of the same unmodified dict iterate in the
            # same order, so parameters line up with the column list.
            params = list(values.values())
        else:
            sql_query = "%s INTO %s DEFAULT VALUES" % (verb, tablename)
            params = []
        logger.debug("<sql: %s>", sql_query)

        return self._execute(sql_query, params).lastrowid

    def _replace(self, tablename=None, **values):
        '''REPLACE a row built from ``values``; return the cursor's ``lastrowid``.'''
        return self._insert_like("REPLACE", tablename, values)

    def _insert(self, tablename=None, **values):
        '''INSERT a row built from ``values``; return the cursor's ``lastrowid``.'''
        return self._insert_like("INSERT", tablename, values)

    def _update(self, tablename=None, where="1=0", where_values=None, **values):
        '''
        UPDATE the columns named in ``values`` on rows matching ``where``.

        The default ``where`` ("1=0") matches nothing, so a condition has to
        be passed explicitly to change any row.  Returns the cursor.
        '''
        if where_values is None:
            where_values = []
        tablename = self.escape(tablename or self.__tablename__)
        _key_values = ", ".join([
            "%s = %s" % (self.escape(k), self.placeholder) for k in values
        ])
        sql_query = "UPDATE %s SET %s WHERE %s" % (tablename, _key_values, where)
        logger.debug("<sql: %s>", sql_query)

        # SET parameters come first in the statement, then the WHERE ones.
        return self._execute(sql_query, list(values.values()) + list(where_values))

    def _delete(self, tablename=None, where="1=0", where_values=None):
        '''
        DELETE rows matching ``where`` and return the cursor.

        The default ``where`` ("1=0") matches nothing; an empty ``where``
        drops the WHERE clause altogether and therefore deletes every row.
        '''
        tablename = self.escape(tablename or self.__tablename__)
        sql_query = "DELETE FROM %s" % tablename
        if where:
            sql_query += " WHERE %s" % where
        logger.debug("<sql: %s>", sql_query)

        return self._execute(sql_query, where_values)
125
126
if __name__ == "__main__":
    # Smoke test of BaseDB against an in-memory SQLite database.
    import sqlite3

    class DB(BaseDB):
        __tablename__ = "test"
        # sqlite3 binds parameters with qmark style; the inherited '%s'
        # marker would be sent to SQLite as literal text and fail.
        placeholder = "?"

        def __init__(self):
            self.conn = sqlite3.connect(":memory:")
            cursor = self.conn.cursor()
            cursor.execute(
                '''CREATE TABLE `%s` (id INTEGER PRIMARY KEY AUTOINCREMENT, name, age)'''
                % self.__tablename__
            )

        @property
        def dbcur(self):
            return self.conn.cursor()

    db = DB()
    assert db._insert(db.__tablename__, name="binux", age=23) == 1
    # _select/_select2dic are generators, so materialise them with list()
    # instead of calling cursor methods or indexing them directly.
    assert list(db._select(db.__tablename__, "name, age"))[0] == ("binux", 23)
    assert list(db._select2dic(db.__tablename__, "name, age"))[0]["name"] == "binux"
    assert list(db._select2dic(db.__tablename__, "name, age"))[0]["age"] == 23
    db._replace(db.__tablename__, id=1, age=24)
    assert list(db._select(db.__tablename__, "name, age"))[0] == (None, 24)
    db._update(db.__tablename__, "id = 1", age=16)
    assert list(db._select(db.__tablename__, "name, age"))[0] == (None, 16)
    db._delete(db.__tablename__, "id = 1")
    assert list(db._select(db.__tablename__)) == []
155