Passed
Pull Request — main (#105)
by
unknown
01:14
created

pyclean.modern.run_git_clean()   B

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 25
nop 4
dl 0
loc 50
rs 7.8799
c 0
b 0
f 0
1
# SPDX-FileCopyrightText: 2020 Peter Bittner <[email protected]>
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""
Modern, cross-platform, pure-Python pyclean implementation.
"""
8
9
import logging
10
import os
11
import subprocess
12
from pathlib import Path
13
14
# File suffixes treated as Python bytecode files.
BYTECODE_FILES = ['.pyc', '.pyo']

# Directory names treated as bytecode cache folders.
BYTECODE_DIRS = ['__pycache__']

# Glob patterns per debris topic. Patterns are applied in list order, and
# each "contents" pattern (``dir/**/*``) is listed before the pattern for
# the directory itself.
DEBRIS_TOPICS = {
    'cache': ['.cache/**/*', '.cache/'],
    'coverage': [
        '.coverage',
        'coverage.json',
        'coverage.lcov',
        'coverage.xml',
        'htmlcov/**/*',
        'htmlcov/',
    ],
    'jupyter': ['.ipynb_checkpoints/**/*', '.ipynb_checkpoints/'],
    'mypy': ['.mypy_cache/**/*', '.mypy_cache/'],
    'package': [
        'build/bdist.*/**/*',
        'build/bdist.*/',
        'build/lib/**/*',
        'build/lib/',
        'build/',
        'dist/**/*',
        'dist/',
        'sdist/**/*',
        'sdist/',
        '*.egg-info/**/*',
        '*.egg-info/',
    ],
    'pytest': ['.pytest_cache/**/*', '.pytest_cache/', 'pytestdebug.log'],
    'ruff': ['.ruff_cache/**/*', '.ruff_cache/'],
    'tox': ['.tox/**/*', '.tox/'],
}
64
65
66
class CleanupRunner:
    """
    Cleanup runner with optional dry-run behavior.

    Stores the callables used to delete files and directories, the ignore
    patterns, and counters for successful and failed removals.
    """

    def __init__(self):
        """Create an unconfigured runner; ``configure`` selects the callables."""
        self.unlink = None
        self.rmdir = None
        self.ignore = None
        # Counters start at zero (not None) so that incrementing them works
        # even when configure() has not been called yet.
        self.unlink_count = 0
        self.unlink_failed = 0
        self.rmdir_count = 0
        self.rmdir_failed = 0

    def configure(self, args):
        """
        Set up runner according to command line options.

        Reads ``args.dry_run`` to choose between the printing and the
        deleting callables, stores ``args.ignore`` and resets all counters.
        """
        self.unlink = print_filename if args.dry_run else remove_file
        self.rmdir = print_dirname if args.dry_run else remove_directory
        self.ignore = args.ignore
        self.unlink_count = 0
        self.unlink_failed = 0
        self.rmdir_count = 0
        self.rmdir_failed = 0
88
89
90
# Logger named after this module.
log = logging.getLogger(__name__)

# Single shared runner instance; the functions in this module read its
# callables and update its counters.
Runner = CleanupRunner()
92
93
94
def normalize(path_pattern: str) -> str:
    """
    Return the pattern with ``os.sep`` replaced by the alternative separator.

    Where the platform defines ``os.altsep`` every ``os.sep`` is swapped for
    it; where ``os.altsep`` is ``None`` the pattern comes back unchanged.
    """
    preferred = os.altsep if os.altsep else os.sep
    return preferred.join(path_pattern.split(os.sep))
102
103
104
def should_ignore(path: Path, ignore_patterns: list[str]) -> bool:
    """
    Check if a path should be ignored based on ignore patterns.

    Patterns can be:
    - Simple names like 'bar': matches any path whose last component is
      'bar'. A trailing separator or leading './' ('bar/', './bar') is
      tolerated.
    - Paths like 'foo/bar': matches when the components 'foo', 'bar' appear
      consecutively anywhere in the path, so everything below that
      directory matches as well.

    Returns False when ``ignore_patterns`` is empty or None.
    """
    if not ignore_patterns:
        return False

    path_parts = path.parts
    for pattern in ignore_patterns:
        pattern_parts = Path(normalize(pattern)).parts
        width = len(pattern_parts)

        if width > 1:
            # Slide a window of the pattern's length over the path parts.
            # The range is empty when the path is shorter than the pattern.
            for start in range(len(path_parts) - width + 1):
                if path_parts[start : start + width] == pattern_parts:
                    return True
            continue

        # Compare against the parsed component rather than the raw string,
        # otherwise 'bar/' could never equal a path name. Fall back to the
        # raw pattern when it parses to no components at all.
        simple_name = pattern_parts[0] if pattern_parts else pattern
        if path.name == simple_name:
            return True

    return False
133
134
135
def remove_file(fileobj):
    """
    Delete the given file object and record the outcome on ``Runner``.

    An ``OSError`` raised by ``unlink()`` is logged at debug level and
    counted as a failure instead of being propagated.
    """
    log.debug('Deleting file: %s', fileobj)
    try:
        fileobj.unlink()
    except OSError as err:
        log.debug('File not deleted. %s', err)
        Runner.unlink_failed += 1
    else:
        Runner.unlink_count += 1
144
145
146
def remove_directory(dirobj):
    """
    Remove the given directory object and record the outcome on ``Runner``.

    An ``OSError`` raised by ``rmdir()`` is logged at debug level and
    counted as a failure instead of being propagated.
    """
    log.debug('Removing directory: %s', dirobj)
    try:
        dirobj.rmdir()
    except OSError as err:
        log.debug('Directory not removed. %s', err)
        Runner.rmdir_failed += 1
    else:
        Runner.rmdir_count += 1
155
156
157
def print_filename(fileobj):
    """
    Dry-run stand-in for file deletion (used with --dry-run).

    Logs the file name at debug level and counts it; nothing is deleted.
    """
    log.debug('Would delete file: %s', fileobj)
    Runner.unlink_count = Runner.unlink_count + 1
161
162
163
def print_dirname(dirobj):
    """
    Dry-run stand-in for directory removal (used with --dry-run).

    Logs the directory name at debug level and counts it; nothing is removed.
    """
    log.debug('Would delete directory: %s', dirobj)
    Runner.rmdir_count = Runner.rmdir_count + 1
167
168
169
def pyclean(args):
    """
    Cross-platform cleaning of Python bytecode.

    Reads these attributes from ``args``: ``directory``, ``debris``,
    ``erase``, ``yes``, ``dry_run``, ``folders``, ``ignore`` and, if
    present, ``git_clean`` (set to False on ``args`` when missing).

    Raises:
        SystemExit: with git's exit code when ``git clean`` fails with a
            code other than 0 or 128.
    """
    Runner.configure(args)

    # Ensure git_clean attribute exists (for tests that create args manually)
    if not hasattr(args, 'git_clean'):
        args.git_clean = False

    for dir_name in args.directory:
        dir_path = Path(dir_name)

        log.info('Cleaning directory %s', dir_path)
        descend_and_clean(dir_path, BYTECODE_FILES, BYTECODE_DIRS)

        for topic in args.debris:
            remove_debris_for(topic, dir_path)

        remove_freeform_targets(dir_path, args.erase, args.yes, args.dry_run)

        if args.folders:
            log.debug('Removing empty directories...')
            remove_empty_directories(dir_path)

    log.info(
        'Total %d files, %d directories %s.',
        Runner.unlink_count,
        Runner.rmdir_count,
        'would be removed' if args.dry_run else 'removed',
    )

    if Runner.unlink_failed or Runner.rmdir_failed:
        log.debug(
            '%d files, %d directories %s not be removed.',
            Runner.unlink_failed,
            Runner.rmdir_failed,
            'would' if args.dry_run else 'could',
        )

    # Suggest --debris option if it wasn't used
    if not args.debris:
        suggest_debris_option(args)

    # Run git clean as the very last step if requested
    if args.git_clean:
        log.info('Running git clean...')
        for dir_name in args.directory:
            dir_path = Path(dir_name)
            exit_code = run_git_clean(
                dir_path,
                args.ignore,
                dry_run=args.dry_run,
                force=args.yes,
            )

            if exit_code == 128:
                # Treated as "directory is not under version control"
                log.warning(
                    'Directory %s is not under version control. Skipping git clean.',
                    dir_path,
                )
            elif exit_code != 0:
                # Other git errors cause immediate exit. Log the reason first
                # so the failure is not silent, then keep git's exit code.
                log.error('git clean failed with exit code %d', exit_code)
                raise SystemExit(exit_code)
233
234
235
def descend_and_clean(directory, file_types, dir_names):
    """
    Walk and descend a directory tree, cleaning up files of a certain type
    along the way. Only delete directories if they are empty, in the end.

    Args:
        directory: Path object of the directory to walk.
        file_types: File suffixes (e.g. '.pyc') whose files are deleted.
        dir_names: Directory names whose removal is attempted after their
            contents have been cleaned.

    A directory that cannot be listed is logged and skipped. Directories
    matching ``Runner.ignore`` are neither descended into nor removed.
    """
    try:
        children = sorted(directory.iterdir())
    except OSError as err:
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for child in children:
        if child.is_file():
            if child.suffix in file_types:
                Runner.unlink(child)
        elif child.is_dir():
            if should_ignore(child, Runner.ignore):
                # Ignored means untouched: skip the rmdir attempt as well.
                log.debug('Skipping %s', child)
                continue

            descend_and_clean(child, file_types, dir_names)

            if child.name in dir_names:
                Runner.rmdir(child)
        else:
            log.debug('Ignoring %s (neither a file nor a folder)', child)
254
255
256
def remove_debris_for(topic, directory):
    """
    Delete the debris of one topic below the given directory.

    ``topic`` must be a key of ``DEBRIS_TOPICS``; its glob patterns are
    handed to ``recursive_delete_debris``.
    """
    log.debug('Scanning for debris of %s ...', topic.title())
    recursive_delete_debris(directory, DEBRIS_TOPICS[topic])
264
265
266
def remove_empty_directories(directory):
    """
    Recursively remove empty directories in the given directory tree.

    This walks the directory tree in post-order (bottom-up), attempting to
    remove directories that are empty. The given ``directory`` itself is
    never removed. Subdirectories matching ``Runner.ignore`` are skipped,
    and a directory that cannot be listed is logged and left alone.
    """
    try:
        # The context manager closes the scandir handle even when
        # entry.is_dir() raises part-way through.
        with os.scandir(directory) as entries:
            subdirs = [Path(entry.path) for entry in entries if entry.is_dir()]
    except OSError as err:  # includes PermissionError
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for subdir in subdirs:
        if should_ignore(subdir, Runner.ignore):
            log.debug('Skipping %s', subdir)
            continue

        remove_empty_directories(subdir)  # recurse down the hierarchy
        try:
            # Only look for a first entry; no need to list everything.
            if next(subdir.iterdir(), None) is None:
                Runner.rmdir(subdir)
        except OSError as err:
            log.debug('Cannot check or remove directory %s: %s', subdir, err)
291
292
293
def remove_freeform_targets(directory, glob_patterns, yes, dry_run=False):
    """
    Remove free-form targets using globbing.

    This is **potentially dangerous** since users can delete everything
    anywhere in their file system, including the entire project they're
    working on. For this reason, the implementation imposes the following
    (user experience-related) restrictions:

    - Deleting (directories) is not recursive, directory contents must be
      explicitly specified using globbing (e.g. ``dirname/**/*``).
    - The user is responsible for the deletion order, so that a directory
      is empty when it is attempted to be deleted.
    - A confirmation prompt for the deletion of every single file system
      object is shown (unless the ``--yes`` option is used, in addition).
    """
    ask_first = not yes
    for pattern in glob_patterns:
        log.debug('Erase file system objects matching: %s', pattern)
        delete_filesystem_objects(
            directory,
            pattern,
            prompt=ask_first,
            dry_run=dry_run,
        )
312
313
314
def recursive_delete_debris(directory, patterns):
    """
    Recursively delete debris matching any of the given patterns.

    All patterns are applied to ``directory`` first, then the function
    recurses into each remaining subdirectory that is not matched by
    ``Runner.ignore``. A directory that cannot be listed is logged and
    skipped.
    """
    for pattern in patterns:
        delete_filesystem_objects(directory, pattern)

    try:
        # Build the list eagerly: a lazy generator would keep the scandir
        # handle open during the whole recursion and would raise errors from
        # entry.is_dir() outside of this try block.
        with os.scandir(directory) as entries:
            subdirs = [Path(entry.path) for entry in entries if entry.is_dir()]
    except OSError as err:  # includes PermissionError
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for subdir in subdirs:
        if should_ignore(subdir, Runner.ignore):
            log.debug('Skipping %s', subdir)
        else:
            recursive_delete_debris(subdir, patterns)
337
338
339
def delete_filesystem_objects(directory, path_glob, prompt=False, dry_run=False):
    """
    Delete everything below ``directory`` that matches ``path_glob``.

    Matches are sorted in *reverse order* and handled in two passes: first
    every file and symlink, then every real directory. When ``prompt`` is
    true and ``dry_run`` is false, each object is confirmed interactively
    and a declined object is counted as failed.
    """
    matches = sorted(directory.glob(path_glob), reverse=True)
    must_ask = prompt and not dry_run

    def is_real_directory(candidate):
        # A symlink pointing at a directory is handled like a file.
        return candidate.is_dir() and not candidate.is_symlink()

    # Pass 1: files and symlinks.
    for candidate in matches:
        if is_real_directory(candidate):
            continue
        kind = 'symlink' if candidate.is_symlink() else 'file'
        if must_ask and not confirm('Delete %s %s' % (kind, candidate)):
            Runner.unlink_failed += 1
            continue
        Runner.unlink(candidate)

    # Pass 2: directories, checked only after the files have been handled.
    for candidate in matches:
        if not is_real_directory(candidate):
            continue
        if must_ask and not confirm('Remove empty directory %s' % candidate):
            Runner.rmdir_failed += 1
            continue
        Runner.rmdir(candidate)
373
374
375
def confirm(message):
    """
    An interactive confirmation prompt.

    Shows ``message`` followed by '? ' and returns True only when the answer
    is 'y' or 'yes' (case-insensitive, surrounding whitespace ignored).

    Raises:
        SystemExit: when the prompt is interrupted (Ctrl-C) or the input
            stream ends (Ctrl-D, closed or exhausted stdin).
    """
    try:
        answer = input('%s? ' % message)
    except (KeyboardInterrupt, EOFError):
        # Without a usable answer we must not guess; abort cleanly instead
        # of ending in a traceback.
        msg = 'Aborted by user.'
        raise SystemExit(msg) from None
    return answer.strip().lower() in ('y', 'yes')
383
384
385
def detect_debris_in_directory(directory):
    """
    Return the debris topics that have at least one match in ``directory``.

    Only the non-recursive patterns of each topic (those without ``**``)
    are globbed. Topics are returned in ``DEBRIS_TOPICS`` order.
    """

    def leaves_traces(patterns):
        # Stops at the first pattern that yields any match.
        return any(
            list(directory.glob(pattern))
            for pattern in patterns
            if '**' not in pattern
        )

    return [
        topic
        for topic, patterns in DEBRIS_TOPICS.items()
        if leaves_traces(patterns)
    ]
403
404
405
def suggest_debris_option(args):
    """
    Log a hint recommending the --debris option.

    Every existing directory in ``args.directory`` is scanned for debris.
    If any topic is detected the hint names the detected topics, otherwise
    a general hint is logged.
    """
    found_topics = set()
    for dir_path in map(Path, args.directory):
        if not dir_path.exists():
            continue
        found_topics.update(detect_debris_in_directory(dir_path))

    if not found_topics:
        # Nothing detected: general suggestion
        log.info(
            'Hint: Use --debris to also clean up build artifacts '
            'from common Python development tools.',
        )
        return

    # Targeted suggestion listing the topics in sorted order
    log.info(
        'Hint: Use --debris to also clean up build artifacts. Detected: %s',
        ' '.join(sorted(found_topics)),
    )
431
432
433
def run_git_clean(directory, ignore_patterns, dry_run=False, force=False):
    """
    Run git clean in the specified directory with appropriate flags.

    Args:
        directory: Path to the directory to clean
        ignore_patterns: List of patterns to exclude from git clean
            (None or empty means no excludes)
        dry_run: If True, only show what would be deleted
        force: If True, delete without prompting (otherwise interactive)

    Returns:
        Exit code from git clean, or 127 when the command could not be
        started (git executable or working directory not found).
    """
    # Build git clean command
    cmd = ['git', 'clean', '-dx']

    # Add exclude patterns for ignored directories
    for pattern in ignore_patterns or ():
        cmd.extend(['-e', pattern])

    # Add mode flag: -n for dry-run, -f for force, -i for interactive
    interactive = not dry_run and not force
    if dry_run:
        cmd.append('-n')
    elif force:
        cmd.append('-f')
    else:
        cmd.append('-i')

    log.debug('Running: %s in %s', ' '.join(cmd), directory)

    try:
        result = subprocess.run(
            cmd,
            cwd=directory,
            # Interactive mode must write its prompts straight to the
            # terminal; capturing them would hide them until git exits.
            capture_output=not interactive,
            text=True,
            check=False,
        )
    except FileNotFoundError:
        # Raised for a missing executable and for a missing working directory.
        if Path(directory).is_dir():
            log.error('Git command not found. Cannot execute git clean.')
        else:
            log.error(
                'Directory %s does not exist. Cannot execute git clean.',
                directory,
            )
        return 127

    # Print captured output from git clean (both are None when not captured)
    if result.stdout:
        print(result.stdout, end='')
    if result.stderr:
        print(result.stderr, end='')

    return result.returncode
483