Passed
Pull Request — main (#105)
by
unknown
01:11
created

pyclean.modern.descend_and_clean()   B

Complexity

Conditions 7

Size

Total Lines 19
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 12
nop 3
dl 0
loc 19
rs 8
c 0
b 0
f 0
1
# SPDX-FileCopyrightText: 2020 Peter Bittner <[email protected]>
2
#
3
# SPDX-License-Identifier: GPL-3.0-or-later
4
5
"""
6
Modern, cross-platform, pure-Python pyclean implementation.
7
"""
8
9
import logging
10
import os
11
import subprocess
12
from pathlib import Path
13
14
# Directory names and file suffixes that pyclean() hands to descend_and_clean().
BYTECODE_DIRS = ['__pycache__']
BYTECODE_FILES = ['.pyc', '.pyo']

# Glob patterns per debris topic. Patterns are applied in list order and
# directories are only removed with rmdir(), so the contents of a directory
# ('name/**/*') are always listed before the directory itself ('name/').
DEBRIS_TOPICS = {
    'cache': ['.cache/**/*', '.cache/'],
    'coverage': [
        '.coverage',
        'coverage.json',
        'coverage.lcov',
        'coverage.xml',
        'htmlcov/**/*',
        'htmlcov/',
    ],
    'jupyter': ['.ipynb_checkpoints/**/*', '.ipynb_checkpoints/'],
    'mypy': ['.mypy_cache/**/*', '.mypy_cache/'],
    'package': [
        'build/bdist.*/**/*',
        'build/bdist.*/',
        'build/lib/**/*',
        'build/lib/',
        'build/',
        'dist/**/*',
        'dist/',
        'sdist/**/*',
        'sdist/',
        '*.egg-info/**/*',
        '*.egg-info/',
    ],
    'pytest': ['.pytest_cache/**/*', '.pytest_cache/', 'pytestdebug.log'],
    'ruff': ['.ruff_cache/**/*', '.ruff_cache/'],
    'tox': ['.tox/**/*', '.tox/'],
}

# Exit status of ``git clean`` that execute_git_clean() reports as
# "directory is not under version control".
GIT_FATAL_ERROR = 128
66
67
68
class CleanupRunner:
    """
    Cleanup runner with optional dry-run behavior.

    Stores the callables used to delete files and directories, the ignore
    patterns, and the counters of successful and failed removals.
    """

    def __init__(self):
        """Create an unconfigured runner with all counters at zero."""
        # Assigned by configure(); None until then.
        self.unlink = None
        self.rmdir = None
        self.ignore = None
        # Counters start at 0 (not None) so that incrementing them before
        # configure() has been called does not raise a TypeError.
        self._reset_counters()

    def _reset_counters(self):
        """Set all success and failure counters back to zero."""
        self.unlink_count = 0
        self.unlink_failed = 0
        self.rmdir_count = 0
        self.rmdir_failed = 0

    def configure(self, args):
        """
        Set up runner according to command line options.

        With ``args.dry_run`` the reporting-only callables are installed,
        otherwise the ones that really delete. ``args.ignore`` is stored
        as-is and all counters are reset.
        """
        self.unlink = print_filename if args.dry_run else remove_file
        self.rmdir = print_dirname if args.dry_run else remove_directory
        self.ignore = args.ignore
        self._reset_counters()
90
91
92
# Logger named after this module.
log = logging.getLogger(__name__)
# Shared runner instance; pyclean() calls configure() on it before cleaning.
Runner = CleanupRunner()
94
95
96
def normalize(path_pattern: str) -> str:
    """
    Normalize path separators in a pattern for cross-platform support.

    Every ``os.sep`` is replaced by ``os.altsep`` where the platform defines
    one (Windows: backslash becomes forward slash). Where ``os.altsep`` is
    ``None`` the pattern is returned unchanged, because a backslash may be
    part of a file name there.
    """
    return path_pattern.replace(os.sep, os.altsep or os.sep)


def should_ignore(path: Path, ignore_patterns: list[str]) -> bool:
    """
    Check if a path should be ignored based on ignore patterns.

    Patterns can be:
    - Simple names like 'bar': matches any path whose last component is 'bar'
    - Paths like 'foo/bar': matches when the components 'foo', 'bar' appear
      consecutively anywhere in the path, so everything below 'foo/bar'
      is ignored as well

    Patterns are compared by their path components, so a trailing separator
    ('bar/') is treated like the plain name ('bar'). Patterns without any
    component ('' or '.') never match. Returns ``False`` when
    ``ignore_patterns`` is empty or ``None``.
    """
    if not ignore_patterns:
        return False

    path_parts = path.parts
    for pattern in ignore_patterns:
        pattern_parts = Path(normalize(pattern)).parts
        if not pattern_parts:
            # Nothing to compare against.
            continue

        if len(pattern_parts) == 1:
            # Simple name - compare against the last path component. Using
            # the parsed component (not the raw string) makes 'bar/' match.
            if path.name == pattern_parts[0]:
                return True
            continue

        # Multi-component pattern - look for it as a consecutive run of
        # components. The range is empty when the path is shorter than
        # the pattern.
        width = len(pattern_parts)
        if any(
            path_parts[start : start + width] == pattern_parts
            for start in range(len(path_parts) - width + 1)
        ):
            return True

    return False
135
136
137
def remove_file(fileobj):
    """
    Delete a file object for real and record the outcome.

    Calls ``fileobj.unlink()``. On success ``Runner.unlink_count`` is
    incremented; an ``OSError`` is logged at debug level and counted in
    ``Runner.unlink_failed`` instead of being raised.
    """
    log.debug('Deleting file: %s', fileobj)
    try:
        fileobj.unlink()
    except OSError as err:
        log.debug('File not deleted. %s', err)
        Runner.unlink_failed += 1
    else:
        Runner.unlink_count += 1
146
147
148
def remove_directory(dirobj):
    """
    Remove a directory object for real and record the outcome.

    Calls ``dirobj.rmdir()``. On success ``Runner.rmdir_count`` is
    incremented; an ``OSError`` is logged at debug level and counted in
    ``Runner.rmdir_failed`` instead of being raised.
    """
    log.debug('Removing directory: %s', dirobj)
    try:
        dirobj.rmdir()
    except OSError as err:
        log.debug('Directory not removed. %s', err)
        Runner.rmdir_failed += 1
    else:
        Runner.rmdir_count += 1
157
158
159
def print_filename(fileobj):
    """
    Dry-run counterpart of remove_file().

    Logs the file at debug level and counts it in ``Runner.unlink_count``
    without touching the file system.
    """
    log.debug('Would delete file: %s', fileobj)
    Runner.unlink_count += 1
163
164
165
def print_dirname(dirobj):
    """
    Dry-run counterpart of remove_directory().

    Logs the directory at debug level and counts it in
    ``Runner.rmdir_count`` without touching the file system.
    """
    log.debug('Would delete directory: %s', dirobj)
    Runner.rmdir_count += 1
169
170
171
def pyclean(args):
    """
    Cross-platform cleaning of Python bytecode.

    Configures the shared ``Runner`` from ``args`` and then, for every entry
    of ``args.directory``: removes bytecode, removes debris for each topic in
    ``args.debris``, erases the free-form targets in ``args.erase``, and
    optionally removes empty folders (``args.folders``) and runs git clean
    (``args.git_clean``). Finally logs the totals and, when no debris topic
    was requested, a hint about the --debris option.
    """
    Runner.configure(args)
    simulate = args.dry_run

    for target in map(Path, args.directory):
        log.info('Cleaning directory %s', target)
        descend_and_clean(target, BYTECODE_FILES, BYTECODE_DIRS)

        for topic in args.debris:
            remove_debris_for(topic, target)

        remove_freeform_targets(target, args.erase, args.yes, simulate)

        if args.folders:
            log.debug('Removing empty directories...')
            remove_empty_directories(target)

        if args.git_clean:
            execute_git_clean(target, args)

    outcome = 'would be removed' if simulate else 'removed'
    log.info(
        'Total %d files, %d directories %s.',
        Runner.unlink_count,
        Runner.rmdir_count,
        outcome,
    )

    if Runner.unlink_failed or Runner.rmdir_failed:
        verb = 'would' if simulate else 'could'
        log.debug(
            '%d files, %d directories %s not be removed.',
            Runner.unlink_failed,
            Runner.rmdir_failed,
            verb,
        )

    # Point the user to --debris when it was not used.
    if not args.debris:
        suggest_debris_option(args)
211
212
213
def descend_and_clean(directory, file_types, dir_names):
    """
    Walk and descend a directory tree, cleaning up files of a certain type
    along the way. Only delete directories if they are empty, in the end.

    ``directory`` is a path object; its children are visited in sorted
    order. Files whose suffix is in ``file_types`` are passed to
    ``Runner.unlink``. Directories are descended into first and afterwards
    passed to ``Runner.rmdir`` when their name is in ``dir_names``.
    Directories matching ``Runner.ignore`` are skipped entirely: they are
    neither descended into nor removed.
    """
    for child in sorted(directory.iterdir()):
        if child.is_file():
            if child.suffix in file_types:
                Runner.unlink(child)
        elif child.is_dir():
            if should_ignore(child, Runner.ignore):
                # An ignored directory must not be touched at all, even
                # when its own name is one of dir_names.
                log.debug('Skipping %s', child)
                continue

            descend_and_clean(child, file_types, dir_names)

            # Post-order: the contents were cleaned above, so the directory
            # may be empty by now.
            if child.name in dir_names:
                Runner.rmdir(child)
        else:
            log.debug('Ignoring %s (neither a file nor a folder)', child)
232
233
234
def remove_debris_for(topic, directory):
    """
    Clean up debris for a specific topic.

    ``topic`` must be a key of ``DEBRIS_TOPICS``; its patterns are applied
    to ``directory`` and its subdirectories by recursive_delete_debris().
    """
    log.debug('Scanning for debris of %s ...', topic.title())
    recursive_delete_debris(directory, DEBRIS_TOPICS[topic])
242
243
244
def remove_empty_directories(directory):
    """
    Recursively remove empty directories in the given directory tree.

    The tree is processed bottom-up: each subdirectory is handled before it
    is itself checked for emptiness and passed to ``Runner.rmdir``.
    Subdirectories matching ``Runner.ignore`` are skipped. ``directory``
    itself is never removed. If ``directory`` cannot be listed, a warning
    is logged and nothing happens.
    """
    children = []
    try:
        for item in os.scandir(directory):
            if item.is_dir():
                children.append(Path(item.path))
    except OSError as err:
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for child in children:
        if should_ignore(child, Runner.ignore):
            log.debug('Skipping %s', child)
            continue

        # Go down the hierarchy first, so nested empty directories are gone
        # before this one is inspected.
        remove_empty_directories(child)
        try:
            is_empty = next(child.iterdir(), None) is None
            if is_empty:
                Runner.rmdir(child)
        except OSError as err:
            log.debug('Cannot check or remove directory %s: %s', child, err)
269
270
271
def remove_freeform_targets(directory, glob_patterns, yes, dry_run=False):
    """
    Remove free-form targets using globbing.

    This is **potentially dangerous**: the glob patterns are chosen by the
    user, who can thereby delete anything, including the whole project
    being worked on. The implementation therefore imposes these
    (user experience-related) restrictions:

    - Directories are not deleted recursively; their contents must be
      named explicitly with a glob (e.g. ``dirname/**/*``).
    - The deletion order is up to the user: a directory has to be empty
      by the time its removal is attempted.
    - Every single file system object is confirmed interactively before
      deletion, unless ``yes`` is true (the ``--yes`` option).
    """
    ask = not yes
    for pattern in glob_patterns:
        log.debug('Erase file system objects matching: %s', pattern)
        delete_filesystem_objects(directory, pattern, prompt=ask, dry_run=dry_run)
290
291
292
def recursive_delete_debris(directory, patterns):
    """
    Recursively delete debris matching any of the given patterns.

    All ``patterns`` are first applied to ``directory`` itself, in the
    given order, via delete_filesystem_objects(). Afterwards the function
    recurses into every subdirectory that does not match ``Runner.ignore``.
    If ``directory`` cannot be listed, a warning is logged and the
    subdirectories are not visited.
    """
    for pattern in patterns:
        delete_filesystem_objects(directory, pattern)

    try:
        # Build the list eagerly inside the ``with`` block: errors raised
        # while iterating are then covered by the ``except`` below, and the
        # scandir handle is closed before recursing instead of staying
        # open for the whole descent.
        with os.scandir(directory) as entries:
            subdirs = [Path(entry.path) for entry in entries if entry.is_dir()]
    except OSError as err:
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for subdir in subdirs:
        if should_ignore(subdir, Runner.ignore):
            log.debug('Skipping %s', subdir)
        else:
            recursive_delete_debris(subdir, patterns)
315
316
317
def delete_filesystem_objects(directory, path_glob, prompt=False, dry_run=False):
    """
    Find everything below ``directory`` matching ``path_glob`` and attempt
    to delete it in a safe order, optionally asking for confirmation.

    Implementation Note: The matches are sorted in *reverse order* and all
    files (including symlinks, even those pointing to directories) are
    handled *before* any directory. This way the directories deepest down
    in the hierarchy are empty when their removal is attempted.

    A confirmation is only requested when ``prompt`` is true and
    ``dry_run`` is false. Declined objects are counted as failed.
    """
    matches = sorted(directory.glob(path_glob), reverse=True)
    must_ask = prompt and not dry_run

    def is_real_directory(candidate):
        """Tell whether candidate is a directory and not a symlink to one."""
        return candidate.is_dir() and not candidate.is_symlink()

    # First pass: files and symlinks.
    for entry in matches:
        if is_real_directory(entry):
            continue
        kind = 'symlink' if entry.is_symlink() else 'file'
        if must_ask and not confirm('Delete %s %s' % (kind, entry)):
            Runner.unlink_failed += 1
        else:
            Runner.unlink(entry)

    # Second pass: directories, which should be empty by now.
    for entry in matches:
        if not is_real_directory(entry):
            continue
        if must_ask and not confirm('Remove empty directory %s' % entry):
            Runner.rmdir_failed += 1
        else:
            Runner.rmdir(entry)
351
352
353
def confirm(message):
    """
    An interactive confirmation prompt.

    Shows ``message`` followed by '? ' and returns ``True`` when the answer
    is 'y' or 'yes' (case-insensitive, surrounding whitespace ignored),
    ``False`` for any other answer.

    Raises ``SystemExit`` when the prompt is interrupted (Ctrl-C) or when
    no more input is available (end of input, e.g. Ctrl-D or closed stdin),
    so that nothing is deleted without an explicit answer.
    """
    try:
        answer = input('%s? ' % message)
    except (KeyboardInterrupt, EOFError):
        msg = 'Aborted by user.'
        raise SystemExit(msg) from None
    return answer.strip().lower() in ('y', 'yes')
361
362
363
def detect_debris_in_directory(directory):
    """
    Scan a directory for debris artifacts and return a list of detected topics.

    Only the non-recursive patterns of each topic (those without ``**``)
    are globbed against ``directory``. A topic is reported once, as soon as
    one of its patterns matches; topics appear in ``DEBRIS_TOPICS`` order.
    """

    def has_match(pattern):
        """Tell whether a non-recursive pattern matches inside directory."""
        if '**' in pattern:
            return False
        return bool(list(directory.glob(pattern)))

    return [
        topic
        for topic, patterns in DEBRIS_TOPICS.items()
        if any(map(has_match, patterns))
    ]
381
382
383
def suggest_debris_option(args):
    """
    Log a hint about the --debris option.

    Every existing directory in ``args.directory`` is scanned for debris.
    When topics are detected the hint lists them (sorted, space-separated);
    otherwise a general hint is logged.
    """
    found = set()
    for candidate in map(Path, args.directory):
        if candidate.exists():
            found.update(detect_debris_in_directory(candidate))

    if not found:
        # Nothing detected - general suggestion only.
        log.info(
            'Hint: Use --debris to also clean up build artifacts '
            'from common Python development tools.',
        )
        return

    # Targeted suggestion naming the detected topics.
    log.info(
        'Hint: Use --debris to also clean up build artifacts. Detected: %s',
        ' '.join(sorted(found)),
    )
409
410
411
def build_git_clean_command(
    ignore_patterns: list[str],
    dry_run: bool = False,
    force: bool = False,
) -> list[str]:
    """
    Build the git clean command with appropriate flags.

    The result is ``git clean -dx``, followed by one ``-e <pattern>`` pair
    per ignore pattern, followed by the mode flag: ``-n`` for a dry run,
    else ``-f`` when forced, else ``-i``. ``dry_run`` wins over ``force``.
    """
    if dry_run:
        mode = '-n'
    elif force:
        mode = '-f'
    else:
        mode = '-i'

    command = ['git', 'clean', '-dx']
    for pattern in ignore_patterns:
        command += ['-e', pattern]
    command.append(mode)
    return command
416
417
418
def execute_git_clean(directory, args):
    """
    Execute git clean in the specified directory.

    The command is built from ``args.ignore``, ``args.dry_run`` and
    ``args.yes``. In the non-interactive modes (dry run or ``--yes``) the
    output of git is captured and forwarded to the log: stdout at info
    level, stderr at warning level. In interactive mode git is left
    connected to the terminal, so that its prompt is visible to the user.

    An exit status of ``GIT_FATAL_ERROR`` is reported as a warning; any
    other non-zero exit status raises ``SystemExit`` with that status.
    """
    cmd = build_git_clean_command(
        args.ignore,
        dry_run=args.dry_run,
        force=args.yes,
    )

    log.debug('%s (in %s)', ' '.join(cmd), directory)

    # Without dry run or --yes the command runs with '-i' and asks the user
    # what to do. Capturing its output would hide that prompt while git
    # waits for input, so only capture in the non-interactive modes.
    interactive = not (args.dry_run or args.yes)

    result = subprocess.run(
        cmd,
        cwd=directory,
        capture_output=not interactive,
        text=True,
        check=False,
    )

    # Both are None when the output was not captured.
    if result.stdout:
        log.info(result.stdout.rstrip())
    if result.stderr:
        log.warning(result.stderr.rstrip())

    if result.returncode == GIT_FATAL_ERROR:
        log.warning(
            'Directory %s is not under version control. Skipping git clean.',
            directory,
        )
    elif result.returncode:
        raise SystemExit(result.returncode)
450