1 | # |
||
2 | # Copyright 2001 - 2016 Ludek Smid [http://www.ospace.net/] |
||
3 | # |
||
4 | # This file is part of Outer Space. |
||
5 | # |
||
6 | # Outer Space is free software; you can redistribute it and/or modify |
||
7 | # it under the terms of the GNU General Public License as published by |
||
8 | # the Free Software Foundation; either version 2 of the License, or |
||
9 | # (at your option) any later version. |
||
10 | # |
||
11 | # Outer Space is distributed in the hope that it will be useful, |
||
12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
14 | # GNU General Public License for more details. |
||
15 | # |
||
16 | # You should have received a copy of the GNU General Public License |
||
17 | # along with Outer Space; if not, write to the Free Software |
||
18 | # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA |
||
19 | # |
||
20 | |||
21 | import ige |
||
22 | import os.path, log, os, sys, time, types, binascii, bz2 |
||
23 | import cPickle as pickle |
||
24 | import sqlite3 |
||
25 | |||
# Indexes into the two-element [prev, next] lists stored in
# Database.cacheLinks, which form a doubly linked LRU chain
# delimited by the "__first__" / "__last__" sentinel nodes.
IDX_PREV = 0
IDX_NEXT = 1

class Database:
    """Pickle-object store backed by a single-table sqlite database.

    Objects live in an in-memory cache (an LRU chain maintained through
    ``cacheLinks``) and are written through to the ``data`` table as
    pickled blobs keyed by an integer oid.
    """

    # Column type fixed from the original "blog" typo; sqlite accepts any
    # type name, and existing databases are unaffected because the table is
    # only created "if not exists" -- "blob" simply states the intent.
    dbSchema = "data(oid integer primary key asc, data blob not null)"
    # applied to every incoming key, so keys are always ints
    keyMethod = int

34 | def __init__(self, directory, dbName, cache = 128): |
||
35 | log.message("Opening database", dbName) |
||
36 | self.dbName = dbName |
||
37 | self.nextID = 10000 |
||
38 | self.cacheSize = cache |
||
39 | # |
||
40 | try: |
||
41 | os.makedirs(directory) |
||
42 | except OSError: |
||
43 | pass |
||
44 | # open db |
||
45 | self.connection = sqlite3.connect(os.path.join(directory, "%s.sqlite" % dbName)) |
||
46 | # allow 8-bits strings to be handled correctly (default is unicode) |
||
47 | self.connection.text_factory = str |
||
48 | self.cursor = self.connection.cursor() |
||
49 | self.cursor.execute("create table if not exists %s" % self.dbSchema) |
||
50 | self.connection.commit() |
||
51 | # cache |
||
52 | self.cache = {} |
||
53 | self.cacheLinks = { |
||
54 | "__first__": [None, "__last__"], |
||
55 | "__last__": ["__first__", None], |
||
56 | } |
||
57 | # stats |
||
58 | self.statCount = 0 |
||
59 | self.statHit = 0 |
||
60 | self.statSwap = 0 |
||
61 | self.statMiss = 0 |
||
62 | self.statCleanSwap = 0 |
||
63 | |||
64 | def _moveExistingCacheItem(self, key): |
||
65 | #@log.debug("Cache MOVE ITEM", key, self.cacheLinks[key]) |
||
66 | # optimalization |
||
67 | cl = self.cacheLinks |
||
68 | # move item to the end of the chain |
||
69 | prev, next = cl[key] |
||
70 | cl[prev][IDX_NEXT] = next |
||
71 | cl[next][IDX_PREV] = prev |
||
72 | last = cl["__last__"] |
||
73 | prev = last[IDX_PREV] |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
![]() |
|||
74 | cl[key] = [prev, "__last__"] |
||
75 | cl[prev][IDX_NEXT] = key |
||
76 | last[IDX_PREV] = key |
||
77 | # print chain |
||
78 | #idx = "__first__" |
||
79 | #result = [] |
||
80 | #while idx != None: |
||
81 | # item = self.cacheLinks[idx] |
||
82 | # result.append("%s:%s" % (idx, item)) |
||
83 | # idx = item[IDX_NEXT] |
||
84 | #log.debug("Cache CHAIN", " ".join(result)) |
||
85 | |||
    def _addNewCacheItem(self, key):
        """Link *key* into the LRU chain, evicting an old object if the cache
        is over its size limit.

        Eviction scans from the oldest end of the chain for an object whose
        only references are the cache itself; that object is pickled out via
        put() and dropped. If no candidate is found after one full pass, the
        cache limit is grown by 100 instead.
        """
        #@log.debug("Cache ADD ITEM", key)
        # check cache size
        #@log.debug("Cache size", len(self.cache), self.cacheSize)
        if len(self.cache) > self.cacheSize:
            # at most one probe per currently cached object
            tries = len(self.cache)
            while len(self.cache) > self.cacheSize:
                # swap out oldest item
                current = self.cacheLinks["__first__"]
                oldKey = current[IDX_NEXT]
                tries -= 1
                if tries == 0:
                    # no room in the cache -> enlarge it
                    log.debug("NO ROOM IN THE CACHE", self.dbName, len(self.cache))
                    self.cacheSize += 100
                    break
                # refcount > 2 means someone besides the cache (and this
                # getrefcount call) still holds the object -> cannot evict
                if sys.getrefcount(self.cache[oldKey]) > 2:
                    #@log.debug("CANNOT swap out", oldKey, "refs", sys.getrefcount(self.cache[oldKey]) - 2)
                    # try next element
                    self._moveExistingCacheItem(oldKey)
                    #@log.debug("Trying to swap out", current[IDX_NEXT])
                    continue
                else:
                    #@log.debug("Swapping out", oldKey)
                    self.statSwap += 1
                    #TODOif not self.cache[oldKey]._v_modified:
                    #TODO self.statCleanSwap += 1
                    # unlink the victim from the chain, persist it, drop it
                    prev, next = self.cacheLinks[oldKey]
                    current[IDX_NEXT] = next
                    self.cacheLinks[next][IDX_PREV] = prev
                    self.put(oldKey, pickle.dumps(self.cache[oldKey], pickle.HIGHEST_PROTOCOL))
                    del self.cacheLinks[oldKey]
                    del self.cache[oldKey]
                    # found space
                    break
        # put item at the end of the chain
        last = self.cacheLinks["__last__"]
        prev = last[IDX_PREV]
        self.cacheLinks[prev][IDX_NEXT] = key
        self.cacheLinks[key] = [prev, "__last__"]
        last[IDX_PREV] = key
        # print chain
        #idx = "__first__"
        #result = []
        #while idx != None:
        #    item = self.cacheLinks[idx]
        #    result.append("%s:%s" % (idx, item))
        #    idx = item[IDX_NEXT]
        #log.debug("Cache CHAIN", " ".join(result))

