Passed
Push — master ( ba6342...71f198 )
by P.R.
07:12
created

backuppc_clone.command.InitCloneCommand   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 144
Duplicated Lines 91.67 %

Importance

Changes 0
Metric Value
eloc 78
dl 132
loc 144
rs 10
c 0
b 0
f 0
wmc 15

5 Methods

Rating   Name   Duplication   Size   Complexity  
A InitCloneCommand.__create_database() 27 27 3
A InitCloneCommand.__create_dirs() 16 16 3
B InitCloneCommand._handle_command() 44 44 6
A InitCloneCommand._init_singletons() 5 5 1
A InitCloneCommand.__writing_config_clone() 22 22 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
"""
2
BackupPC Clone
3
"""
4
import configparser
5
import os
6
import sqlite3
7
8
from backuppc_clone.command.BaseCommand import BaseCommand
9
from backuppc_clone.exception.BackupPcCloneException import BackupPcCloneException
10
11
12 View Code Duplication
class InitCloneCommand(BaseCommand):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
13
    """
14
    Creates the configuration file for a clone
15
16
    init-clone
17
    """
18
    parameters = [('SCHEMA_VERSION', 'schema version', '1'),
19
                  ('LAST_POOL_SYNC', 'timestamp of last original pool scan', '-1')]
20
21
    # ------------------------------------------------------------------------------------------------------------------
22
    def __create_dirs(self, top_dir_clone: str) -> None:
23
        """
24
        Creates required directories under the top dir of the clone.
25
26
        :param str top_dir_clone: The top directory of the clone.
27
        """
28
        os.chmod(top_dir_clone, 0o770)
29
30
        self._io.writeln(' Creating directories')
31
32
        dir_names = ['cpool', 'pool', 'etc', 'pc', 'tmp', 'trash']
33
        for dir_name in dir_names:
34
            path = os.path.join(top_dir_clone, dir_name)
35
            if not os.path.isdir(path):
36
                self._io.log_verbose('Creating directory <fso>{}</fso>'.format(dir_name))
37
                os.mkdir(path, 0o700)
38
39
    # ------------------------------------------------------------------------------------------------------------------
40
    def __create_database(self, db_path: str) -> None:
41
        """
42
        Creates the metadata database.
43
44
        :param str db_path: The path to the SQLite database.
45
        """
46
        sql_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
47
                                '..',
48
                                'lib',
49
                                'ddl',
50
                                '0100_create_tables.sql')
51
        with open(sql_path) as file:
52
            sql = file.read()
53
54
        self._io.section('Creating metadata database')
55
56
        self._io.writeln(' Initializing <fso>{}</fso>'.format(db_path))
57
        connection = sqlite3.connect(db_path)
58
        cursor = connection.cursor()
59
        cursor.executescript(sql)
60
61
        for parameter in self.parameters:
62
            cursor.execute('insert into BKC_PARAMETER(prm_code, prm_description, prm_value) values(?, ?, ?)',
63
                           parameter)
64
65
        connection.commit()
66
        connection.close()
67
68
    # ------------------------------------------------------------------------------------------------------------------
69
    def __writing_config_clone(self,
70
                               config_filename_clone: str,
71
                               config_filename_original: str,
72
                               name_clone: str,
73
                               name_master: str) -> None:
74
        """
75
        Creates the config file for the clone.
76
77
        :param str config_filename_clone: The path to the config file of the clone.
78
        :param str config_filename_original: The path to the config file of the original.
79
        :param str name_clone: The name of the clone.
80
        :param str name_master: The name of the master.
81
        """
82
        self._io.writeln(' Writing <fso>{}</fso>'.format(config_filename_clone))
83
84
        config = configparser.ConfigParser()
85
        config['BackupPC Clone'] = {'role': 'clone',
86
                                    'name': name_clone}
87
        config['Original'] = {'config': config_filename_original,
88
                              'name':   name_master}
89
        with open(config_filename_clone, 'w') as config_file:
90
            config.write(config_file)
91
92
    # ------------------------------------------------------------------------------------------------------------------
93
    def _init_singletons(self) -> None:
94
        """
95
        Omits the creating of singleton objects.
96
        """
97
        pass
98
99
    # ------------------------------------------------------------------------------------------------------------------
100
    def _handle_command(self) -> None:
101
        """
102
        Executes the command.
103
        """
104
        self._io.title('Initializing Clone')
105
106
        self._io.section('Creating configuration file')
107
108
        top_dir_original = self.ask('top dir of the original', '/var/lib/BackupPC')
109
        config_filename_original = os.path.join(top_dir_original, 'original.cfg')
110
111
        if not os.path.isfile(config_filename_original):
112
            raise BackupPcCloneException(
113
                'Configuration file {} of original not found'.format(config_filename_original))
114
115
        config_original = configparser.ConfigParser()
116
        config_original.read(config_filename_original)
117
118
        top_dir_clone = None
119
        while top_dir_clone is None or not os.path.isdir(top_dir_clone):
120
            top_dir_clone = self.ask('top dir of the clone')
121
122
        name_clone = self.ask('name of clone', config_original['BackupPC Clone']['name'] + '-clone')
123
124
        config_filename_clone = os.path.join(top_dir_clone, 'clone.cfg')
125
126
        if os.path.isfile(config_filename_clone):
127
            create = self.confirm('Overwrite {}'.format(config_filename_clone), False)
128
        else:
129
            create = True
130
131
        if create:
132
            self.__writing_config_clone(config_filename_clone,
133
                                        config_filename_original,
134
                                        name_clone,
135
                                        config_original['BackupPC Clone']['name'])
136
137
            self.__create_dirs(top_dir_clone)
138
139
            self._io.writeln('')
140
141
            self.__create_database(os.path.join(top_dir_clone, 'clone.db'))
142
        else:
143
            self._io.warning('No configuration file created')
144
145
# ----------------------------------------------------------------------------------------------------------------------
146