Issues (17)

src/sqlite_recover.py (5 issues)

1
# MIT License
2
#
3
# Copyright (c) 2017 Matt Boyer
4
#
5
# Permission is hereby granted, free of charge, to any person obtaining a copy
6
# of this software and associated documentation files (the "Software"), to deal
7
# in the Software without restriction, including without limitation the rights
8
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
# copies of the Software, and to permit persons to whom the Software is
10
# furnished to do so, subject to the following conditions:
11
#
12
# The above copyright notice and this permission notice shall be included in
13
# all copies or substantial portions of the Software.
14
#
15
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
# SOFTWARE.
22
23
import argparse
24
import logging
25
import os.path
26
import shutil
27
import sqlite3
28
29
from . import PROJECT_DESCRIPTION, PROJECT_NAME
30
from . import _LOGGER
31
from .db import SQLite_DB
32
from .heuristics import HeuristicsRegistry
33
from .pages import Page
34
35
36 View Code Duplication
def gen_output_dir(db_path, max_suffix=10):
    """Return a not-yet-existing output directory path for *db_path*.

    The candidate directory lives next to the database file and is named
    after it, with every ``.`` replaced by ``_``. If that path already
    exists, numeric suffixes ``_1`` .. ``_<max_suffix>`` are tried in order.

    Nothing is created on disk; the caller is responsible for creating the
    returned directory.

    Args:
        db_path: Path to the database file (relative or absolute).
        max_suffix: Highest numeric suffix to try before giving up.
            Defaults to 10, the limit this function has always used.

    Returns:
        The absolute path of the first candidate that does not exist.

    Raises:
        SystemError: If the unsuffixed candidate and every suffixed
            candidate already exist. (The exception type is kept for
            compatibility with existing callers.)
    """
    db_dir, db_name = os.path.split(os.path.abspath(db_path))
    munged_name = db_name.replace('.', '_')

    # The unsuffixed name is preferred; suffixed names are fallbacks.
    candidates = [munged_name]
    candidates.extend(
        "{}_{}".format(munged_name, suffix)
        for suffix in range(1, max_suffix + 1)
    )

    for candidate in candidates:
        out_dir = os.path.join(db_dir, candidate)
        if not os.path.exists(out_dir):
            return out_dir

    raise SystemError(
        "Unreasonable number of output directories for {}".format(db_path)
    )
53
54
55 View Code Duplication
def _load_db(sqlite_path):
    """Build a fully populated SQLite_DB object for the file at *sqlite_path*.

    A HeuristicsRegistry is created and loaded, handed to SQLite_DB, and the
    database's populate/map/reparent passes are then run in a fixed order.

    Returns:
        The populated SQLite_DB instance.

    Raises:
        AssertionError: If, after all passes, any entry in ``db.pages`` is
            not an instance of a Page subclass.
    """
    _LOGGER.info("Processing %s", sqlite_path)

    heuristics = HeuristicsRegistry()
    heuristics.load_heuristics()

    database = SQLite_DB(sqlite_path, heuristics)
    _LOGGER.info("Database: %r", database)

    database.populate_freelist_pages()
    database.populate_ptrmap_pages()
    database.populate_overflow_pages()

    # Should we aim to instantiate specialised b-tree objects here, or is the
    # use of generic btree page objects acceptable?
    database.populate_btree_pages()

    database.map_tables()

    # We need a first pass to process tables that are disconnected
    # from their table's root page
    database.reparent_orphaned_table_leaf_pages()

    # All pages should now be represented by specialised objects: every one
    # must be a Page, and none may be a bare Page instance.
    assert all(isinstance(page, Page) for page in database.pages.values())
    assert all(type(page) is not Page for page in database.pages.values())
    return database
