Passed
Push — main ( a042e4...253d03 )
by Peter
01:15
created

pyclean.modern.CleanupRunner.__init__()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 1
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
"""
2
Modern, cross-platform, pure-Python pyclean implementation.
3
"""
4
import logging
5
6
try:
7
    from pathlib import Path
8
except ImportError:  # Python 2.7, PyPy2
9
    from warnings import warn
10
    warn("Python 3 required for modern implementation. Python 2 is obsolete.")
11
12
# File suffixes and folder names that identify Python bytecode.
BYTECODE_FILES = ['.pyc', '.pyo']
BYTECODE_DIRS = ['__pycache__']

# Glob patterns of tool leftovers, keyed by topic name. Within each topic a
# folder's contents (``name/**/*``) are listed ahead of the folder itself
# (``name/``), so the patterns are worked through contents-first.
DEBRIS_TOPICS = {
    'cache': ['.cache/**/*', '.cache/'],
    'coverage': [
        '.coverage',
        'coverage.json',
        'coverage.lcov',
        'coverage.xml',
        'htmlcov/**/*',
        'htmlcov/',
    ],
    'jupyter': ['.ipynb_checkpoints/**/*', '.ipynb_checkpoints/'],
    'package': [
        'build/bdist.*/**/*',
        'build/bdist.*/',
        'build/lib/**/*',
        'build/lib/',
        'build/',
        'dist/**/*',
        'dist/',
        'sdist/**/*',
        'sdist/',
        '*.egg-info/**/*',
        '*.egg-info/',
    ],
    'pytest': ['.pytest_cache/**/*', '.pytest_cache/', 'pytestdebug.log'],
    'tox': ['.tox/**/*', '.tox/'],
}
54
55
56
class CleanupRunner:  # pylint: disable=too-few-public-methods
    """Holds the delete callbacks, the ignore list and the result counters."""

    def __init__(self):
        """Create an unconfigured runner; every attribute starts as None."""
        # Callbacks and ignore list are assigned by ``configure()``.
        self.unlink = self.rmdir = self.ignore = None
        # Success/failure tallies are set to zero by ``configure()``.
        self.unlink_count = self.unlink_failed = None
        self.rmdir_count = self.rmdir_failed = None

    def configure(self, args):
        """
        Prepare the runner from parsed command line options.

        Reads ``args.dry_run`` to choose between the printing and the
        deleting callbacks, stores ``args.ignore`` and zeroes all counters.
        """
        if args.dry_run:
            self.unlink, self.rmdir = print_filename, print_dirname
        else:
            self.unlink, self.rmdir = remove_file, remove_directory
        self.ignore = args.ignore
        self.unlink_count = self.unlink_failed = 0
        self.rmdir_count = self.rmdir_failed = 0
78
79
80
# Module-wide logger and the single runner instance shared by this module.
log = logging.getLogger(__name__)
Runner = CleanupRunner()
82
83
84
def remove_file(fileobj):
    """
    Unlink a file object from disk and record the outcome on the runner.

    An ``OSError`` is logged and counted as a failure, it is not re-raised.
    """
    log.debug("Deleting file: %s", fileobj)
    try:
        fileobj.unlink()
    except OSError as err:
        log.debug("File not deleted. %s", err)
        Runner.unlink_failed += 1
    else:
        Runner.unlink_count += 1
93
94
95
def remove_directory(dirobj):
    """
    Remove a directory object from disk and record the outcome on the runner.

    An ``OSError`` is logged and counted as a failure, it is not re-raised.
    """
    log.debug("Removing directory: %s", dirobj)
    try:
        dirobj.rmdir()
    except OSError as err:
        log.debug("Directory not removed. %s", err)
        Runner.rmdir_failed += 1
    else:
        Runner.rmdir_count += 1
104
105
106
def print_filename(fileobj):
    """Dry-run stand-in for file deletion: log the name and count it."""
    log.debug("Would delete file: %s", fileobj)
    Runner.unlink_count = Runner.unlink_count + 1
110
111
112
def print_dirname(dirobj):
    """Dry-run stand-in for directory removal: log the name and count it."""
    log.debug("Would delete directory: %s", dirobj)
    Runner.rmdir_count = Runner.rmdir_count + 1
116
117
118
def pyclean(args):
    """
    Clean bytecode, topic debris and free-form targets in all directories.

    Configures the shared runner from ``args``, processes every entry of
    ``args.directory`` in turn and finally logs the collected totals.
    """
    Runner.configure(args)

    for dir_path in map(Path, args.directory):
        log.info("Cleaning directory %s", dir_path)
        descend_and_clean(dir_path, BYTECODE_FILES, BYTECODE_DIRS)

        for topic in args.debris:
            remove_debris_for(topic, dir_path)

        remove_freeform_targets(args.erase, args.yes, dir_path)

    outcome = "would be removed" if args.dry_run else "removed"
    log.info("Total %d files, %d directories %s.",
             Runner.unlink_count, Runner.rmdir_count, outcome)

    # Only mention failures when there are any.
    if not (Runner.unlink_failed or Runner.rmdir_failed):
        return

    modal_verb = "would" if args.dry_run else "could"
    log.debug("%d files, %d directories %s not be removed.",
              Runner.unlink_failed, Runner.rmdir_failed, modal_verb)
