Issues (229)

server/lib/ige/SQLiteDatabase.py (6 issues)

#
#  Copyright 2001 - 2016 Ludek Smid [http://www.ospace.net/]
#
#  This file is part of Outer Space.
#
#  Outer Space is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  Outer Space is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with Outer Space; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#

import ige
import os.path, log, os, sys, time, types, binascii, bz2
import cPickle as pickle
import sqlite3

IDX_PREV = 0
IDX_NEXT = 1

class Database:

    dbSchema = "data(oid integer primary key asc, data blob not null)"
    keyMethod = int

    def __init__(self, directory, dbName, cache = 128):
        log.message("Opening database", dbName)
        self.dbName = dbName
        self.nextID = 10000
        self.cacheSize = cache
        # make sure the target directory exists
        try:
            os.makedirs(directory)
        except OSError:
            pass
        # open db
        self.connection = sqlite3.connect(os.path.join(directory, "%s.sqlite" % dbName))
        # allow 8-bit strings to be handled correctly (default is unicode)
        self.connection.text_factory = str
        self.cursor = self.connection.cursor()
        self.cursor.execute("create table if not exists %s" % self.dbSchema)
        self.connection.commit()
        # cache
        self.cache = {}
        self.cacheLinks = {
            "__first__": [None, "__last__"],
            "__last__": ["__first__", None],
        }
        # stats
        self.statCount = 0
        self.statHit = 0
        self.statSwap = 0
        self.statMiss = 0
        self.statCleanSwap = 0

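    # cacheLinks is a doubly linked list over the cached keys, with "__first__"
    # and "__last__" sentinels; each entry is [prev_key, next_key]. Least
    # recently used keys sit right after "__first__", most recently used ones
    # right before "__last__".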
    def _moveExistingCacheItem(self, key):
        #@log.debug("Cache MOVE ITEM", key, self.cacheLinks[key])
        # optimization
        cl = self.cacheLinks
        # move item to the end of the chain
        prev, next = cl[key]
        cl[prev][IDX_NEXT] = next
        cl[next][IDX_PREV] = prev
        last = cl["__last__"]
        prev = last[IDX_PREV]
[Issue: Comprehensibility Best Practice] The variable IDX_PREV does not seem to be defined.
        cl[key] = [prev, "__last__"]
        cl[prev][IDX_NEXT] = key
        last[IDX_PREV] = key
        # print chain
        #idx = "__first__"
        #result = []
        #while idx != None:
        #    item = self.cacheLinks[idx]
        #    result.append("%s:%s" % (idx, item))
        #    idx = item[IDX_NEXT]
        #log.debug("Cache CHAIN", " ".join(result))

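    # Eviction policy: once the cache outgrows cacheSize, the oldest keys
    # (those nearest "__first__") are pickled back into SQLite and dropped,
    # but only when nothing else still references them (sys.getrefcount > 2
    # keeps an object cached); if no key can be evicted, cacheSize is simply
    # enlarged by 100.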
    def _addNewCacheItem(self, key):
        #@log.debug("Cache ADD ITEM", key)
        # check cache size
        #@log.debug("Cache size", len(self.cache), self.cacheSize)
        if len(self.cache) > self.cacheSize:
            tries = len(self.cache)
            while len(self.cache) > self.cacheSize:
                # swap out oldest item
                current = self.cacheLinks["__first__"]
                oldKey = current[IDX_NEXT]
[Issue: Comprehensibility Best Practice] The variable IDX_NEXT does not seem to be defined.
                tries -= 1
                if tries == 0:
                    # no room in the cache -> enlarge it
                    log.debug("NO ROOM IN THE CACHE", self.dbName, len(self.cache))
                    self.cacheSize += 100
                    break
                if sys.getrefcount(self.cache[oldKey]) > 2:
                    #@log.debug("CANNOT swap out", oldKey, "refs", sys.getrefcount(self.cache[oldKey]) - 2)
                    # try next element
                    self._moveExistingCacheItem(oldKey)
                    #@log.debug("Trying to swap out", current[IDX_NEXT])
                    continue
                else:
                    #@log.debug("Swapping out", oldKey)
                    self.statSwap += 1
                    #TODOif not self.cache[oldKey]._v_modified:
                    #TODO    self.statCleanSwap += 1
                    prev, next = self.cacheLinks[oldKey]
                    current[IDX_NEXT] = next
                    self.cacheLinks[next][IDX_PREV] = prev
                    self.put(oldKey, pickle.dumps(self.cache[oldKey], pickle.HIGHEST_PROTOCOL))
                    del self.cacheLinks[oldKey]
                    del self.cache[oldKey]
                    # found space
                    break
        # put item at the end of the chain
        last = self.cacheLinks["__last__"]
        prev = last[IDX_PREV]
[Issue: Comprehensibility Best Practice] The variable IDX_PREV does not seem to be defined.
        self.cacheLinks[prev][IDX_NEXT] = key
        self.cacheLinks[key] = [prev, "__last__"]
        last[IDX_PREV] = key
        # print chain
        #idx = "__first__"
        #result = []
        #while idx != None:
        #    item = self.cacheLinks[idx]
        #    result.append("%s:%s" % (idx, item))
        #    idx = item[IDX_NEXT]
        #log.debug("Cache CHAIN", " ".join(result))

    def _updateCacheItem(self, key):
        if key in self.cache:
            return self._moveExistingCacheItem(key)
        return self._addNewCacheItem(key)

    def _delCacheItem(self, key):
        #@log.debug("Cache DEL ITEM", key)
        prev, next = self.cacheLinks[key]
        self.cacheLinks[prev][IDX_NEXT] = next
        self.cacheLinks[next][IDX_PREV] = prev
        del self.cacheLinks[key]

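    # Read path: a cache hit just promotes the key towards "__last__"; a miss
    # unpickles the stored blob from SQLite and inserts the object into the cache.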
    def __getitem__(self, key):
        key = self.keyMethod(key)
        self.statCount += 1
        if key in self.cache:
            self.statHit += 1
            self._moveExistingCacheItem(key)
            return self.cache[key]
        self.statMiss += 1
        self.cursor.execute("select * from data where oid = ?", (key,))
        row = self.cursor.fetchone()
        if row is None:
            raise ige.NoSuchObjectException(key)
        item = pickle.loads(str(row[1]))
        self._addNewCacheItem(key)
        self.cache[key] = item
        #TODOitem.setModified(0)
        return item

    def __setitem__(self, key, value):
        key = self.keyMethod(key)
        if type(value) == types.InstanceType:
            value.oid = key
        # set value
        self._updateCacheItem(key)
        self.cache[key] = value
        #value.setModified(0)
        # write through new objects
        if not self.has_key(key):
            raise ige.ServerException("'%s' created using set method" % key)

    def __contains__(self, key):
        return self.has_key(key)

    def __delitem__(self, key):
        key = self.keyMethod(key)
        if key in self.cache:
            self._delCacheItem(key)
            del self.cache[key]
        self.cursor.execute("delete from data where oid = ?", (key,))

    def has_key(self, key):
        key = self.keyMethod(key)
        self.cursor.execute("select oid from data where oid = ?", (key,))
        return self.cursor.fetchone() is not None


    def keys(self):
        self.cursor.execute("select oid from data")
        return [row[0] for row in self.cursor]

    def getItemLength(self, key):
        key = self.keyMethod(key)
        self.cursor.execute("select * from data where oid = ?", (key,))
        row = self.cursor.fetchone()
        if row is None:
            raise ige.NoSuchObjectException(key)
        return len(str(row[1]))

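    # checkpoint() pickles every cached object back into the data table, commits
    # the transaction, logs the cache hit/miss/swap statistics and resets them.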
    def checkpoint(self):
        log.debug('DB Checkpoint', self.dbName)
        log.debug("Storing all objects")
        for key, value in self.cache.iteritems():
            self.put(key, pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
        # commit transaction
        log.debug("Committing transaction")
        self.connection.commit()
        log.debug("Commit completed")
        # TODO clear cache?
        # self.cache.clear()
        # self.cacheLinks.clear()
        # stats TODO: reenable
        self.cursor.execute("select * from data")
        items = 0
        for i in self.cursor:
            items += 1
        if self.statCount > 0:
            log.debug("****** %s" % self.dbName)
            log.debug("Items: %10d" % items)
            log.debug("Count: %10d" % self.statCount)
            log.debug("Hit  : %10d [%02d %%]" % (self.statHit, int(self.statHit * 100 / self.statCount)))
            log.debug("Miss : %10d [%02d %%]" % (self.statMiss, int(self.statMiss * 100 / self.statCount)))
            log.debug("Swap : %10d [%02d %%]" % (self.statSwap, int(self.statSwap * 100 / self.statCount)))
            log.debug("CSwap: %10d [%02d %%]" % (self.statCleanSwap, int(self.statCleanSwap * 100 / self.statCount)))
            log.debug("******")
        # reset stats
        self.statCount = 0
        self.statHit = 0
        self.statMiss = 0
        self.statSwap = 0
        self.statCleanSwap = 0

    def commit(self):
        pass
        #log.debug("COMMIT?")
        #self.txn.commit()
        #self.txn = dbEnv.txn_begin()

    def checkpointDatabase(self):
        # checkpoint db (TODO can be moved into the separate process?)
        log.debug("Metakit Checkpoint")

    def shutdown(self):
        log.message('DB Shutting down', self.dbName)
        self.checkpoint()
        del self.connection

    def clear(self):
        log.message("Deleting database", self.dbName)
        self.cursor.execute("delete from data")
        self.connection.commit()
        self.cache.clear()
        self.cacheLinks = {
            "__first__": [None, "__last__"],
            "__last__": ["__first__", None],
        }
        log.debug("Database is empty")

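    # create() allocates an oid: without an explicit id it scans upwards from
    # nextID (initially 10000) for a free key; a forced id overrides any oid
    # already stored on the object. New objects are written through to SQLite
    # immediately.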
    def create(self, object, id = None):
        #@log.debug("Creating new object", id)
        if not id:
            id = self.nextID
            while self.has_key(id):
                id += 1
            self.nextID = id + 1
            id = self.keyMethod(id)
            object.oid = id
        elif hasattr(object, "oid") and object.oid != id:
            id = self.keyMethod(id)
            log.message("Object OID '%s' != forced OID '%s' - FIXING" % (object.oid, id))
            object.oid = id
        else:
            id = self.keyMethod(id)
        #@log.debug("OID =", id)
        if self.has_key(id):
            raise ige.ServerException("'%s' created twice" % id)
        self.cache[id] = object
        self._addNewCacheItem(id)
        self.put(id, pickle.dumps(object, pickle.HIGHEST_PROTOCOL))
        return id

    def delete(self, key):
        del self[key]

    def get(self, key, default = None):
        if self.has_key(key):
            return self[key]
        else:
            return default

    def put(self, key, data):
        self.cursor.execute("select oid from data where oid = ?", (key,))
        row = self.cursor.fetchone()
        if row:
            self.cursor.execute("update data set data = ? where oid = ?", (sqlite3.Binary(data), key))
        else:
            self.cursor.execute("insert into data (oid, data) values (?, ?)", (key, sqlite3.Binary(data)))
        # per-put commits impact performance significantly
        #self.connection.commit()

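    # Backup file layout used by restore()/backup(): a header line
    # "IGE OUTER SPACE BACKUP VERSION 1", then pairs of lines holding the
    # hex-encoded key and the hex-encoded pickled object, terminated by a line
    # reading "END OF BACKUP".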
    def restore(self, filename):
        log.message("Restoring database from file", filename)
        fh = file(filename, "r")
[Issue: Comprehensibility Best Practice] The variable file does not seem to be defined.
        line = fh.readline().strip()
        if line != "IGE OUTER SPACE BACKUP VERSION 1":
            raise ige.ServerException("Incorrect header: %s" % line)
        while True:
            key = fh.readline().strip()
            if key == "END OF BACKUP":
                break
            data = fh.readline().strip()
            key = int(binascii.a2b_hex(key))
            data = binascii.a2b_hex(data)
            #@log.debug("Storing key", key)
            self.put(key, data)
        log.message("Database restored")

    def backup(self, basename):
        self.checkpoint()
        filename = "%s-%s.osbackup" % (basename, self.dbName)
        log.message("Creating backup", filename)
        fh = file(filename, "w") #bz2.BZ2File(filename, "w")
[Issue: Comprehensibility Best Practice] The variable file does not seem to be defined.
        fh.write("IGE OUTER SPACE BACKUP VERSION 1\n")
        for key in self.keys():
            fh.write(binascii.b2a_hex(str(key)))
            fh.write("\n")
            fh.write(binascii.b2a_hex(pickle.dumps(self[key], pickle.HIGHEST_PROTOCOL)))
            fh.write("\n")
        fh.write("END OF BACKUP\n")
        fh.close()
        log.message("Backup completed")

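# Same store as Database, but rows are keyed by text oids: the schema declares
# the key column as text and keyMethod coerces keys with str() instead of int().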
class DatabaseString(Database):

    dbSchema = "data(oid text primary key asc, data blob not null)"
    keyMethod = str

    def restore(self, filename, include = None):
        log.message("Restoring database from file", filename)
        fh = file(filename, "r")
[Issue: Comprehensibility Best Practice] The variable file does not seem to be defined.
        line = fh.readline().strip()
        if line != "IGE OUTER SPACE BACKUP VERSION 1":
            raise ige.ServerException("Incorrect header: %s" % line)
        imported = 0
        skipped = 0
        while True:
            key = fh.readline().strip()
            if key == "END OF BACKUP":
                break
            data = fh.readline().strip()
            key = binascii.a2b_hex(key)
            if include and not include(key):
                skipped += 1
                continue
            imported += 1
            data = binascii.a2b_hex(data)
            #@log.debug("Storing key", key)
            self.put(key, data)
        log.message("Database restored (%d imported, %d skipped)" % (imported, skipped))
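
A minimal usage sketch (not part of the reviewed file), assuming Python 2 with server/lib on sys.path so that ige, log and this module import cleanly; the directory, database and class names below are made up:

from ige.SQLiteDatabase import Database

db = Database("var/test-db", "game1", cache=256)

class Toy:                          # any picklable instance can be stored
    pass

oid = db.create(Toy())              # allocates the next free oid (>= 10000), pickles the object
obj = db[oid]                       # read back; served from the LRU cache when possible
db.checkpoint()                     # flush cached objects to SQLite and commit
db.backup("turn-0001")              # writes turn-0001-game1.osbackup
db.shutdown()                       # final checkpoint, then the connection is dropped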