Completed
Push to master ( cc2800...40418d ) by Marek
15s (queued 13s)

ige.SQLiteDatabase.Database._addNewCacheItem() — rated B

Complexity
    Conditions: 5

Size
    Total Lines: 41
    Code Lines: 27

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric    Value
cc        5
eloc      27
nop       2
dl        0
loc       41
rs        8.7653
c         0
b         0
f         0
#
#  Copyright 2001 - 2016 Ludek Smid [http://www.ospace.net/]
#
#  This file is part of Outer Space.
#
#  Outer Space is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  Outer Space is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with Outer Space; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#

import ige
import os.path, log, os, sys, time, types, binascii, bz2
import cPickle as pickle
import sqlite3

IDX_PREV = 0
IDX_NEXT = 1

class Database:

    dbSchema = "data(oid integer primary key asc, data blog not null)"
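    # NOTE: "blog" above is presumably a typo for "blob"; SQLite accepts any
    # declared column type name, so the pickled data is still stored correctly.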
    keyMethod = int

    def __init__(self, directory, dbName, cache = 128):
        log.message("Opening database", dbName)
        self.dbName = dbName
        self.nextID = 10000
        self.cacheSize = cache
        #
        try:
            os.makedirs(directory)
        except OSError:
            pass
        # open db
        self.connection = sqlite3.connect(os.path.join(directory, "%s.sqlite" % dbName))
        # allow 8-bits strings to be handled correctly (default is unicode)
        self.connection.text_factory = str
        self.cursor = self.connection.cursor()
        self.cursor.execute("create table if not exists %s" % self.dbSchema)
        self.connection.commit()
        # cache
        self.cache = {}
        self.cacheLinks = {
            "__first__": [None, "__last__"],
            "__last__": ["__first__", None],
        }
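        # cacheLinks is a doubly-linked list stored as {key: [prev, next]},
        # bounded by the "__first__"/"__last__" sentinels: the entry right
        # after "__first__" is the least recently used one (first candidate
        # for swap-out), the entry right before "__last__" the most recent.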
        # stats
        self.statCount = 0
        self.statHit = 0
        self.statSwap = 0
        self.statMiss = 0
        self.statCleanSwap = 0

    def _moveExistingCacheItem(self, key):
        #@log.debug("Cache MOVE ITEM", key, self.cacheLinks[key])
        # optimalization
        cl = self.cacheLinks
        # move item to the end of the chain
        prev, next = cl[key]
        cl[prev][IDX_NEXT] = next
        cl[next][IDX_PREV] = prev
        last = cl["__last__"]
        prev = last[IDX_PREV]
        cl[key] = [prev, "__last__"]
        cl[prev][IDX_NEXT] = key
        last[IDX_PREV] = key
        # print chain
        #idx = "__first__"
        #result = []
        #while idx != None:
        #    item = self.cacheLinks[idx]
        #    result.append("%s:%s" % (idx, item))
        #    idx = item[IDX_NEXT]
        #log.debug("Cache CHAIN", " ".join(result))

    def _addNewCacheItem(self, key):
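        # When the cache is over its limit, walk the chain from the oldest
        # entry looking for one that can be swapped out: sys.getrefcount()
        # greater than 2 (the cache dict's reference plus the temporary one
        # created by the call itself) means the object is still referenced
        # elsewhere, so it is only moved to the end of the chain and the next
        # candidate is tried.  If no entry can be evicted, the cache limit is
        # raised by 100.  The new key is then linked in as the most recently
        # used entry.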
        #@log.debug("Cache ADD ITEM", key)
        # check cache size
        #@log.debug("Cache size", len(self.cache), self.cacheSize)
        if len(self.cache) > self.cacheSize:
            tries = len(self.cache)
            while len(self.cache) > self.cacheSize:
                # swap out oldest item
                current = self.cacheLinks["__first__"]
                oldKey = current[IDX_NEXT]
                tries -= 1
                if tries == 0:
                    # no room in the cache -> enlarge it
                    log.debug("NO ROOM IN THE CACHE", self.dbName, len(self.cache))
                    self.cacheSize += 100
                    break
                if sys.getrefcount(self.cache[oldKey]) > 2:
                    #@log.debug("CANNOT swap out", oldKey, "refs", sys.getrefcount(self.cache[oldKey]) - 2)
                    # try next element
                    self._moveExistingCacheItem(oldKey)
                    #@log.debug("Trying to swap out", current[IDX_NEXT])
                    continue
                else:
                    #@log.debug("Swapping out", oldKey)
                    self.statSwap += 1
                    #TODOif not self.cache[oldKey]._v_modified:
                    #TODO    self.statCleanSwap += 1
                    prev, next = self.cacheLinks[oldKey]
                    current[IDX_NEXT] = next
                    self.cacheLinks[next][IDX_PREV] = prev
                    self.put(oldKey, pickle.dumps(self.cache[oldKey], pickle.HIGHEST_PROTOCOL))
                    del self.cacheLinks[oldKey]
                    del self.cache[oldKey]
                    # found space
                    break
        # put item at the end of the chain
        last = self.cacheLinks["__last__"]
        prev = last[IDX_PREV]
        self.cacheLinks[prev][IDX_NEXT] = key
        self.cacheLinks[key] = [prev, "__last__"]
        last[IDX_PREV] = key
        # print chain
        #idx = "__first__"
        #result = []
        #while idx != None:
        #    item = self.cacheLinks[idx]
        #    result.append("%s:%s" % (idx, item))
        #    idx = item[IDX_NEXT]
        #log.debug("Cache CHAIN", " ".join(result))

    def _updateCacheItem(self, key):
        if key in self.cache:
            return self._moveExistingCacheItem(key)
        return self._addNewCacheItem(key)

    def _delCacheItem(self, key):
        #@log.debug("Cache DEL ITEM", key)
        prev, next = self.cacheLinks[key]
        self.cacheLinks[prev][IDX_NEXT] = next
        self.cacheLinks[next][IDX_PREV] = prev
        del self.cacheLinks[key]

    def __getitem__(self, key):
        key = self.keyMethod(key)
        self.statCount += 1
        if key in self.cache:
            self.statHit += 1
            self._moveExistingCacheItem(key)
            return self.cache[key]
        self.statMiss += 1
        self.cursor.execute("select * from data where oid = ?", (key,))
        row = self.cursor.fetchone()
        if row is None:
            raise ige.NoSuchObjectException(key)
        item = pickle.loads(str(row[1]))
        self._addNewCacheItem(key)
        self.cache[key] = item
        #TODOitem.setModified(0)
        return item

    def __setitem__(self, key, value):
        key = self.keyMethod(key)
        if type(value) == types.InstanceType:
            value.oid = key
        # set value
        self._updateCacheItem(key)
        self.cache[key] = value
        #value.setModified(0)
        # write through new objects
        if not self.has_key(key):
            raise ige.ServerException("'%s' created using set method" % key)

    def __delitem__(self, key):
        key = self.keyMethod(key)
        if key in self.cache:
            self._delCacheItem(key)
            del self.cache[key]
        self.cursor.execute("delete from data where oid = ?", (key,))

    def has_key(self, key):
        key = self.keyMethod(key)
        self.cursor.execute("select oid from data where oid = ?", (key,))
        return self.cursor.fetchone() is not None

    def keys(self):
        self.cursor.execute("select oid from data")
        return [row[0] for row in self.cursor]

    def getItemLength(self, key):
        key = self.keyMethod(key)
        self.cursor.execute("select * from data where oid = ?", (key,))
        row = self.cursor.fetchone()
        if row is None:
            raise ige.NoSuchObjectException(key)
        return len(str(row[1]))

    def checkpoint(self):
        log.debug('DB Checkpoint', self.dbName)
        log.debug("Storing all objects")
        for key, value in self.cache.iteritems():
            self.put(key, pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
        # commit transaction
        log.debug("Commiting transaction")
        self.connection.commit()
        log.debug("Commit completed")
        # TODO clear cache?
        # self.cache.clear()
        # self.cacheLinks.clear()
        # stats TODO: reenable
        self.cursor.execute("select * from data")
        items = 0
        for i in self.cursor:
            items += 1
        if self.statCount > 0:
            log.debug("****** %s" % self.dbName)
            log.debug("Items: %10d" % items)
            log.debug("Count: %10d" % self.statCount)
            log.debug("Hit  : %10d [%02d %%]" % (self.statHit, int(self.statHit * 100 / self.statCount)))
            log.debug("Miss : %10d [%02d %%]" % (self.statMiss, int(self.statMiss * 100 / self.statCount)))
            log.debug("Swap : %10d [%02d %%]" % (self.statSwap, int(self.statSwap * 100 / self.statCount)))
            log.debug("CSwap: %10d [%02d %%]" % (self.statCleanSwap, int(self.statCleanSwap * 100 / self.statCount)))
            log.debug("******")
        # more stats
        self.statCount = 0
        self.statHit = 0
        self.statMiss = 0
        self.statSwap = 0
        self.statCleanSwap = 0

    def commit(self):
        pass
        #log.debug("COMMIT?")
        #self.txn.commit()
        #self.txn = dbEnv.txn_begin()

    def checkpointDatabase(self):
        # checkpoint db (TODO can be moved into the separate process?)
        log.debug("Metakit Checkpoint")

    def shutdown(self):
        log.message('DB Shutting down', self.dbName)
        self.checkpoint()
        del self.connection

    def clear(self):
        log.message("Deleting database", self.dbName)
        self.cursor.execute("delete from data")
        self.connection.commit()
        self.cache.clear()
        self.cacheLinks = {
            "__first__": [None, "__last__"],
            "__last__": ["__first__", None],
        }
        log.debug("Database is empty")

    def create(self, object, id = None):
        #@log.debug("Creating new object", id)
        if not id:
            id = self.nextID
            while self.has_key(id):
                id += 1
            self.nextID = id + 1
            id = self.keyMethod(id)
            object.oid = id
        elif hasattr(object, "oid") and object.oid != id:
            id = self.keyMethod(id)
            log.message("Object OID '%s' != forced OID '%s' - FIXING" % (object.oid, id))
            object.oid = id
        else:
            id = self.keyMethod(id)
        #@log.debug("OID =", id)
        if self.has_key(id):
            raise ige.ServerException("'%s' created twice" % id)
        self.cache[id] = object
        self._addNewCacheItem(id)
        self.put(id, pickle.dumps(object, pickle.HIGHEST_PROTOCOL))
        return id

    def delete(self, key):
        del self[key]

    def get(self, key, default = None):
        if self.has_key(key):
            return self[key]
        else:
            return default

    def put(self, key, data):
        self.cursor.execute("select oid from data where oid = ?", (key,))
        row = self.cursor.fetchone()
        if row:
            self.cursor.execute("update data set data = ? where oid = ?", (sqlite3.Binary(data), key))
        else:
            self.cursor.execute("insert into data (oid, data) values (?, ?)", (key, sqlite3.Binary(data)))
        #per put commits impacts performance significantly
        #self.connection.commit()

    def restore(self, filename):
        log.message("Restoring database from file", filename)
        fh = file(filename, "r")
        line = fh.readline().strip()
        if line != "IGE OUTER SPACE BACKUP VERSION 1":
            raise ige.ServerException("Incorrect header: %s" % line)
        while True:
            key = fh.readline().strip()
            if key == "END OF BACKUP":
                break
            data = fh.readline().strip()
            key = int(binascii.a2b_hex(key))
            data = binascii.a2b_hex(data)
            #@log.debug("Storing key", key)
            self.put(key, data)
        log.message("Database restored")

    def backup(self, basename):
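        # The backup is a plain text file: a version header, then one
        # hex-encoded key line followed by one hex-encoded pickled-object
        # line per record, terminated by "END OF BACKUP" (the same format
        # that restore() above reads back).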
        self.checkpoint()
        filename = "%s-%s.osbackup" % (basename, self.dbName)
        log.message("Creating backup", filename)
        fh = file(filename, "w") #bz2.BZ2File(filename, "w")
        fh.write("IGE OUTER SPACE BACKUP VERSION 1\n")
        for key in self.keys():
            fh.write(binascii.b2a_hex(str(key)))
            fh.write("\n")
            fh.write(binascii.b2a_hex(pickle.dumps(self[key], pickle.HIGHEST_PROTOCOL)))
            fh.write("\n")
        fh.write("END OF BACKUP\n")
        fh.close()
        log.message("Backup completed")

class DatabaseString(Database):

    dbSchema = "data(oid text primary key asc, data blog not null)"
    keyMethod = str

    def restore(self, filename, include = None):
        log.message("Restoring database from file", filename)
        fh = file(filename, "r")
        line = fh.readline().strip()
        if line != "IGE OUTER SPACE BACKUP VERSION 1":
            raise ige.ServerException("Incorrect header: %s" % line)
        imported = 0
        skipped = 0
        while True:
            key = fh.readline().strip()
            if key == "END OF BACKUP":
                break
            data = fh.readline().strip()
            key = binascii.a2b_hex(key)
            if include and not include(key):
                skipped += 1
                continue
            imported += 1
            data = binascii.a2b_hex(data)
            #@log.debug("Storing key", key)
            self.put(key, data)
        log.message("Database restored (%d imported, %d skipped)" % (imported, skipped))
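
For orientation, a minimal usage sketch of the class above. This is hypothetical and not part of the repository: it assumes Python 2 (the module uses cPickle and file()), that the Outer Space ige and log modules are importable, and the directory name "var/db" and class Planet are made up for illustration.

# Hypothetical usage sketch, not part of ige.SQLiteDatabase itself.
from ige.SQLiteDatabase import Database

class Planet:                       # simple old-style class whose instances can be pickled
    pass

db = Database("var/db", "game")     # opens (or creates) var/db/game.sqlite
planet = Planet()
oid = db.create(planet)             # assigns planet.oid and pickles it into the data table
same = db[oid]                      # served from the in-memory LRU cache when possible
db.checkpoint()                     # write back cached objects and commit the transaction
db.shutdown()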