Passed
Push — master ( a4c24d...9369e9 )
by Konrad
02:32
created

db_sync_tool.database.utility.truncate_tables()   B

Complexity

Conditions 6

Size

Total Lines 23
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 23
rs 8.6666
c 0
b 0
f 0
cc 6
nop 0
1
#!/usr/bin/env python3
2
# -*- coding: future_fstrings -*-
3
4
"""
5
Utility script
6
"""
7
8
import sys
9
import datetime
10
import re
11
from db_sync_tool.utility import mode, system, helper, output
12
13
database_dump_file_name = None
0 ignored issues
show
Coding Style Naming introduced by
Constant name "database_dump_file_name" doesn't conform to UPPER_CASE naming style ('([^\\W\\da-z][^\\Wa-z]*|__.*__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
14
15
16
class DatabaseSystem:
0 ignored issues
show
introduced by
Missing class docstring
Loading history...
17
    MYSQL = 'MySQL'
18
    MARIADB = 'MariaDB'
19
20
21
def run_database_command(client, command, force_database_name=False):
22
    """
23
    Run a database command using the "mysql -e" command
24
    :param client: String
25
    :param command: String database command
26
    :param force_database_name: Bool forces the database name
27
    :return:
28
    """
29
    _database_name = ' ' + system.config[client]['db']['name'] if force_database_name else ''
30
31
    return mode.run_command(
32
        helper.get_command(client, 'mysql') + ' ' + generate_mysql_credentials(
33
            client) + _database_name + ' -e "' + command + '"',
34
        client, True)
35
36
37
def generate_database_dump_filename():
38
    """
39
    Generate a database dump filename like "_[name]_[date].sql" or using the give filename
40
    :return:
41
    """
42
    global database_dump_file_name
0 ignored issues
show
Coding Style Naming introduced by
Constant name "database_dump_file_name" doesn't conform to UPPER_CASE naming style ('([^\\W\\da-z][^\\Wa-z]*|__.*__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
Coding Style introduced by
Usage of the global statement should be avoided.

Usage of global can make code hard to read and test, its usage is generally not recommended unless you are dealing with legacy code.

Loading history...
43
44
    if system.config['dump_name'] == '':
45
        # _project-db_20-08-2020_12-37.sql
46
        _now = datetime.datetime.now()
47
        database_dump_file_name = '_' + system.config[mode.Client.ORIGIN]['db']['name'] + '_' + _now.strftime(
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (110/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
48
            "%d-%m-%Y_%H-%M") + '.sql'
49
    else:
50
        database_dump_file_name = system.config['dump_name'] + '.sql'
51
52
53
def truncate_tables():
54
    """
55
    Generate the ignore tables options for the mysqldump command by the given table list
56
    # ToDo: Too much conditional nesting
57
    :return: String
58
    """
59
    if 'truncate_table' in system.config:
60
        output.message(
61
            output.Subject.TARGET,
62
            'Truncating tables before import',
63
            True
64
        )
65
        for _table in system.config['truncate_table']:
66
            if '*' in _table:
67
                _wildcard_tables = get_database_tables_like(mode.Client.TARGET,
68
                                                            _table.replace('*', ''))
69
                if _wildcard_tables:
70
                    for _wildcard_table in _wildcard_tables:
71
                        _sql_command = f'TRUNCATE TABLE IF EXISTS {_wildcard_table}'
72
                        run_database_command(mode.Client.TARGET, _sql_command, True)
73
            else:
74
                _sql_command = f'TRUNCATE TABLE IF EXISTS {_table}'
75
                run_database_command(mode.Client.TARGET, _sql_command, True)
76
77
78
def generate_ignore_database_tables():
79
    """
80
    Generate the ignore tables options for the mysqldump command by the given table list
81
    # ToDo: Too much conditional nesting
82
    :return: String
83
    """
84
    _ignore_tables = []
85
    if 'ignore_table' in system.config:
86
        for table in system.config['ignore_table']:
87
            if '*' in table:
88
                _wildcard_tables = get_database_tables_like(mode.Client.ORIGIN,
89
                                                            table.replace('*', ''))
90
                if _wildcard_tables:
91
                    for wildcard_table in _wildcard_tables:
92
                        _ignore_tables = generate_ignore_database_table(_ignore_tables,
93
                                                                        wildcard_table)
94
            else:
95
                _ignore_tables = generate_ignore_database_table(_ignore_tables, table)
96
        return ' '.join(_ignore_tables)
97
    return ''
98
99
100
def generate_ignore_database_table(ignore_tables, table):
101
    """
102
    :param ignore_tables: Dictionary
103
    :param table: String
104
    :return: Dictionary
105
    """
106
    ignore_tables.append('--ignore-table=' + system.config['origin']['db']['name'] + '.' + table)
107
    return ignore_tables
108
109
110
def get_database_tables_like(client, name):
0 ignored issues
show
Unused Code introduced by
Either all return statements in a function should return an expression, or none of them should.
Loading history...
111
    """
112
    Get database table names like the given name
113
    :param client: String
114
    :param name: String
115
    :return: Dictionary
116
    """
117
    _dbname = system.config[client]['db']['name']
118
    _tables = run_database_command(client, f'SHOW TABLES FROM {_dbname} LIKE \'%{name}%\';').strip()
119
    if _tables != '':
120
        return _tables.split('\n')[1:]
121
    return
122
123
124
def get_database_tables():
125
    """
126
    Generate specific tables for export
127
    :return: String
128
    """
129
    if system.config['tables'] == '':
130
        return ''
131
132
    _result = ''
133
    _tables = system.config['tables'].split(',')
134
    for _table in _tables:
135
        _result += '\'' + _table + '\' '
136
    return _result
137
138
139
def generate_mysql_credentials(client):
140
    """
141
    Generate the needed database credential information for the mysql command
142
    :param client: String
143
    :return:
144
    """
145
    _credentials = '-u\'' + system.config[client]['db']['user'] + '\' -p\'' + \
146
                   system.config[client]['db'][
147
                       'password'] + '\''
148
    if 'host' in system.config[client]['db']:
149
        _credentials += ' -h\'' + system.config[client]['db']['host'] + '\''
150
    if 'port' in system.config[client]['db']:
151
        _credentials += ' -P\'' + str(system.config[client]['db']['port']) + '\''
152
    return _credentials
153
154
155
def check_database_dump(client, filepath):
156
    """
157
    Checking the last line of the dump file if it contains "-- Dump completed on"
158
    :param client: String
159
    :param filepath: String
160
    :return:
161
    """
162
    if system.config['check_dump']:
163
        _line = mode.run_command(
164
            helper.get_command(client, 'tail') + ' -n 1 ' + filepath,
165
            client,
166
            True,
167
            skip_dry_run=True
168
        )
169
170
        if not _line:
171
            return
172
173
        if "-- Dump completed on" not in _line:
174
            sys.exit(
175
                output.message(
176
                    output.Subject.ERROR,
177
                    'Dump file is corrupted',
178
                    do_print=False
179
                )
180
            )
181
        else:
182
            output.message(
183
                output.host_to_subject(client),
184
                'Dump file is valid',
185
                verbose_only=True
186
            )
187
188
189
def count_tables(client, filepath):
190
    """
191
    Count the reference string in the database dump file to get the count of all exported tables
192
    :param client: String
193
    :param filepath: String
194
    :return:
195
    """
196
    _reference = 'CREATE TABLE'
197
    _count = mode.run_command(
198
        f'{helper.get_command(client, "grep")} -ao "{_reference}" {filepath} | wc -l | xargs',
199
        client,
200
        True,
201
        skip_dry_run=True
202
    )
203
204
    if _count:
205
        output.message(
206
            output.host_to_subject(client),
207
            f'{int(_count)} table(s) exported'
208
        )
209
210
211
def get_database_version(client):
212
    """
213
    Check the database version and distinguish between mysql and mariadb
214
    :param client:
215
    :return: Tuple<String,String>
216
    """
217
    _database_system = None
218
    _version_number = None
219
    try:
220
        _database_version = run_database_command(client, 'SELECT VERSION();').splitlines()[1]
221
        _database_system = DatabaseSystem.MYSQL
222
223
        _version_number = re.search('(\d+\.)?(\d+\.)?(\*|\d+)', _database_version).group()
0 ignored issues
show
Bug introduced by
A suspicious escape sequence \d was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
Bug introduced by
A suspicious escape sequence \. was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
Bug introduced by
A suspicious escape sequence \* was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
224
225
        if DatabaseSystem.MARIADB.lower() in _database_version.lower():
226
            _database_system = DatabaseSystem.MARIADB
227
228
        output.message(
229
            output.host_to_subject(client),
230
            f'Database version: {_database_system} v{_version_number}',
231
            True
232
        )
233
    finally:
234
        return _database_system, _version_number
0 ignored issues
show
Bug Best Practice introduced by
return statements in finally blocks should be avoided.

Placing a return statement inside finally will swallow all exceptions that may have been thrown in the try block.

Loading history...
235
0 ignored issues
show
coding-style introduced by
Trailing newlines
Loading history...
236