136 | def _updateCacheItem(self, key): |
||
137 | if key in self.cache: |
||
138 | return self._moveExistingCacheItem(key) |
||
139 | return self._addNewCacheItem(key) |
||
140 | |||
141 | def _delCacheItem(self, key): |
||
142 | #@log.debug("Cache DEL ITEM", key) |
||
143 | prev, next = self.cacheLinks[key] |
||
144 | self.cacheLinks[prev][IDX_NEXT] = next |
||
145 | self.cacheLinks[next][IDX_PREV] = prev |
||
146 | del self.cacheLinks[key] |
||
147 | |||
148 | def __getitem__(self, key): |
||
149 | key = self.keyMethod(key) |
||
150 | self.statCount += 1 |
||
151 | if key in self.cache: |
||
152 | self.statHit += 1 |
||
153 | self._moveExistingCacheItem(key) |
||
154 | return self.cache[key] |
||
155 | self.statMiss += 1 |
||
156 | self.cursor.execute("select * from data where oid = ?", (key,)) |
||
157 | row = self.cursor.fetchone() |
||
158 | if row is None: |
||
159 | raise ige.NoSuchObjectException(key) |
||
160 | item = pickle.loads(str(row[1])) |
||
161 | self._addNewCacheItem(key) |
||
162 | self.cache[key] = item |
||
163 | #TODOitem.setModified(0) |
||
164 | return item |
||
165 | |||
166 | def __setitem__(self, key, value): |
||
167 | key = self.keyMethod(key) |
||
168 | if type(value) == types.InstanceType: |
||
169 | value.oid = key |
||
170 | # set value |
||
171 | self._updateCacheItem(key) |
||
172 | self.cache[key] = value |
||
173 | #value.setModified(0) |
||
174 | # write through new objects |
||
175 | if not self.has_key(key): |
||
176 | raise ige.ServerException("'%s' created using set method" % key) |
||
177 | |||
178 | def __contains__(self, key): |
||
179 | return self.has_key(key) |
||
180 | |||
181 | def __delitem__(self, key): |
||
182 | key = self.keyMethod(key) |
||
183 | if key in self.cache: |
||
184 | self._delCacheItem(key) |
||
185 | del self.cache[key] |
||
186 | self.cursor.execute("delete from data where oid = ?", (key,)) |
||
187 | |||
188 | def has_key(self, key): |
||
189 | key = self.keyMethod(key) |
||
190 | self.cursor.execute("select oid from data where oid = ?", (key,)) |
||
191 | return self.cursor.fetchone() is not None |
||
192 | |||
193 | |||
194 | def keys(self): |
||
195 | self.cursor.execute("select oid from data") |
||
196 | return [row[0] for row in self.cursor] |
||
197 | |||
198 | def getItemLength(self, key): |
||
199 | key = self.keyMethod(key) |
||
200 | self.cursor.execute("select * from data where oid = ?", (key,)) |
||
201 | row = self.cursor.fetchone() |
||
202 | if row is None: |
||
203 | raise ige.NoSuchObjectException(key) |
||
204 | return len(str(row[1])) |
||
205 | |||
206 | def checkpoint(self): |
||
207 | log.debug('DB Checkpoint', self.dbName) |
||
208 | log.debug("Storing all objects") |
||
209 | for key, value in self.cache.iteritems(): |
||
210 | self.put(key, pickle.dumps(value, pickle.HIGHEST_PROTOCOL)) |
||
211 | # commit transaction |
||
212 | log.debug("Commiting transaction") |
||
213 | self.connection.commit() |
||
214 | log.debug("Commit completed") |
||
215 | # TODO clear cache? |
||
216 | # self.cache.clear() |
||
217 | # self.cacheLinks.clear() |
||
218 | # stats TODO: reenable |
||
219 | self.cursor.execute("select * from data") |
||
220 | items = 0 |
||
221 | for i in self.cursor: |
||
222 | items += 1 |
||
223 | if self.statCount > 0: |
||
224 | log.debug("****** %s" % self.dbName) |
||
225 | log.debug("Items: %10d" % items) |
||
226 | log.debug("Count: %10d" % self.statCount) |
||
227 | log.debug("Hit : %10d [%02d %%]" % (self.statHit, int(self.statHit * 100 / self.statCount))) |
||
228 | log.debug("Miss : %10d [%02d %%]" % (self.statMiss, int(self.statMiss * 100 / self.statCount))) |
||
229 | log.debug("Swap : %10d [%02d %%]" % (self.statSwap, int(self.statSwap * 100 / self.statCount))) |
||
230 | log.debug("CSwap: %10d [%02d %%]" % (self.statCleanSwap, int(self.statCleanSwap * 100 / self.statCount))) |
||
231 | log.debug("******") |
||
232 | # more stats |
||
233 | self.statCount = 0 |
||
234 | self.statHit = 0 |
||
235 | self.statMiss = 0 |
||
236 | self.statSwap = 0 |
||
237 | self.statCleanSwap = 0 |
||
238 | |||
239 | def commit(self): |
||
240 | pass |
||
241 | #log.debug("COMMIT?") |
||
242 | #self.txn.commit() |
||
243 | #self.txn = dbEnv.txn_begin() |
||
244 | |||
    def checkpointDatabase(self):
        """Backend-level checkpoint hook; currently only logs.

        NOTE(review): the log text mentions Metakit (apparently a previous
        storage backend) and no sqlite work is done here -- confirm whether
        this hook is still needed.
        """
        # checkpoint db (TODO can be moved into the separate process?)
        log.debug("Metakit Checkpoint")