81
82
83 View Code Duplication
def dump_to_csv(args):
    """Recover records from a database and dump every table to CSV files.

    Args:
        args: Parsed command-line namespace providing ``sqlite_path``,
            ``output_dir`` (may be None, in which case a directory name is
            generated with gen_output_dir) and ``database_name``.

    Raises:
        ValueError: If the output directory already exists.
    """
    out_dir = args.output_dir or gen_output_dir(args.sqlite_path)

    # Fail fast: refuse an existing output directory *before* parsing the
    # database, so the user does not wait for a full load only to be told
    # the destination is unusable.
    if os.path.exists(out_dir):
        raise ValueError("Output directory {} exists!".format(out_dir))

    db = _load_db(args.sqlite_path)

    # Created only after a successful load so a failed parse does not leave
    # an empty directory behind.
    os.mkdir(out_dir)

    for table_name in sorted(db.tables):
        table = db.tables[table_name]
        _LOGGER.info("Table \"%s\"", table)
        table.recover_records(args.database_name)
        table.csv_dump(out_dir)
96
97
98 View Code Duplication
def _insert_recovered_records(cursor, table):
    """Insert every recovered record of *table* through *cursor*.

    Returns:
        A ``(successful, failed, constraint_violations)`` tuple of counts.
    """
    successful_inserts = 0
    failed_inserts = 0
    constraint_violations = 0

    for leaf_page in table.leaves:
        if not leaf_page.recovered_records:
            continue

        for record in leaf_page.recovered_records:
            insert_statement, values = table.build_insert_SQL(record)

            try:
                cursor.execute(insert_statement, values)
            except sqlite3.IntegrityError:
                # We gotta soldier on, there's not much we can do if a
                # constraint is violated by this insert
                constraint_violations += 1
            except (
                    sqlite3.ProgrammingError,
                    sqlite3.OperationalError,
                    sqlite3.InterfaceError
            ) as insert_ex:
                _LOGGER.warning(
                    (
                        "Caught %r while executing INSERT statement "
                        "in \"%s\""
                    ),
                    insert_ex,
                    table
                )
                failed_inserts += 1
            else:
                successful_inserts += 1

    return successful_inserts, failed_inserts, constraint_violations


def undelete(args):
    """Insert recovered records into a copy of the source database.

    The source database is copied to ``args.output_path`` and every record
    recovered from each table is INSERTed into that copy. Inserts that
    violate a constraint or otherwise fail are counted and logged, not
    raised.

    Args:
        args: Parsed command-line namespace providing ``sqlite_path``,
            ``output_path`` and ``database_name``.

    Raises:
        ValueError: If the output file already exists.
    """
    db_abspath = os.path.abspath(args.sqlite_path)
    output_path = os.path.abspath(args.output_path)

    # Fail fast: check the destination before the expensive database parse.
    if os.path.exists(output_path):
        raise ValueError("Output file {} exists!".format(output_path))

    db = _load_db(db_abspath)

    shutil.copyfile(db_abspath, output_path)
    output_db_connection = sqlite3.connect(output_path)
    try:
        # The connection's context manager commits (or rolls back on error)
        # but does not close the connection, hence the enclosing
        # try/finally.
        with output_db_connection:
            cursor = output_db_connection.cursor()
            for table_name in sorted(db.tables):
                table = db.tables[table_name]
                _LOGGER.info("Table \"%s\"", table)
                table.recover_records(args.database_name)

                (
                    successful_inserts,
                    failed_inserts,
                    constraint_violations,
                ) = _insert_recovered_records(cursor, table)

                if failed_inserts > 0:
                    _LOGGER.warning(
                        "%d failed INSERT statements in \"%s\"",
                        failed_inserts, table
                    )
                if constraint_violations > 0:
                    _LOGGER.warning(
                        "%d constraint violations statements in \"%s\"",
                        constraint_violations, table
                    )
                _LOGGER.info(
                    "%d successful INSERT statements in \"%s\"",
                    successful_inserts, table
                )
    finally:
        output_db_connection.close()
161
162
163
def find_in_db(args):
    """Load the database at ``args.sqlite_path`` and grep it.

    The loaded database's ``grep`` method is called with ``args.needle``.
    """
    _load_db(args.sqlite_path).grep(args.needle)
166
167
168
def list_supported(args):  # pylint:disable=W0613
    """Print each grouping known to a freshly loaded HeuristicsRegistry.

    *args* is accepted only so this function matches the signature of the
    other subcommand handlers; it is not used.
    """
    heuristics = HeuristicsRegistry()
    heuristics.load_heuristics()
    for grouping in heuristics.groupings:
        print(grouping)
