Passed
Pull Request — main (#105)
by
unknown
01:17
created

pyclean.modern.confirm()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# SPDX-FileCopyrightText: 2020 Peter Bittner <[email protected]>
2
#
3
# SPDX-License-Identifier: GPL-3.0-or-later
4
5
"""
6
Modern, cross-platform, pure-Python pyclean implementation.
7
"""
8
9
import logging
10
import os
11
import subprocess
12
from pathlib import Path
13
14
# File suffixes that identify compiled Python bytecode.
BYTECODE_FILES = ['.pyc', '.pyo']
# Names of directories that hold bytecode caches.
BYTECODE_DIRS = ['__pycache__']
# Exit status of ``git clean`` that is reported as "not under version control".
GIT_FATAL_ERROR = 128
# Glob patterns per debris topic. Within a topic the contents of a folder
# ('name/**/*') are listed before the folder itself ('name/'), because
# directories are removed with rmdir and therefore must be empty first.
DEBRIS_TOPICS = {
    'cache': ['.cache/**/*', '.cache/'],
    'coverage': [
        '.coverage',
        'coverage.json',
        'coverage.lcov',
        'coverage.xml',
        'htmlcov/**/*',
        'htmlcov/',
    ],
    'jupyter': ['.ipynb_checkpoints/**/*', '.ipynb_checkpoints/'],
    'mypy': ['.mypy_cache/**/*', '.mypy_cache/'],
    'package': [
        'build/bdist.*/**/*',
        'build/bdist.*/',
        'build/lib/**/*',
        'build/lib/',
        'build/',
        'dist/**/*',
        'dist/',
        'sdist/**/*',
        'sdist/',
        '*.egg-info/**/*',
        '*.egg-info/',
    ],
    'pytest': ['.pytest_cache/**/*', '.pytest_cache/', 'pytestdebug.log'],
    'ruff': ['.ruff_cache/**/*', '.ruff_cache/'],
    'tox': ['.tox/**/*', '.tox/'],
}
65
66
67
class CleanupRunner:
    """Hold the delete callbacks, ignore patterns and counters of a run."""

    def __init__(self):
        """Create an unconfigured runner; every attribute starts as ``None``."""
        # Callbacks and ignore patterns are chosen later by configure().
        self.unlink = self.rmdir = self.ignore = None
        # Success / failure counters, set to zero by configure().
        self.unlink_count = self.unlink_failed = None
        self.rmdir_count = self.rmdir_failed = None

    def configure(self, args):
        """
        Prepare the runner from parsed command line options.

        With ``args.dry_run`` set, the callbacks only log what would be
        deleted; otherwise they delete for real. ``args.ignore`` is stored
        as the ignore list and all four counters are reset to zero.
        """
        if args.dry_run:
            self.unlink, self.rmdir = print_filename, print_dirname
        else:
            self.unlink, self.rmdir = remove_file, remove_directory
        self.ignore = args.ignore
        self.unlink_count = self.unlink_failed = 0
        self.rmdir_count = self.rmdir_failed = 0


# Module-wide logger and the single runner instance shared by all helpers.
log = logging.getLogger(__name__)
Runner = CleanupRunner()
93
94
95
def normalize(path_pattern: str) -> str:
    """
    Rewrite the platform's primary path separator to its alternative one.

    Where ``os.altsep`` is defined (Windows: ``/`` besides ``\\``), every
    ``os.sep`` in the pattern is replaced by it. Where it is ``None``
    (Unix/Posix, a backslash may be part of a filename) the pattern is
    returned unchanged.
    """
    separator = os.altsep or os.sep
    return separator.join(path_pattern.split(os.sep))
103
104
105
def should_ignore(path: Path, ignore_patterns: list[str]) -> bool:
    """
    Check if a path should be ignored based on ignore patterns.

    Patterns can be:
    - Simple names like 'bar': matches any path whose last component has
      that name. Redundant decoration such as 'bar/' or './bar' is
      tolerated, because the parsed component is compared rather than the
      raw pattern text.
    - Paths like 'foo/bar': matches when the components appear
      consecutively anywhere in ``path``, so everything below 'foo/bar'
      is ignored as well.

    Returns ``False`` when ``ignore_patterns`` is empty or ``None``.
    """
    if not ignore_patterns:
        return False

    path_parts = path.parts
    for pattern in ignore_patterns:
        pattern_parts = Path(normalize(pattern)).parts
        width = len(pattern_parts)

        if width == 0:
            # '' or '.' carries no name to compare against.
            continue

        if width == 1:
            # Compare the parsed component so that a trailing separator
            # or a leading './' in the pattern does not defeat the match.
            if path.name == pattern_parts[0]:
                return True
            continue

        # Multi-component pattern: slide a window of the pattern's length
        # over the path (the range is empty if the path is shorter).
        last_start = len(path_parts) - width
        if any(
            path_parts[start : start + width] == pattern_parts
            for start in range(last_start + 1)
        ):
            return True

    return False
134
135
136
def remove_file(fileobj):
    """Delete a file for real and record the outcome on ``Runner``."""
    log.debug('Deleting file: %s', fileobj)
    try:
        fileobj.unlink()
    except OSError as err:
        # Best effort: a failed deletion is only counted and logged.
        log.debug('File not deleted. %s', err)
        Runner.unlink_failed += 1
    else:
        Runner.unlink_count += 1
145
146
147
def remove_directory(dirobj):
    """Remove a directory for real and record the outcome on ``Runner``."""
    log.debug('Removing directory: %s', dirobj)
    try:
        dirobj.rmdir()
    except OSError as err:
        # Best effort: a failed removal is only counted and logged.
        log.debug('Directory not removed. %s', err)
        Runner.rmdir_failed += 1
    else:
        Runner.rmdir_count += 1
156
157
158
def print_filename(fileobj):
    """Dry-run stand-in for ``remove_file``: log the file and count it."""
    log.debug('Would delete file: %s', fileobj)
    Runner.unlink_count = Runner.unlink_count + 1
162
163
164
def print_dirname(dirobj):
    """Dry-run stand-in for ``remove_directory``: log the folder and count it."""
    log.debug('Would delete directory: %s', dirobj)
    Runner.rmdir_count = Runner.rmdir_count + 1
168
169
170
def pyclean(args):
    """
    Clean Python bytecode (and optional extras) in every given directory.

    For each entry of ``args.directory`` this removes bytecode, then the
    debris topics in ``args.debris``, then the free-form ``args.erase``
    targets, then (with ``args.folders``) empty directories and finally
    (with ``args.git_clean``) runs ``git clean``. A summary is logged at
    the end, plus a hint about ``--debris`` if that option was not used.
    """
    Runner.configure(args)

    for target in map(Path, args.directory):
        log.info('Cleaning directory %s', target)
        descend_and_clean(target, BYTECODE_FILES, BYTECODE_DIRS)

        for topic in args.debris:
            remove_debris_for(topic, target)

        remove_freeform_targets(target, args.erase, args.yes, args.dry_run)

        if args.folders:
            log.debug('Removing empty directories...')
            remove_empty_directories(target)

        if args.git_clean:
            execute_git_clean(target, args)

    outcome = 'would be removed' if args.dry_run else 'removed'
    log.info(
        'Total %d files, %d directories %s.',
        Runner.unlink_count,
        Runner.rmdir_count,
        outcome,
    )

    if Runner.unlink_failed or Runner.rmdir_failed:
        modal_verb = 'would' if args.dry_run else 'could'
        log.debug(
            '%d files, %d directories %s not be removed.',
            Runner.unlink_failed,
            Runner.rmdir_failed,
            modal_verb,
        )

    # Point the user at --debris when it was left out.
    if not args.debris:
        suggest_debris_option(args)
210
211
212
def descend_and_clean(directory, file_types, dir_names):
    """
    Walk and descend a directory tree, cleaning up files of a certain type
    along the way. Only delete directories if they are empty, in the end.

    Files whose suffix is in ``file_types`` go to ``Runner.unlink``.
    Subdirectories are descended into unless the ignore list excludes
    them, and any subdirectory whose name is in ``dir_names`` is handed to
    ``Runner.rmdir`` afterwards. A directory that cannot be listed is
    reported with a warning and skipped, so one unreadable folder does not
    abort the whole run.
    """
    try:
        children = sorted(directory.iterdir())
    except OSError as err:
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for child in children:
        if child.is_file():
            if child.suffix in file_types:
                Runner.unlink(child)
        elif child.is_dir():
            if should_ignore(child, Runner.ignore):
                log.debug('Skipping %s', child)
            else:
                descend_and_clean(child, file_types, dir_names)

            # Checked for ignored folders too, as before.
            if child.name in dir_names:
                Runner.rmdir(child)
        else:
            log.debug('Ignoring %s (neither a file nor a folder)', child)
231
232
233
def remove_debris_for(topic, directory):
    """
    Delete the debris of one topic below ``directory``.

    ``topic`` must be a key of ``DEBRIS_TOPICS``; its glob patterns are
    applied recursively.
    """
    log.debug('Scanning for debris of %s ...', topic.title())
    recursive_delete_debris(directory, DEBRIS_TOPICS[topic])
241
242
243
def remove_empty_directories(directory):
    """
    Remove empty directories below ``directory``, deepest ones first.

    Each subdirectory is processed recursively before it is checked
    itself, so a folder that only contained empty folders is removed as
    well. Subdirectories matching the ignore list are skipped entirely.
    """
    try:
        children = [
            Path(item.path) for item in os.scandir(directory) if item.is_dir()
        ]
    except OSError as err:
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for child in children:
        if should_ignore(child, Runner.ignore):
            log.debug('Skipping %s', child)
            continue

        remove_empty_directories(child)  # children first (post-order)
        try:
            has_content = any(child.iterdir())
            if not has_content:
                Runner.rmdir(child)
        except OSError as err:
            log.debug('Cannot check or remove directory %s: %s', child, err)
268
269
270
def remove_freeform_targets(directory, glob_patterns, yes, dry_run=False):
    """
    Delete whatever the user-supplied glob patterns match.

    This is **potentially dangerous**: the patterns may match anything,
    including the whole project. The implementation therefore keeps these
    (user experience-related) restrictions:

    - Directories are never removed recursively; their contents have to be
      named explicitly with a glob (e.g. ``dirname/**/*``).
    - The pattern order is the user's responsibility, so that a directory
      is already empty when its removal is attempted.
    - Every single file system object needs a confirmation, unless ``yes``
      is true (the ``--yes`` option).
    """
    ask_first = not yes
    for pattern in glob_patterns:
        log.debug('Erase file system objects matching: %s', pattern)
        delete_filesystem_objects(
            directory, pattern, prompt=ask_first, dry_run=dry_run
        )
289
290
291
def recursive_delete_debris(directory, patterns):
    """
    Recursively delete debris matching any of the given patterns.

    All patterns are applied to ``directory`` first; then the function
    recurses into every subdirectory that the ignore list does not
    exclude. A directory that cannot be listed is reported with a warning
    and skipped.
    """
    for pattern in patterns:
        delete_filesystem_objects(directory, pattern)

    try:
        # Collect the subdirectories eagerly and close the scandir handle
        # before recursing, so that no directory handle stays open per
        # recursion level and errors while listing are caught here.
        with os.scandir(directory) as entries:
            subdirs = [Path(entry.path) for entry in entries if entry.is_dir()]
    except OSError as err:
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for subdir in subdirs:
        if should_ignore(subdir, Runner.ignore):
            log.debug('Skipping %s', subdir)
        else:
            recursive_delete_debris(subdir, patterns)
314
315
316
def delete_filesystem_objects(directory, path_glob, prompt=False, dry_run=False):
    """
    Delete everything below ``directory`` that matches ``path_glob``.

    Implementation Note: The matches are sorted in *reverse order* and
    handled in two passes, *all files* (and symlinks) first, directories
    second. That way the deepest directories are already empty when their
    removal is attempted.

    With ``prompt`` set (and outside of a dry run) each object needs an
    interactive confirmation; a declined object is counted as failed.
    """
    matches = sorted(directory.glob(path_glob), reverse=True)
    ask = prompt and not dry_run

    def is_real_directory(candidate):
        # Symlinks to directories are unlinked like files, never rmdir'ed.
        return candidate.is_dir() and not candidate.is_symlink()

    # Pass 1: files and symlinks.
    for candidate in matches:
        if is_real_directory(candidate):
            continue
        kind = 'symlink' if candidate.is_symlink() else 'file'
        if ask and not confirm(f'Delete {kind} {candidate}'):
            Runner.unlink_failed += 1
            continue
        Runner.unlink(candidate)

    # Pass 2: directories, which should be empty by now.
    for candidate in matches:
        if not is_real_directory(candidate):
            continue
        if ask and not confirm(f'Remove empty directory {candidate}'):
            Runner.rmdir_failed += 1
            continue
        Runner.rmdir(candidate)
350
351
352
def confirm(message):
    """
    Ask an interactive yes/no question and return the answer as a bool.

    The prompt shown is ``message`` followed by ``'? '``. Only ``y`` or
    ``yes`` (any case, surrounding whitespace ignored) count as consent.

    Raises:
        SystemExit: If the user interrupts the prompt (Ctrl-C) or the
            input stream is closed or exhausted (EOF), so no answer can
            be read.
    """
    try:
        answer = input('%s? ' % message)
    except (KeyboardInterrupt, EOFError):
        # No answer can be obtained; stop instead of dumping a traceback.
        msg = 'Aborted by user.'
        raise SystemExit(msg) from None
    return answer.strip().lower() in ('y', 'yes')
360
361
362
def detect_debris_in_directory(directory):
    """
    Return the debris topics that have at least one match in ``directory``.

    Only the non-recursive patterns of each topic (those without ``**``)
    are tried, directly against ``directory``. Topics are reported in the
    order in which ``DEBRIS_TOPICS`` defines them.
    """
    found = []

    for topic, patterns in DEBRIS_TOPICS.items():
        shallow_patterns = (item for item in patterns if '**' not in item)
        # One hit is enough to report the topic.
        if any(any(directory.glob(item)) for item in shallow_patterns):
            found.append(topic)

    return found
380
381
382
def suggest_debris_option(args):
    """
    Log a hint that recommends the --debris option.

    Every existing directory in ``args.directory`` is scanned for debris.
    If anything is found the hint names the detected topics, otherwise a
    general hint is logged.
    """
    found = set()
    for candidate in map(Path, args.directory):
        if candidate.exists():
            found.update(detect_debris_in_directory(candidate))

    if not found:
        # Nothing detected: general suggestion.
        log.info(
            'Hint: Use --debris to also clean up build artifacts '
            'from common Python development tools.',
        )
        return

    # Targeted suggestion listing the topics in alphabetical order.
    log.info(
        'Hint: Use --debris to also clean up build artifacts. Detected: %s',
        ' '.join(sorted(found)),
    )
408
409
410
def build_git_clean_command(ignore_patterns, dry_run=False, force=False):
    """
    Build the git clean command with appropriate flags.

    Args:
        ignore_patterns: Patterns to exclude from git clean, each passed
            as ``-e <pattern>``. May be empty or ``None``.
        dry_run: If True, only show what would be deleted (``-n``). Takes
            precedence over ``force``.
        force: If True, delete without prompting (``-f``), otherwise run
            interactively (``-i``).

    Returns:
        List of command arguments
    """
    cmd = ['git', 'clean', '-dx']
    # ``or ()`` tolerates a missing ignore list instead of raising TypeError.
    for pattern in ignore_patterns or ():
        cmd += ['-e', pattern]

    if dry_run:
        mode = '-n'
    elif force:
        mode = '-f'
    else:
        mode = '-i'
    cmd.append(mode)

    return cmd
433
434
435
def execute_git_clean(directory, args):
    """
    Execute git clean in the specified directory.

    Building, running and logging the command is delegated to
    ``run_git_clean``, using ``args.ignore``, ``args.dry_run`` and
    ``args.yes``; this function only interprets the exit status.

    Raises:
        SystemExit: With git's exit status, if it is non-zero and not
            ``GIT_FATAL_ERROR``.
    """
    returncode = run_git_clean(
        directory,
        args.ignore,
        dry_run=args.dry_run,
        force=args.yes,
    )

    if returncode == GIT_FATAL_ERROR:
        # NOTE(review): this status is treated as "not a repository";
        # confirm that no other git failure reports the same code.
        log.warning(
            'Directory %s is not under version control. Skipping git clean.',
            directory,
        )
    elif returncode:
        raise SystemExit(returncode)
467
468
469
def run_git_clean(directory, ignore_patterns, dry_run=False, force=False):
    """
    Run git clean in the specified directory with appropriate flags.

    In interactive mode (neither ``dry_run`` nor ``force``) git's standard
    output is left attached to the caller's, so that the menu and prompts
    are visible while git waits for input. Otherwise standard output is
    captured and logged at info level. Standard error is always captured
    and logged as a warning.

    Args:
        directory: Path to the directory to clean
        ignore_patterns: List of patterns to exclude from git clean
        dry_run: If True, only show what would be deleted
        force: If True, delete without prompting (otherwise interactive)

    Returns:
        Exit code from git clean
    """
    cmd = build_git_clean_command(ignore_patterns, dry_run, force)
    interactive = not dry_run and not force

    log.debug('Running: %s in %s', ' '.join(cmd), directory)

    result = subprocess.run(
        cmd,
        cwd=directory,
        # Capturing stdout would hide the interactive prompts from the user.
        stdout=None if interactive else subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )

    if result.stdout:
        log.info(result.stdout.rstrip())
    if result.stderr:
        log.warning(result.stderr.rstrip())

    return result.returncode
500