141
142
143
def descend_and_clean(directory, file_types, dir_names):
    """
    Walk and descend a directory tree, cleaning up files of a certain type
    along the way. Only delete directories if they are empty, in the end.

    ``directory`` is the path object to scan, ``file_types`` the file
    suffixes to unlink and ``dir_names`` the directory names to remove
    after their contents were processed. Directories whose name is listed
    in ``Runner.ignore`` are not descended into.

    A directory that cannot be listed (missing, no permission, ...) is
    reported with a warning and skipped instead of aborting the whole run.
    """
    try:
        children = sorted(directory.iterdir())
    except OSError as err:
        # Best effort, like the delete helpers: one unreadable directory
        # must not stop the cleanup of the remaining tree.
        log.warning("Cannot access directory %s: %s", directory, err)
        return

    for child in children:
        if child.is_file():
            if child.suffix in file_types:
                Runner.unlink(child)
        elif child.is_dir():
            if child.name in Runner.ignore:
                log.debug("Skipping %s", child)
            else:
                descend_and_clean(child, file_types, dir_names)

            # Attempted after descending, so the directory had a chance
            # to become empty first.
            if child.name in dir_names:
                Runner.rmdir(child)
        else:
            log.debug("Ignoring %s", child)
162
163
164
def remove_debris_for(topic, directory):
    """
    Delete everything below ``directory`` that matches the glob patterns
    registered for ``topic`` in ``DEBRIS_TOPICS``.
    """
    log.debug("Scanning for debris of %s ...", topic.title())

    patterns = DEBRIS_TOPICS[topic]
    for pattern in patterns:
        delete_filesystem_objects(directory, pattern)
172
173
174
def remove_freeform_targets(glob_patterns, yes, directory):
    """
    Delete user-supplied, free-form glob targets below ``directory``.

    This is **potentially dangerous**: a user can erase anything in the
    file system this way, including the project being worked on. To limit
    the damage, the following (user experience-related) rules apply:

    - Directories are never removed recursively; their contents have to
      be named explicitly with a glob (e.g. ``dirname/**/*``).
    - The order of deletion is up to the user, i.e. a directory must be
      empty by the time its removal is attempted.
    - Every single file system object is confirmed interactively before
      it is deleted (unless ``yes`` is set, i.e. ``--yes`` was given).
    """
    ask_first = not yes
    for pattern in glob_patterns:
        log.debug("Erase file system objects matching: %s", pattern)
        delete_filesystem_objects(directory, pattern, prompt=ask_first)
193
194
195
def delete_filesystem_objects(directory, path_glob, prompt=False):
    """
    Find all paths below ``directory`` matching ``path_glob`` and try to
    delete them, asking for confirmation per object when ``prompt`` is set.

    Implementation Note: Matches are sorted in *reverse order* and *all
    files* (including symlinks) are unlinked before any directory is
    removed. That way the deepest directories are already empty (of both
    files & directories) when their removal is attempted.
    """
    matches = sorted(directory.glob(path_glob), reverse=True)

    def is_real_dir(path):
        """Tell whether ``path`` is a directory and not a symlink to one."""
        return path.is_dir() and not path.is_symlink()

    # Both selections are generators, so a path is only classified at the
    # moment its loop reaches it.
    unlinkable = (path for path in matches if not is_real_dir(path))
    removable = (path for path in matches if is_real_dir(path))

    for target in unlinkable:
        if prompt:
            kind = 'symlink' if target.is_symlink() else 'file'
            if not confirm('Delete %s %s' % (kind, target)):
                # A declined object counts as "not removed".
                Runner.unlink_failed += 1
                continue
        Runner.unlink(target)

    for target in removable:
        if prompt and not confirm('Remove empty directory %s' % target):
            Runner.rmdir_failed += 1
            continue
        Runner.rmdir(target)
223
224
225
def confirm(message):
    """
    Ask an interactive yes/no question and return the user's decision.

    ``message`` is shown followed by ``"? "``. Returns ``True`` only if the
    answer is ``y`` or ``yes`` (case-insensitive, surrounding whitespace
    ignored), ``False`` for any other answer.

    Raises ``SystemExit`` when the prompt is interrupted (Ctrl+C) or when
    no answer can be read because standard input is closed or exhausted.
    """
    try:
        answer = input("%s? " % message)
    except (KeyboardInterrupt, EOFError):
        # EOFError: stdin closed or at end-of-file (e.g. non-interactive
        # use); abort cleanly rather than fail with a traceback.
        raise SystemExit('Aborted by user.')
    return answer.strip().lower() in ['y', 'yes']
232