173
174
175
# Maps each subcommand name accepted on the command line to the function
# implementing it. Every handler takes the parsed argument namespace.
subcmd_actions = {
    'csv': dump_to_csv,
    'grep': find_in_db,
    'undelete': undelete,
    'list': list_supported,
}
181
182
183
def subcmd_dispatcher(arg_ns):
    """Run the handler registered for ``arg_ns.subcmd`` and return its result.

    Raises:
        KeyError: If ``arg_ns.subcmd`` has no entry in ``subcmd_actions``.
    """
    handler = subcmd_actions[arg_ns.subcmd]
    return handler(arg_ns)
185
186
187 View Code Duplication
def _build_cli_parser():
    """Build and return the top-level argument parser with all subcommands."""
    verbose_parser = argparse.ArgumentParser(add_help=False)
    verbose_parser.add_argument(
        '-v', '--verbose',
        action='count',
        # This parser is a parent of both the top-level parser and every
        # subcommand parser. argparse copies a subparser's namespace
        # (defaults included) over the top-level one, so a regular default
        # would wipe out a -v given *before* the subcommand. SUPPRESS means
        # the attribute is only set when the flag is actually present.
        default=argparse.SUPPRESS,
        help='Give *A LOT* more output.',
    )

    cli_parser = argparse.ArgumentParser(
        description=PROJECT_DESCRIPTION,
        parents=[verbose_parser],
    )

    subcmd_parsers = cli_parser.add_subparsers(
        title='Subcommands',
        description='%(prog)s implements the following subcommands:',
        dest='subcmd',
    )

    csv_parser = subcmd_parsers.add_parser(
        'csv',
        parents=[verbose_parser],
        help='Dumps visible and recovered records to CSV files',
        description=(
            'Recovers as many records as possible from the database passed as '
            'argument and outputs all visible and recovered records to CSV '
            'files in output_dir'
        ),
    )
    csv_parser.add_argument(
        'sqlite_path',
        help='sqlite3 file path'
    )
    csv_parser.add_argument(
        'output_dir',
        nargs='?',
        default=None,
        help='Output directory'
    )
    csv_parser.add_argument(
        '-d', '--database-name',
        nargs='?',
        default=None,
        help='Database name'
    )

    subcmd_parsers.add_parser(
        'list',
        parents=[verbose_parser],
        help='Displays supported DB types',
        description=(
            'Displays the names of all database types with table heuristics '
            'known to {}'.format(PROJECT_NAME)
        ),
    )

    grep_parser = subcmd_parsers.add_parser(
        'grep',
        parents=[verbose_parser],
        help='Matches a string in one or more pages of the database',
        description=(
            'Searches the pages of the database passed as argument for the '
            'given string.'
        ),
    )
    grep_parser.add_argument(
        'sqlite_path',
        help='sqlite3 file path'
    )
    grep_parser.add_argument(
        'needle',
        help='String to match in the database'
    )

    undelete_parser = subcmd_parsers.add_parser(
        'undelete',
        parents=[verbose_parser],
        help='Inserts recovered records into a copy of the database',
        description=(
            'Recovers as many records as possible from the database passed as '
            'argument and inserts all recovered records into a copy of '
            'the database.'
        ),
    )
    undelete_parser.add_argument(
        'sqlite_path',
        help='sqlite3 file path'
    )
    undelete_parser.add_argument(
        'output_path',
        help='Output database path'
    )
    undelete_parser.add_argument(
        '-d', '--database-name',
        nargs='?',
        default=None,
        help='Database name'
    )

    return cli_parser


def main():
    """Command-line entry point: parse arguments and run the subcommand.

    Sets the package logger to DEBUG when ``-v``/``--verbose`` is given
    (before or after the subcommand). Prints the usage text when no
    subcommand is specified.
    """
    cli_parser = _build_cli_parser()
    cli_args = cli_parser.parse_args()

    # The flag's default is suppressed, so normalise the namespace here to
    # always carry a `verbose` attribute (None when the flag was not given).
    cli_args.verbose = getattr(cli_args, 'verbose', None)
    if cli_args.verbose:
        _LOGGER.setLevel(logging.DEBUG)

    if cli_args.subcmd:
        subcmd_dispatcher(cli_args)
    else:
        # No subcommand specified, print the usage and bail
        cli_parser.print_help()
293