Issues (23)

backuppc_clone/helper/BackupScanner.py (1 issue)

1
import csv
2
import os
3
import shutil
4
from typing import Optional
5
6
from backuppc_clone.Config import Config
7
from backuppc_clone.ProgressBar import ProgressBar
8
from backuppc_clone.helper.BackupInfoScanner import BackupInfoScanner
9
from backuppc_clone.style.BackupPcCloneStyle import BackupPcCloneStyle
10
11
12 View Code Duplication
class BackupScanner:
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
13
    """
14
    Helper class for scanning backup directories.
15
    """
16
17
    # ------------------------------------------------------------------------------------------------------------------
18
    def __init__(self, io: BackupPcCloneStyle):
19
        """
20
        Object constructor.
21
22
        @param BackupPcCloneStyle io: The output style.
23
        """
24
25
        self.__io: BackupPcCloneStyle = io
26
        """
27
        The output style.
28
        """
29
30
        self.__dir_count: int = 0
31
        """
32
        The file count.
33
        """
34
35
        self.__file_count: int = 0
36
        """
37
        The file count.
38
        """
39
40
        self.__entry_seq: int = 0
41
        """
42
        The entry sequence number.
43
        """
44
45
        self.progress: Optional[ProgressBar] = None
46
        """
47
        The progress counter.
48
        """
49
50
    # ------------------------------------------------------------------------------------------------------------------
51
    @property
52
    def dir_count(self) -> int:
53
        """
54
        Returns the number of found directories.
55
        """
56
        return self.__dir_count
57
58
    # ------------------------------------------------------------------------------------------------------------------
59
    @property
60
    def file_count(self) -> int:
61
        """
62
        Returns the number of found files.
63
        """
64
        return self.__file_count
65
66
    # ------------------------------------------------------------------------------------------------------------------
67
    def __scan_directory_helper(self, parent_dir: str, dir_name: str, csv_writer: csv.writer) -> None:
68
        """
69
        Scans recursively a list of directories and stores filenames and directories in CSV format.
70
71
        @param str parent_dir: The name of the parent directory.
72
        @param str dir_name: The name of the directory.
73
        @param csv.writer csv_writer: The CSV writer.
74
        """
75
        target_name = os.path.join(parent_dir, dir_name) if dir_name else parent_dir
76
77
        first_file = True
78
        sub_dir_names = []
79
        for entry in os.scandir(target_name):
80
            if entry.is_file():
81
                self.__file_count += 1
82
                if first_file:
83
                    first_file = False
84
                    self.__entry_seq += 1
85
                csv_writer.writerow((self.__entry_seq, entry.inode(), dir_name, entry.name))
86
87
                if entry.name not in ['attrib', 'backupInfo', 'backuppc-clone.csv']:
88
                    self.progress.advance()
89
90
            elif entry.is_dir():
91
                sub_dir_names.append(entry.name)
92
93
        for sub_dir_name in sorted(sub_dir_names):
94
            self.__entry_seq += 1
95
            self.__dir_count += 1
96
            csv_writer.writerow((self.__entry_seq, None, dir_name, sub_dir_name))
97
            self.__scan_directory_helper(parent_dir, os.path.join(dir_name, sub_dir_name), csv_writer)
98
99
    # ------------------------------------------------------------------------------------------------------------------
100
    def scan_directory(self, host: str, backup_no: int, csv_filename: str) -> None:
101
        """
102
        Scans recursively a list of directories and stores filenames and directories in CSV format.
103
104
        @param str host: The host name
105
        @param int backup_no: The backup number.
106
        @param str csv_filename: The filename of the CSV file.
107
        """
108
        self.__dir_count = 0
109
        self.__file_count = 0
110
        self.__entry_seq = 0
111
112
        backup_dir = Config.instance.backup_dir_original(host, backup_no)
113
114
        file_count = int(BackupInfoScanner.get_backup_info(backup_dir, 'nFiles'))
115
        self.progress = ProgressBar(self.__io.output, file_count)
116
117
        with open(csv_filename, 'w') as csv_file:
118
            csv_writer = csv.writer(csv_file)
119
            self.__io.writeln(' Scanning <fso>{}</fso>'.format(backup_dir))
120
            self.__io.writeln('')
121
            self.__scan_directory_helper(backup_dir, '', csv_writer)
122
            self.progress.finish()
123
124
    # ------------------------------------------------------------------------------------------------------------------
125
    def pre_scan_directory(self, host: str, backup_no: int) -> None:
126
        """
127
        Scans recursively a list of directories and stores filenames and directories in CSV format.
128
129
        @param str host: The host name
130
        @param int backup_no: The backup number.
131
        """
132
        self.__dir_count = 0
133
        self.__file_count = 0
134
        self.__entry_seq = 0
135
136
        backup_dir = Config.instance.backup_dir_original(host, backup_no)
137
138
        csv_filename1 = os.path.join(Config.instance.tmp_dir_clone, '.backup-{}-{}.csv'.format(host, backup_no))
139
        csv_filename2 = os.path.join(backup_dir, 'backuppc-clone.csv')
140
141
        file_count = int(BackupInfoScanner.get_backup_info(backup_dir, 'nFiles'))
142
        self.progress = ProgressBar(self.__io.output, file_count)
143
144
        with open(csv_filename1, 'w') as csv_file:
145
            csv_writer = csv.writer(csv_file)
146
            self.__io.writeln(' Scanning <fso>{}</fso>'.format(backup_dir))
147
            self.__io.writeln('')
148
            self.__scan_directory_helper(backup_dir, '', csv_writer)
149
            self.progress.finish()
150
151
        shutil.move(csv_filename1, csv_filename2)
152
        self.__io.writeln('')
153
        self.__io.writeln(' Wrote <fso>{}</fso>'.format(csv_filename2))
154
        self.__io.writeln('')
155
156
# ----------------------------------------------------------------------------------------------------------------------
157