249 | def shutdown(self): |
||
250 | log.message('DB Shutting down', self.dbName) |
||
251 | self.checkpoint() |
||
252 | del self.connection |
||
253 | |||
254 | def clear(self): |
||
255 | log.message("Deleting database", self.dbName) |
||
256 | self.cursor.execute("delete from data") |
||
257 | self.connection.commit() |
||
258 | self.cache.clear() |
||
259 | self.cacheLinks = { |
||
260 | "__first__": [None, "__last__"], |
||
261 | "__last__": ["__first__", None], |
||
262 | } |
||
263 | log.debug("Database is empty") |
||
264 | |||
265 | def create(self, object, id = None): |
||
266 | #@log.debug("Creating new object", id) |
||
267 | if not id: |
||
268 | id = self.nextID |
||
269 | while self.has_key(id): |
||
270 | id += 1 |
||
271 | self.nextID = id + 1 |
||
272 | id = self.keyMethod(id) |
||
273 | object.oid = id |
||
274 | elif hasattr(object, "oid") and object.oid != id: |
||
275 | id = self.keyMethod(id) |
||
276 | log.message("Object OID '%s' != forced OID '%s' - FIXING" % (object.oid, id)) |
||
277 | object.oid = id |
||
278 | else: |
||
279 | id = self.keyMethod(id) |
||
280 | #@log.debug("OID =", id) |
||
281 | if self.has_key(id): |
||
282 | raise ige.ServerException("'%s' created twice" % id) |
||
283 | self.cache[id] = object |
||
284 | self._addNewCacheItem(id) |
||
285 | self.put(id, pickle.dumps(object, pickle.HIGHEST_PROTOCOL)) |
||
286 | return id |
||
287 | |||
288 | def delete(self, key): |
||
289 | del self[key] |
||
290 | |||
291 | def get(self, key, default = None): |
||
292 | if self.has_key(key): |
||
293 | return self[key] |
||
294 | else: |
||
295 | return default |
||
296 | |||
297 | def put(self, key, data): |
||
298 | self.cursor.execute("select oid from data where oid = ?", (key,)) |
||
299 | row = self.cursor.fetchone() |
||
300 | if row: |
||
301 | self.cursor.execute("update data set data = ? where oid = ?", (sqlite3.Binary(data), key)) |
||
302 | else: |
||
303 | self.cursor.execute("insert into data (oid, data) values (?, ?)", (key, sqlite3.Binary(data))) |
||
304 | #per put commits impacts performance significantly |
||
305 | #self.connection.commit() |
||
306 | |||
307 | def restore(self, filename): |
||
308 | log.message("Restoring database from file", filename) |
||
309 | fh = file(filename, "r") |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
310 | line = fh.readline().strip() |
||
311 | if line != "IGE OUTER SPACE BACKUP VERSION 1": |
||
312 | raise ige.ServerException("Incorrect header: %s" % line) |
||
313 | while True: |
||
314 | key = fh.readline().strip() |
||
315 | if key == "END OF BACKUP": |
||
316 | break |
||
317 | data = fh.readline().strip() |
||
318 | key = int(binascii.a2b_hex(key)) |
||
319 | data = binascii.a2b_hex(data) |
||
320 | #@log.debug("Storing key", key) |
||
321 | self.put(key, data) |
||
322 | log.message("Database restored") |
||
323 | |||
324 | def backup(self, basename): |
||
325 | self.checkpoint() |
||
326 | filename = "%s-%s.osbackup" % (basename, self.dbName) |
||
327 | log.message("Creating backup", filename) |
||
328 | fh = file(filename, "w") #bz2.BZ2File(filename, "w") |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
329 | fh.write("IGE OUTER SPACE BACKUP VERSION 1\n") |
||
330 | for key in self.keys(): |
||
331 | fh.write(binascii.b2a_hex(str(key))) |
||
332 | fh.write("\n") |
||
333 | fh.write(binascii.b2a_hex(pickle.dumps(self[key], pickle.HIGHEST_PROTOCOL))) |
||
334 | fh.write("\n") |
||
335 | fh.write("END OF BACKUP\n") |
||
336 | fh.close() |
||
337 | log.message("Backup completed") |
||
338 | |||
class DatabaseString(Database):
    """Database variant whose oids are strings instead of integers."""

    # Column type fixed from the original "blog" typo (see Database.dbSchema);
    # existing databases are unaffected ("create table if not exists").
    dbSchema = "data(oid text primary key asc, data blob not null)"
    # applied to every incoming key, so keys are always strings
    keyMethod = str

344 | def restore(self, filename, include = None): |
||
345 | log.message("Restoring database from file", filename) |
||
346 | fh = file(filename, "r") |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
347 | line = fh.readline().strip() |
||
348 | if line != "IGE OUTER SPACE BACKUP VERSION 1": |
||
349 | raise ige.ServerException("Incorrect header: %s" % line) |
||
350 | imported = 0 |
||
351 | skipped = 0 |
||
352 | while True: |
||
353 | key = fh.readline().strip() |
||
354 | if key == "END OF BACKUP": |
||
355 | break |
||
356 | data = fh.readline().strip() |
||
357 | key = binascii.a2b_hex(key) |
||
358 | if include and not include(key): |
||
359 | skipped += 1 |
||
360 | continue |
||
361 | imported += 1 |
||
362 | data = binascii.a2b_hex(data) |
||
363 | #@log.debug("Storing key", key) |
||
364 | self.put(key, data) |
||
365 | log.message("Database restored (%d imported, %d skipped)" % (imported, skipped)) |
||
366 | |||
367 |