db_sync_tool.remote.rsync   A
last analyzed

Complexity

Total Complexity 27

Size/Duplication

Total Lines 160
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 27
eloc 70
dl 0
loc 160
rs 10
c 0
b 0
f 0

8 Functions

Rating   Name   Duplication   Size   Complexity  
A unit_converter() 0 17 4
B get_authorization() 0 25 7
A get_host() 0 9 2
A get_password_environment() 0 13 5
A read_stats() 0 15 3
A run_rsync_command() 0 23 3
A parse_string() 0 10 1
A get_options() 0 9 2
1
#!/usr/bin/env python3
2
# -*- coding: future_fstrings -*-
3
4
"""
5
rsync script
6
"""
7
8
import re
9
from db_sync_tool.utility import mode, system, output
10
11
# Default options for rsync command
12
# https://wiki.ubuntuusers.de/rsync/
13
default_options = [
14
    '--delete',
15
    '-a',
16
    '-z',
17
    '--stats',
18
    '--human-readable',
19
    '--iconv=UTF-8',
20
    '--chmod=D2770,F660'
21
]
22
23
24
def get_password_environment(client):
25
    """
26
    Optionally create a password environment variable for sshpass password authentication
27
    https://www.redhat.com/sysadmin/ssh-automation-sshpass
28
    :param client: String
29
    :return:
30
    """
31
    if not client:
32
        return ''
33
34
    if system.config['use_sshpass'] and not 'ssh_key' in system.config[client] and 'password' in system.config[client]:
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (119/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
35
        return f'SSHPASS=\'{system.config[client]["password"]}\' '
36
    return ''
37
38
39
def get_authorization(client):
40
    """
41
    Define authorization arguments for rsync command
42
    :param client: String
43
    :return: String
44
    """
45
    _ssh_key = None
46
    if not client:
47
        return ''
48
49
    if 'ssh_key' in system.config[client]:
50
        _ssh_key = system.config[mode.Client.ORIGIN]['ssh_key']
51
52
    _ssh_port = system.config[client]['port'] if 'port' in system.config[client] else 22
53
54
    if _ssh_key is None:
55
        if system.config['use_sshpass'] and get_password_environment(client):
0 ignored issues
show
unused-code introduced by
Unnecessary "else" after "return"
Loading history...
56
            # In combination with SSHPASS environment variable
57
            # https://www.redhat.com/sysadmin/ssh-automation-sshpass
58
            return f'--rsh="sshpass -e ssh -p{_ssh_port} -o StrictHostKeyChecking=no -l {system.config[client]["user"]}"'
0 ignored issues
show
Coding Style introduced by
This line is too long as per the coding-style (121/100).

This check looks for lines that are too long. You can specify the maximum line length.

Loading history...
59
        else:
60
            return f'-e "ssh -p{_ssh_port} -o StrictHostKeyChecking=no"'
61
    else:
62
        # Provide ssh key file path for ssh authentication
63
        return f'-e "ssh -i {_ssh_key} -p{_ssh_port}"'
64
65
66
def get_host(client):
67
    """
68
    Return user@host if client is not local
69
    :param client: String
70
    :return: String
71
    """
72
    if mode.is_remote(client):
73
        return f'{system.config[client]["user"]}@{system.config[client]["host"]}:'
74
    return ''
75
76
77
def get_options():
78
    """
79
    Prepare rsync options with stored default options and provided addtional options
80
    :return: String
81
    """
82
    _options = f'{" ".join(default_options)}'
83
    if not system.config['use_rsync_options'] is None:
84
        _options += f'{system.config["use_rsync_options"]}'
85
    return _options
86
87
88
def read_stats(stats):
89
    """
90
    Read rsync stats and print a summary
91
    :param stats: String
92
    :return:
93
    """
94
    if system.config['verbose']:
95
        print(f'{output.Subject.DEBUG}{output.CliFormat.BLACK}{stats}{output.CliFormat.ENDC}')
96
97
    _file_size = parse_string(stats, r'Total transferred file size:\s*([\d.]+[MKG]?)')
98
99
    if _file_size:
100
        output.message(
101
            output.Subject.INFO,
102
            f'Status: {unit_converter(_file_size[0])} transferred'
103
        )
104
105
106
def parse_string(string, regex):
107
    """
108
    Parse string by given regex
109
    :param string: String
110
    :param regex: String
111
    :return:
112
    """
113
    _file_size_pattern = regex
114
    _regex_matcher = re.compile(_file_size_pattern)
115
    return _regex_matcher.findall(string)
116
117
118
def unit_converter(size_in_bytes):
119
    """
120
121
    :param size_in_bytes:
122
    :return:
123
    """
124
    units = ['Bytes', 'kB', 'MB', 'GB']
125
126
    if isinstance(size_in_bytes, (int, float)):
127
        _convertedSize = float(size_in_bytes)
0 ignored issues
show
Coding Style Naming introduced by
Variable name "_convertedSize" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
128
        for unit in units:
129
            if _convertedSize < 1024:
130
                return str(_convertedSize) + ' ' + unit
131
            _convertedSize = _convertedSize / 1024
0 ignored issues
show
Coding Style Naming introduced by
Variable name "_convertedSize" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
132
133
        return _convertedSize
134
    return size_in_bytes
135
136
137
def run_rsync_command(remote_client, origin_path, target_path, origin_ssh='', target_ssh=''):
138
    """
139
140
    :param target_ssh:
141
    :param origin_ssh:
142
    :param target_path:
143
    :param origin_path:
144
    :param remote_client:
145
    :return:
146
    """
147
    if origin_ssh != '':
148
        origin_ssh += ':'
149
    if target_ssh != '':
150
        target_ssh += ':'
151
152
    _output = mode.run_command(
153
        f'{get_password_environment(remote_client)}rsync {get_options()} '
154
        f'{get_authorization(remote_client)} '
155
        f'{origin_ssh}{origin_path} {target_ssh}{target_path}',
156
        mode.Client.LOCAL,
157
        True
158
    )
159
    read_stats(_output)
160