SQLite_DB.reparent_orphaned_table_leaf_pages()   (rated F)

Complexity

    Conditions: 15

Size

    Total Lines: 57
    Code Lines: 47

Duplication

    Duplicated Lines: 57
    Duplication Ratio: 100 %

Importance

    Changes: 0

Raw metrics

    cc (cyclomatic complexity)       15
    eloc (effective lines of code)   47
    nop (number of parameters)       1
    dl (duplicated lines)            57
    loc (total lines)                57
    rs                               2.9998
    c                                0
    b                                0
    f                                0
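The raw metric names (cc, eloc, loc, nop) match the vocabulary of Python static-analysis tools such as radon; this report does not say which analyzer produced them. A minimal sketch for reproducing comparable numbers locally, assuming radon is installed and the file lives at src/db.py:

    # pip install radon  -- an assumption; the report does not name its analyzer
    from radon.complexity import cc_visit
    from radon.raw import analyze

    with open('src/db.py') as src:
        source = src.read()

    # Raw counts: total lines (loc), logical and source lines, comments, blanks
    print(analyze(source))

    # Per-function cyclomatic complexity; each decision point (if/elif,
    # loop, except clause, boolean operator) increases the count
    for block in cc_visit(source):
        print(block.name, block.complexity)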

How to fix

Long Method

Small methods make your code easier to understand, particularly when combined with a good name. And when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method; the comment then makes a good starting point for naming it.

The most commonly applied refactoring here is Extract Method, sketched below.
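For instance, populate_freelist_pages() in the listing below carries a comment noting that the first two entries of the freelist trunk array are really the next trunk index and the leaf count. That comment can become a helper's name and docstring (a sketch only; the helper name is hypothetical, not part of the project):

    def _read_freelist_trunk(self, trunk_bytes):
        """Return (next_trunk_idx, leaf_page_indices) for one trunk page."""
        next_trunk_idx, num_leaf_pages = struct.unpack(r'>II', trunk_bytes[:8])
        trunk_array = struct.unpack(
            r'>{count}I'.format(count=2 + num_leaf_pages),
            trunk_bytes[:4 * (2 + num_leaf_pages)]
        )
        # The first two 32-bit words are the next-trunk index and the leaf
        # count; only the leaf page pointers are returned
        return next_trunk_idx, trunk_array[2:]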

Complexity

Complex code like src.db.SQLite_DB.reparent_orphaned_table_leaf_pages() often does a lot of different things. To break it down, we need to identify a cohesive component within its class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
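In the class below, the members sharing the freelist prefix (_freelist_leaves, _freelist_btree_pages, populate_freelist_pages()) form such a component. A minimal sketch of the extraction; the FreelistIndex name and shape are hypothetical:

    class FreelistIndex:
        """Freelist state currently spread across SQLite_DB (sketch only)."""

        def __init__(self, db):
            self._db = db
            self.leaves = []       # was SQLite_DB._freelist_leaves
            self.btree_pages = []  # was SQLite_DB._freelist_btree_pages

        def populate(self):
            # The body of SQLite_DB.populate_freelist_pages() moves here,
            # reaching pages and the header through self._db
            raise NotImplementedError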

# MIT License
#
# Copyright (c) 2017 Matt Boyer
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import os
import re
import stat
import struct

from . import constants
from . import _LOGGER
from .record import Record
from .pages import (
    Page, OverflowPage, FreelistLeafPage, FreelistTrunkPage, BTreePage,
    PtrmapPage
)
from .table import Table
from .tuples import (
    SQLite_header, SQLite_ptrmap_info, SQLite_master_record, type_specs
)


signatures = {}

class SQLite_DB(object):
    def __init__(self, path, heuristics_registry):
        self._path = path
        self._page_types = {}
        self._header = self.parse_header()
        self._registry = heuristics_registry

        self._page_cache = None
        # Actual page objects go here
        self._pages = {}
        self.build_page_cache()

        self._ptrmap = {}

        # TODO Do we need all of these?
        self._table_roots = {}
        self._page_tables = {}
        self._tables = {}
        self._table_columns = {}
        self._freelist_leaves = []
        self._freelist_btree_pages = []

    @property
    def ptrmap(self):
        return self._ptrmap

    @property
    def header(self):
        return self._header

    @property
    def pages(self):
        return self._pages

    @property
    def tables(self):
        return self._tables

    @property
    def freelist_leaves(self):
        return self._freelist_leaves

    @property
    def table_columns(self):
        return self._table_columns
    def page_bytes(self, page_idx):
        try:
            return self._page_cache[page_idx]
        except (KeyError, IndexError) as ex:
            # The cache is a list, so an out-of-range page index raises
            # IndexError rather than KeyError
            raise ValueError(f"No cache for page {page_idx}") from ex

    def map_table_page(self, page_idx, table):
        assert isinstance(page_idx, int)
        assert isinstance(table, Table)
        self._page_tables[page_idx] = table

    def get_page_table(self, page_idx):
        assert isinstance(page_idx, int)
        try:
            return self._page_tables[page_idx]
        except KeyError:
            return None

    def __repr__(self):
        return '<SQLite DB, page count: {} | page size: {}>'.format(
            self.header.size_in_pages,
            self.header.page_size
        )
    def parse_header(self):
        header_bytes = None
        file_size = None
        with open(self._path, 'br') as sqlite:
            header_bytes = sqlite.read(100)
            file_size = os.fstat(sqlite.fileno())[stat.ST_SIZE]

        if not header_bytes:
            raise ValueError("Couldn't read SQLite header")
        assert isinstance(header_bytes, bytes)
        # This DB header is always big-endian
        fields = SQLite_header(*struct.unpack(
            r'>16sHBBBBBBIIIIIIIIIIII20xII',
            header_bytes[:100]
        ))
        assert fields.page_size in constants.VALID_PAGE_SIZES
        db_size = fields.page_size * fields.size_in_pages
        assert db_size <= file_size
        assert (fields.page_size > 0) and \
            (fields.file_change_counter == fields.version_valid)

        # The lock-byte page sits at offset 1073741824 (2**30), so smaller
        # files don't have one
        if file_size < 1073741824:
            _LOGGER.debug("No lock-byte page in this file!")

        if fields.first_freelist_trunk > 0:
            self._page_types[fields.first_freelist_trunk] = \
                constants.FREELIST_TRUNK_PAGE
        _LOGGER.debug(fields)
        return fields
    def build_page_cache(self):
        # The SQLite docs use a numbering convention for pages where the
        # first page (the one that has the header) is page 1, with the
        # first ptrmap page, if present, being page 2, etc.
        page_cache = [None, ]
        with open(self._path, 'br') as sqlite:
            for page_idx in range(self._header.size_in_pages):
                page_offset = page_idx * self._header.page_size
                sqlite.seek(page_offset, os.SEEK_SET)
                page_cache.append(sqlite.read(self._header.page_size))
        self._page_cache = page_cache
        for page_idx in range(1, len(self._page_cache)):
            # We want these to be temporary objects, to be replaced with
            # more specialised objects as parsing progresses
            self._pages[page_idx] = Page(page_idx, self)
    def populate_freelist_pages(self):
        if 0 == self._header.first_freelist_trunk:
            _LOGGER.debug("This database has no freelist trunk page")
            return

        _LOGGER.info("Parsing freelist pages")
        parsed_trunks = 0
        parsed_leaves = 0
        freelist_trunk_idx = self._header.first_freelist_trunk

        while freelist_trunk_idx != 0:
            _LOGGER.debug(
                "Parsing freelist trunk page %d",
                freelist_trunk_idx
            )

            # Set _page_types value for this page
            self._page_types[freelist_trunk_idx] = \
                constants.FREELIST_TRUNK_PAGE

            trunk_bytes = bytes(self.pages[freelist_trunk_idx])

            next_freelist_trunk_page_idx, num_leaf_pages = struct.unpack(
                r'>II',
                trunk_bytes[:8]
            )

            # Now that we know how long the array of freelist page pointers
            # is, let's read it again
            trunk_array = struct.unpack(
                r'>{count}I'.format(count=2+num_leaf_pages),
                trunk_bytes[:(4*(2+num_leaf_pages))]
            )

            # We're skipping the first two entries as they are really the
            # next trunk index and the leaf count
            # TODO Fix that
            leaves_in_trunk = []
            for page_idx in trunk_array[2:]:
                # Let's prepare a specialised object for this freelist leaf
                # page
                leaf_page = FreelistLeafPage(
                    page_idx, self, freelist_trunk_idx
                )
                leaves_in_trunk.append(leaf_page)
                self._freelist_leaves.append(page_idx)
                self._pages[page_idx] = leaf_page

                self._page_types[page_idx] = constants.FREELIST_LEAF_PAGE

            trunk_page = FreelistTrunkPage(
                freelist_trunk_idx,
                self,
                leaves_in_trunk
            )
            self._pages[freelist_trunk_idx] = trunk_page
            # We've parsed this trunk page
            parsed_trunks += 1
            # ...And every leaf in it
            parsed_leaves += num_leaf_pages

            freelist_trunk_idx = next_freelist_trunk_page_idx

        assert (parsed_trunks + parsed_leaves) == self._header.freelist_pages
        _LOGGER.info(
            "Freelist summary: %d trunk pages, %d leaf pages",
            parsed_trunks,
            parsed_leaves
        )
    def populate_overflow_pages(self):
        # Knowledge of the overflow pages can come from the pointer map
        # (easy), or the parsing of individual cells in table leaf pages
        # (hard)
        #
        # For now, assume we already have a page type dict populated from
        # the ptrmap
        _LOGGER.info("Parsing overflow pages")
        overflow_count = 0
        for page_idx in sorted(self._page_types):
            page_type = self._page_types[page_idx]
            if page_type not in constants.OVERFLOW_PAGE_TYPES:
                continue
            overflow_page = OverflowPage(page_idx, self)
            self.pages[page_idx] = overflow_page
            overflow_count += 1

        _LOGGER.info("Overflow summary: %d pages", overflow_count)
    def populate_ptrmap_pages(self):
        if self._header.largest_btree_page == 0:
            # We don't have ptrmap pages in this DB. That sucks.
            _LOGGER.warning("%r does not have ptrmap pages!", self)
            for page_idx in range(1, self._header.size_in_pages):
                self._page_types[page_idx] = constants.UNKNOWN_PAGE
            return

        _LOGGER.info("Parsing ptrmap pages")

        usable_size = self._header.page_size - self._header.reserved_length
        num_ptrmap_entries_in_page = usable_size // 5
        ptrmap_page_indices = []

        # The first ptrmap page, when present, is always page 2
        ptrmap_page_idx = 2
        while ptrmap_page_idx <= self._header.size_in_pages:
            page_bytes = self._page_cache[ptrmap_page_idx]
            ptrmap_page_indices.append(ptrmap_page_idx)
            self._page_types[ptrmap_page_idx] = constants.PTRMAP_PAGE
            page_ptrmap_entries = {}

            ptrmap_bytes = page_bytes[:5 * num_ptrmap_entries_in_page]
            for entry_idx in range(num_ptrmap_entries_in_page):
                ptr_page_idx = ptrmap_page_idx + entry_idx + 1
                page_type, page_ptr = struct.unpack(
                    r'>BI',
                    ptrmap_bytes[5*entry_idx:5*(entry_idx+1)]
                )
                if page_type == 0:
                    break

                ptrmap_entry = SQLite_ptrmap_info(
                    ptr_page_idx, page_type, page_ptr
                )
                assert ptrmap_entry.page_type in constants.PTRMAP_PAGE_TYPES
                if page_type == constants.BTREE_ROOT_PAGE:
                    assert page_ptr == 0
                    self._page_types[ptr_page_idx] = page_type

                elif page_type == constants.FREELIST_PAGE:
                    # Freelist pages are assumed to be known already
                    assert self._page_types[ptr_page_idx] in \
                        constants.FREELIST_PAGE_TYPES
                    assert page_ptr == 0

                elif page_type == constants.FIRST_OFLOW_PAGE:
                    assert page_ptr != 0
                    self._page_types[ptr_page_idx] = page_type

                elif page_type == constants.NON_FIRST_OFLOW_PAGE:
                    assert page_ptr != 0
                    self._page_types[ptr_page_idx] = page_type

                elif page_type == constants.BTREE_NONROOT_PAGE:
                    assert page_ptr != 0
                    self._page_types[ptr_page_idx] = page_type

                # _LOGGER.debug("%r", ptrmap_entry)
                self._ptrmap[ptr_page_idx] = ptrmap_entry
                page_ptrmap_entries[ptr_page_idx] = ptrmap_entry

            page = PtrmapPage(ptrmap_page_idx, self, page_ptrmap_entries)
            self._pages[ptrmap_page_idx] = page
            _LOGGER.debug("%r", page)
            ptrmap_page_idx += num_ptrmap_entries_in_page + 1

        _LOGGER.info(
            "Ptrmap summary: %d pages, %r",
            len(ptrmap_page_indices), ptrmap_page_indices
        )
    def populate_btree_pages(self):
        # TODO Should this use table information instead of scanning all
        # pages?
        page_idx = 1
        while page_idx <= self._header.size_in_pages:
            try:
                if self._page_types[page_idx] in \
                        constants.NON_BTREE_PAGE_TYPES:
                    page_idx += 1
                    continue
            except KeyError:
                pass

            try:
                # We need to pass in the singleton registry instance
                page_obj = BTreePage(page_idx, self, self._registry)
            except ValueError:
                # This page isn't a valid btree page. This can happen if we
                # don't have a ptrmap to guide us
                _LOGGER.warning(
                    "Page %d (%s) is not a btree page",
                    page_idx,
                    # The page type may be unknown when there is no ptrmap
                    self._page_types.get(page_idx)
                )
                page_idx += 1
                continue

            page_obj.parse_cells()
            self._page_types[page_idx] = page_obj.page_type
            self._pages[page_idx] = page_obj
            page_idx += 1
    def _parse_master_leaf_page(self, page):
        for cell_idx in page.cells:
            _, master_record = page.cells[cell_idx]
            assert isinstance(master_record, Record)
            fields = [
                master_record.fields[idx].value for idx in master_record.fields
            ]
            master_record = SQLite_master_record(*fields)
            if master_record.type != 'table':
                continue

            self._table_roots[master_record.name] = \
                self.pages[master_record.rootpage]

            # This record describes a table in the schema, which means it
            # includes a SQL statement that defines the table's columns.
            # We need to parse the column names out of that statement
            assert master_record.sql.startswith('CREATE TABLE')
            columns_re = re.compile(r'^CREATE TABLE (\S+) \((.*)\)$')
            match = columns_re.match(master_record.sql)
            if match:
                assert match.group(1) == master_record.name
                column_list = match.group(2)
                # Drop parenthesised lists such as VARCHAR(10) or
                # UNIQUE(col1, col2), so that splitting on commas yields
                # one column definition per item
                csl_between_parens_re = re.compile(r'\([^)]+\)')
                expunged = csl_between_parens_re.sub('', column_list)

                cols = [
                    statement.strip() for statement in expunged.split(',')
                ]
                cols = [
                    statement for statement in cols if not (
                        statement.startswith('PRIMARY') or
                        statement.startswith('UNIQUE')
                    )
                ]
                columns = [col.split()[0] for col in cols]
                signature = []

                # Some column definitions lack a type
                for col_def in cols:
                    def_tokens = col_def.split()
                    try:
                        col_type = def_tokens[1]
                    except IndexError:
                        signature.append(object)
                        continue

                    _LOGGER.debug(
                        "Column \"%s\" is defined as \"%s\"",
                        def_tokens[0], col_type
                    )
                    try:
                        signature.append(type_specs[col_type])
                    except KeyError:
                        _LOGGER.warning("No native type for \"%s\"", col_def)
                        signature.append(object)
                _LOGGER.info(
                    "Signature for table \"%s\": %r",
                    master_record.name, signature
                )
                signatures[master_record.name] = signature

                _LOGGER.info(
                    "Columns for table \"%s\": %r",
                    master_record.name, columns
                )
                self._table_columns[master_record.name] = columns
    def map_tables(self):
        first_page = self.pages[1]
        assert isinstance(first_page, BTreePage)

        master_table = Table('sqlite_master', self, first_page, signatures)
        self._table_columns.update(constants.SQLITE_TABLE_COLUMNS)

        for master_leaf in master_table.leaves:
            self._parse_master_leaf_page(master_leaf)

        assert all(
            isinstance(root, BTreePage) for root in self._table_roots.values()
        )
        assert all(
            root.parent is None for root in self._table_roots.values()
        )

        self.map_table_page(1, master_table)
        self._table_roots['sqlite_master'] = self.pages[1]

        for table_name, rootpage in self._table_roots.items():
            try:
                table_obj = Table(table_name, self, rootpage, signatures)
            except Exception as ex:  # pylint:disable=W0703
                _LOGGER.warning(
                    "Caught %r while instantiating table object for \"%s\"",
                    ex, table_name
                )
            else:
                self._tables[table_name] = table_obj
    def reparent_orphaned_table_leaf_pages(self):
        reparented_pages = []
        for page in self.pages.values():
            if not isinstance(page, BTreePage):
                continue
            if page.page_type != "Table Leaf":
                continue

            table = page.table
            if not table:
                # Walk up the b-tree to find the table of the root page
                parent = page
                root_table = None
                while parent:
                    root_table = parent.table
                    parent = parent.parent

                if root_table is None:
                    # No ancestor carries a table: this leaf is orphaned
                    self._freelist_btree_pages.append(page)
                    if not page.cells:
                        continue

                    # All records within a given page are for the same
                    # table, so matching the first record's signature is
                    # enough
                    first_record = page.cells[0][1]
                    matches = []
                    for table_name in signatures:
                        if self.tables[table_name].check_signature(
                                first_record):
                            matches.append(self.tables[table_name])
                    if not matches:
                        _LOGGER.error(
                            "Couldn't find a matching table for %r",
                            page
                        )
                        continue
                    if len(matches) > 1:
                        _LOGGER.error(
                            "Multiple matching tables for %r: %r",
                            page, matches
                        )
                        continue
                    root_table = matches[0]

                _LOGGER.debug(
                    "Reparenting %r to table \"%s\"",
                    page, root_table.name
                )
                root_table.add_leaf(page)
                self.map_table_page(page.idx, root_table)
                reparented_pages.append(page)

        if reparented_pages:
            _LOGGER.info(
                "Reparented %d pages: %r",
                len(reparented_pages), [p.idx for p in reparented_pages]
            )
    def grep(self, needle):
        match_found = False
        page_idx = 1
        needle_re = re.compile(needle.encode('utf-8'))
        while page_idx <= self.header.size_in_pages:
            page = self.pages[page_idx]
            page_offsets = []
            for match in needle_re.finditer(bytes(page)):
                needle_offset = match.start()
                page_offsets.append(needle_offset)
            if page_offsets:
                match_found = True
                _LOGGER.info(
                    "Found search term in page %r @ offset(s) %s",
                    page, ', '.join(str(offset) for offset in page_offsets)
                )
            page_idx += 1
        if not match_found:
            _LOGGER.warning("Search term not found")
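
Applying the Extract Method advice above is one way to bring reparent_orphaned_table_leaf_pages() below its 15 conditions. A minimal sketch of two extracted helpers, based on the listing above; the helper names are hypothetical, not part of the project:

    def _find_root_table(self, page):
        """Walk up the b-tree and return the root page's table, if any."""
        root_table = None
        parent = page
        while parent:
            root_table = parent.table
            parent = parent.parent
        return root_table

    def _match_orphan_to_table(self, page):
        """Return the single table whose signature matches this orphan
        page's first record, or None when there is no unique match."""
        first_record = page.cells[0][1]
        matches = [
            self.tables[name] for name in signatures
            if self.tables[name].check_signature(first_record)
        ]
        if len(matches) != 1:
            _LOGGER.error("No unique table match for %r: %r", page, matches)
            return None
        return matches[0]

With those helpers in place, the remaining method body reduces to the page filter, the reparenting calls, and the summary log, and each piece is small enough to carry a descriptive name.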