db_sync_tool.database.utility   A
last analyzed

Complexity

Total Complexity 36

Size/Duplication

Total Lines 244
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 36
eloc 127
dl 0
loc 244
rs 9.52
c 0
b 0
f 0

11 Functions

Rating   Name   Duplication   Size   Complexity  
A generate_database_dump_filename() 0 14 2
A run_database_command() 0 14 2
A get_database_tables_like() 0 12 2
A get_database_tables() 0 13 3
A generate_ignore_database_table() 0 8 1
B truncate_tables() 0 27 7
B generate_ignore_database_tables() 0 24 7
A generate_mysql_credentials() 0 15 4
A count_tables() 0 19 2
A check_database_dump() 0 31 4
A get_database_version() 0 24 2
1
#!/usr/bin/env python3
2
# -*- coding: future_fstrings -*-
3
4
"""
5
Utility script
6
"""
7
8
import sys
9
import datetime
10
import re
11
from db_sync_tool.utility import mode, system, helper, output
12
13
database_dump_file_name = None
0 ignored issues
show
Coding Style Naming introduced by
Constant name "database_dump_file_name" doesn't conform to UPPER_CASE naming style ('([^\\W\\da-z][^\\Wa-z]*|__.*__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
14
15
16
class DatabaseSystem:
0 ignored issues
show
introduced by
Missing class docstring
Loading history...
17
    MYSQL = 'MySQL'
18
    MARIADB = 'MariaDB'
19
20
21
def run_database_command(client, command, force_database_name=False):
22
    """
23
    Run a database command using the "mysql -e" command
24
    :param client: String
25
    :param command: String database command
26
    :param force_database_name: Bool forces the database name
27
    :return:
28
    """
29
    _database_name = ' ' + system.config[client]['db']['name'] if force_database_name else ''
30
31
    return mode.run_command(
32
        helper.get_command(client, 'mysql') + ' ' + generate_mysql_credentials(
33
            client) + _database_name + ' -e "' + command + '"',
34
        client, True)
35
36
37
def generate_database_dump_filename():
38
    """
39
    Generate a database dump filename like "_[name]_[date].sql" or using the give filename
40
    :return:
41
    """
42
    global database_dump_file_name
0 ignored issues
show
Coding Style Naming introduced by
Constant name "database_dump_file_name" doesn't conform to UPPER_CASE naming style ('([^\\W\\da-z][^\\Wa-z]*|__.*__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
Coding Style introduced by
Usage of the global statement should be avoided.

Usage of global can make code hard to read and test, its usage is generally not recommended unless you are dealing with legacy code.

Loading history...
43
44
    if system.config['dump_name'] == '':
45
        # _project-db_2022-08-22_12-37.sql
46
        _now = datetime.datetime.now()
47
        database_dump_file_name = '_' + system.config[mode.Client.ORIGIN]['db']['name'] + '_' + _now.strftime(
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (110/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
48
            "%Y-%m-%d_%H-%M") + '.sql'
49
    else:
50
        database_dump_file_name = system.config['dump_name'] + '.sql'
51
52
53
def truncate_tables():
54
    """
55
    Generate the ignore tables options for the mysqldump command by the given table list
56
    # ToDo: Too much conditional nesting
57
    :return: String
58
    """
59
    # Workaround for config naming
60
    if 'truncate_table' in system.config:
61
        system.config['truncate_tables'] = system.config['truncate_table']
62
63
    if 'truncate_tables' in system.config:
64
        output.message(
65
            output.Subject.TARGET,
66
            'Truncating tables before import',
67
            True
68
        )
69
        for _table in system.config['truncate_tables']:
70
            if '*' in _table:
71
                _wildcard_tables = get_database_tables_like(mode.Client.TARGET,
72
                                                            _table.replace('*', '%'))
73
                if _wildcard_tables:
74
                    for _wildcard_table in _wildcard_tables:
75
                        _sql_command = f'TRUNCATE TABLE IF EXISTS {_wildcard_table}'
76
                        run_database_command(mode.Client.TARGET, _sql_command, True)
77
            else:
78
                _sql_command = f'TRUNCATE TABLE IF EXISTS {_table}'
79
                run_database_command(mode.Client.TARGET, _sql_command, True)
80
81
82
def generate_ignore_database_tables():
83
    """
84
    Generate the ignore tables options for the mysqldump command by the given table list
85
    # ToDo: Too much conditional nesting
86
    :return: String
87
    """
88
    # Workaround for config naming
89
    if 'ignore_table' in system.config:
90
        system.config['ignore_tables'] = system.config['ignore_table']
91
92
    _ignore_tables = []
93
    if 'ignore_tables' in system.config:
94
        for table in system.config['ignore_tables']:
95
            if '*' in table:
96
                _wildcard_tables = get_database_tables_like(mode.Client.ORIGIN,
97
                                                            table.replace('*', '%'))
98
                if _wildcard_tables:
99
                    for wildcard_table in _wildcard_tables:
100
                        _ignore_tables = generate_ignore_database_table(_ignore_tables,
101
                                                                        wildcard_table)
102
            else:
103
                _ignore_tables = generate_ignore_database_table(_ignore_tables, table)
104
        return ' '.join(_ignore_tables)
105
    return ''
106
107
108
def generate_ignore_database_table(ignore_tables, table):
109
    """
110
    :param ignore_tables: Dictionary
111
    :param table: String
112
    :return: Dictionary
113
    """
114
    ignore_tables.append('--ignore-table=' + system.config['origin']['db']['name'] + '.' + table)
115
    return ignore_tables
116
117
118
def get_database_tables_like(client, name):
0 ignored issues
show
Unused Code introduced by
Either all return statements in a function should return an expression, or none of them should.
Loading history...
119
    """
120
    Get database table names like the given name
121
    :param client: String
122
    :param name: String
123
    :return: Dictionary
124
    """
125
    _dbname = system.config[client]['db']['name']
126
    _tables = run_database_command(client, f'SHOW TABLES FROM \`{_dbname}\` LIKE \'{name}\';').strip()
0 ignored issues
show
Bug introduced by
A suspicious escape sequence \` was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
Coding Style introduced by
This line is too long as per the coding-style (102/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
127
    if _tables != '':
128
        return _tables.split('\n')[1:]
129
    return
130
131
132
def get_database_tables():
133
    """
134
    Generate specific tables for export
135
    :return: String
136
    """
137
    if system.config['tables'] == '':
138
        return ''
139
140
    _result = ' '
141
    _tables = system.config['tables'].split(',')
142
    for _table in _tables:
143
        _result += '\'' + _table + '\' '
144
    return _result
145
146
147
def generate_mysql_credentials(client, force_password=True):
148
    """
149
    Generate the needed database credential information for the mysql command
150
    :param client: String
151
    :param force_password: Bool
152
    :return:
153
    """
154
    _credentials = '-u\'' + system.config[client]['db']['user'] + '\''
155
    if force_password:
156
        _credentials += ' -p\'' + system.config[client]['db']['password'] + '\''
157
    if 'host' in system.config[client]['db']:
158
        _credentials += ' -h\'' + system.config[client]['db']['host'] + '\''
159
    if 'port' in system.config[client]['db']:
160
        _credentials += ' -P\'' + str(system.config[client]['db']['port']) + '\''
161
    return _credentials
162
163
164
def check_database_dump(client, filepath):
165
    """
166
    Checking the last line of the dump file if it contains "-- Dump completed on"
167
    :param client: String
168
    :param filepath: String
169
    :return:
170
    """
171
    if system.config['check_dump']:
172
        _line = mode.run_command(
173
            helper.get_command(client, 'tail') + ' -n 1 ' + filepath,
174
            client,
175
            True,
176
            skip_dry_run=True
177
        )
178
179
        if not _line:
180
            return
181
182
        if "-- Dump completed on" not in _line:
183
            sys.exit(
184
                output.message(
185
                    output.Subject.ERROR,
186
                    'Dump file is corrupted',
187
                    do_print=False
188
                )
189
            )
190
        else:
191
            output.message(
192
                output.host_to_subject(client),
193
                'Dump file is valid',
194
                verbose_only=True
195
            )
196
197
198
def count_tables(client, filepath):
199
    """
200
    Count the reference string in the database dump file to get the count of all exported tables
201
    :param client: String
202
    :param filepath: String
203
    :return:
204
    """
205
    _reference = 'CREATE TABLE'
206
    _count = mode.run_command(
207
        f'{helper.get_command(client, "grep")} -ao "{_reference}" {filepath} | wc -l | xargs',
208
        client,
209
        True,
210
        skip_dry_run=True
211
    )
212
213
    if _count:
214
        output.message(
215
            output.host_to_subject(client),
216
            f'{int(_count)} table(s) exported'
217
        )
218
219
220
def get_database_version(client):
221
    """
222
    Check the database version and distinguish between mysql and mariadb
223
    :param client:
224
    :return: Tuple<String,String>
225
    """
226
    _database_system = None
227
    _version_number = None
228
    try:
229
        _database_version = run_database_command(client, 'SELECT VERSION();').splitlines()[1]
230
        _database_system = DatabaseSystem.MYSQL
231
232
        _version_number = re.search('(\d+\.)?(\d+\.)?(\*|\d+)', _database_version).group()
0 ignored issues
show
Bug introduced by
A suspicious escape sequence \d was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
Bug introduced by
A suspicious escape sequence \. was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
Bug introduced by
A suspicious escape sequence \* was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
233
234
        if DatabaseSystem.MARIADB.lower() in _database_version.lower():
235
            _database_system = DatabaseSystem.MARIADB
236
237
        output.message(
238
            output.host_to_subject(client),
239
            f'Database version: {_database_system} v{_version_number}',
240
            True
241
        )
242
    finally:
243
        return _database_system, _version_number
0 ignored issues
show
Bug Best Practice introduced by
return statements in finally blocks should be avoided.

Placing a return statement inside finally will swallow all exceptions that may have been thrown in the try block.

Loading history...
244
0 ignored issues
show
coding-style introduced by
Trailing newlines
Loading history...
245