Passed
Push — main ( c1ae8e...0e16f8 )
by Peter
01:21
created

pyclean.modern.build_git_clean_command()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 3
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
# SPDX-FileCopyrightText: 2020 Peter Bittner <[email protected]>
2
#
3
# SPDX-License-Identifier: GPL-3.0-or-later
4
5
"""
6
Modern, cross-platform, pure-Python pyclean implementation.
7
"""
8
9
import logging
10
import os
11
import subprocess
12
from pathlib import Path
13
14
# Directory names holding compiled bytecode. pyclean() passes this list to
# descend_and_clean(), which attempts to remove directories with these names.
BYTECODE_DIRS = ['__pycache__']
# File suffixes of compiled bytecode. descend_and_clean() compares them
# against ``Path.suffix`` of every file it visits.
BYTECODE_FILES = ['.pyc', '.pyo']
# Glob patterns of debris, grouped by topic. The keys are the topic names
# looked up by remove_debris_for(). Within a topic the contents of a
# directory (``name/**/*``) are listed before the directory itself
# (``name/``).
DEBRIS_TOPICS = {
    'cache': [
        '.cache/**/*',
        '.cache/',
    ],
    'coverage': [
        '.coverage',
        'coverage.json',
        'coverage.lcov',
        'coverage.xml',
        'htmlcov/**/*',
        'htmlcov/',
    ],
    'jupyter': [
        '.ipynb_checkpoints/**/*',
        '.ipynb_checkpoints/',
    ],
    'mypy': [
        '.mypy_cache/**/*',
        '.mypy_cache/',
    ],
    'package': [
        'build/bdist.*/**/*',
        'build/bdist.*/',
        'build/lib/**/*',
        'build/lib/',
        'build/',
        'dist/**/*',
        'dist/',
        'sdist/**/*',
        'sdist/',
        '*.egg-info/**/*',
        '*.egg-info/',
    ],
    'pytest': [
        '.pytest_cache/**/*',
        '.pytest_cache/',
        'pytestdebug.log',
    ],
    'ruff': [
        '.ruff_cache/**/*',
        '.ruff_cache/',
    ],
    'tox': [
        '.tox/**/*',
        '.tox/',
    ],
}
# Exit status of ``git clean`` that execute_git_clean() reports as
# "not under version control" instead of aborting.
GIT_FATAL_ERROR = 128
65
66
67
class CleanupRunner:
    """Holder for the active cleanup callables, ignore list and counters."""

    def __init__(self):
        """Create an unconfigured runner; every attribute starts as ``None``."""
        # Callables applied to files and directories, chosen in configure().
        self.unlink = self.rmdir = None
        # Ignore patterns copied from the command line arguments.
        self.ignore = None
        # Success and failure tallies, set to zero by configure().
        self.unlink_count = self.unlink_failed = None
        self.rmdir_count = self.rmdir_failed = None

    def configure(self, args):
        """Select real or dry-run handlers from ``args`` and zero the counters."""
        if args.dry_run:
            self.unlink, self.rmdir = print_filename, print_dirname
        else:
            self.unlink, self.rmdir = remove_file, remove_directory
        self.ignore = args.ignore
        self.unlink_count = self.unlink_failed = 0
        self.rmdir_count = self.rmdir_failed = 0
89
90
91
log = logging.getLogger(__name__)
92
Runner = CleanupRunner()
93
94
95
def normalize(path_pattern: str) -> str:
    """
    Return ``path_pattern`` written with the platform's alternative separator.

    Every ``os.sep`` is swapped for ``os.altsep`` when the platform defines
    one (on Windows a backslash becomes a forward slash). Where ``os.altsep``
    is ``None`` the pattern comes back unchanged, because a backslash may be
    part of a file name there.
    """
    separator = os.altsep if os.altsep else os.sep
    return separator.join(path_pattern.split(os.sep))
103
104
105
def should_ignore(path: Path, ignore_patterns: list[str]) -> bool:
    """
    Check if a path should be ignored based on ignore patterns.

    Patterns can be:
    - Simple names like 'bar' (also written 'bar/' or './bar'): matches any
      path whose last component has that name
    - Paths like 'foo/bar': matches when the components 'foo', 'bar' appear
      consecutively anywhere in the path, so everything inside that
      directory is ignored as well

    Returns False when ``ignore_patterns`` is empty or None.
    """
    if not ignore_patterns:
        return False

    path_parts = path.parts
    for pattern in ignore_patterns:
        pattern_parts = Path(normalize(pattern)).parts
        width = len(pattern_parts)
        if width > 1:
            # Slide a window of the pattern's length over the path. The
            # range is empty when the path is shorter than the pattern.
            if any(
                path_parts[start : start + width] == pattern_parts
                for start in range(len(path_parts) - width + 1)
            ):
                return True
        else:
            # Compare against the parsed component, so that a trailing
            # separator ('bar/') or a leading './' still matches the name.
            # A pattern without any component (e.g. '') is compared verbatim.
            name = pattern_parts[0] if pattern_parts else pattern
            if path.name == name:
                return True
    return False
134
135
136
def remove_file(fileobj):
    """Unlink ``fileobj`` and record the outcome on the shared runner."""
    log.debug('Deleting file: %s', fileobj)
    try:
        fileobj.unlink()
    except OSError as err:
        # Best effort: a file that cannot be deleted is only counted.
        log.debug('File not deleted. %s', err)
        Runner.unlink_failed += 1
    else:
        Runner.unlink_count += 1
145
146
147
def remove_directory(dirobj):
    """Call ``rmdir()`` on ``dirobj`` and record the outcome on the runner."""
    log.debug('Removing directory: %s', dirobj)
    try:
        dirobj.rmdir()
    except OSError as err:
        # Best effort: a directory that cannot be removed is only counted.
        log.debug('Directory not removed. %s', err)
        Runner.rmdir_failed += 1
    else:
        Runner.rmdir_count += 1
156
157
158
def print_filename(fileobj):
    """Dry-run stand-in for remove_file(): log the file and count it."""
    log.debug('Would delete file: %s', fileobj)
    Runner.unlink_count = Runner.unlink_count + 1
162
163
164
def print_dirname(dirobj):
    """Dry-run stand-in for remove_directory(): log the directory and count it."""
    log.debug('Would delete directory: %s', dirobj)
    Runner.rmdir_count = Runner.rmdir_count + 1
168
169
170
def pyclean(args):
    """Run the requested cleanup steps on every directory and log the totals."""
    Runner.configure(args)

    for dir_name in args.directory:
        target = Path(dir_name)
        log.info('Cleaning directory %s', target)

        # Bytecode first, then debris topics, then free-form erase patterns.
        descend_and_clean(target, BYTECODE_FILES, BYTECODE_DIRS)
        for topic in args.debris:
            remove_debris_for(topic, target)
        remove_freeform_targets(target, args.erase, args.yes, args.dry_run)

        if args.folders:
            log.debug('Removing empty directories...')
            remove_empty_directories(target)

        if args.git_clean:
            execute_git_clean(target, args)

    # The counters below do not include what git clean removed.
    if args.git_clean:
        git_clean_note = ' (Not counting git clean)'
    else:
        git_clean_note = ''
    outcome = 'would be removed' if args.dry_run else 'removed'

    log.info(
        'Total %d files, %d directories %s.%s',
        Runner.unlink_count,
        Runner.rmdir_count,
        outcome,
        git_clean_note,
    )

    if Runner.unlink_failed or Runner.rmdir_failed:
        modal_verb = 'would' if args.dry_run else 'could'
        log.debug(
            '%d files, %d directories %s not be removed.%s',
            Runner.unlink_failed,
            Runner.rmdir_failed,
            modal_verb,
            git_clean_note,
        )

    # Point the user to --debris when the option was not used.
    if not args.debris:
        suggest_debris_option(args)
214
215
216
def descend_and_clean(directory, file_types, dir_names):
    """
    Recursively clean ``directory``, visiting its entries in sorted order.

    Files whose suffix is in ``file_types`` are handed to ``Runner.unlink``.
    Subdirectories are descended into unless they match the ignore patterns,
    and those named in ``dir_names`` are handed to ``Runner.rmdir`` afterwards.
    """
    for entry in sorted(directory.iterdir()):
        if entry.is_file():
            if entry.suffix in file_types:
                Runner.unlink(entry)
            continue

        if not entry.is_dir():
            log.debug('Ignoring %s (neither a file nor a folder)', entry)
            continue

        if should_ignore(entry, Runner.ignore):
            log.debug('Skipping %s', entry)
        else:
            descend_and_clean(entry, file_types, dir_names)

        # Attempted after the descent, so the contents were handled first.
        if entry.name in dir_names:
            Runner.rmdir(entry)
235
236
237
def remove_debris_for(topic, directory):
    """
    Delete the debris registered for ``topic`` in ``directory`` and below.

    ``topic`` must be a key of ``DEBRIS_TOPICS``.
    """
    log.debug('Scanning for debris of %s ...', topic.title())
    recursive_delete_debris(directory, DEBRIS_TOPICS[topic])
245
246
247
def remove_empty_directories(directory):
    """
    Remove empty directories below ``directory``, deepest ones first.

    Each subdirectory that is not ignored is processed recursively before it
    is checked itself, so a directory that only held empty directories can be
    removed as well. ``directory`` itself is never removed.
    """
    try:
        subdirs = []
        for entry in os.scandir(directory):
            if entry.is_dir():
                subdirs.append(Path(entry.path))
    except OSError as err:  # includes PermissionError
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for subdir in subdirs:
        if should_ignore(subdir, Runner.ignore):
            log.debug('Skipping %s', subdir)
            continue

        remove_empty_directories(subdir)  # children before the parent
        try:
            is_empty = next(subdir.iterdir(), None) is None
            if is_empty:
                Runner.rmdir(subdir)
        except OSError as err:  # includes PermissionError
            log.debug('Cannot check or remove directory %s: %s', subdir, err)
272
273
274
def remove_freeform_targets(directory, glob_patterns, yes, dry_run=False):
    """
    Delete whatever matches the user-supplied glob patterns.

    This is **potentially dangerous**: the patterns are free-form, so a user
    can delete anything, including the whole project. The implementation
    therefore keeps the following restrictions:

    - Directories are not removed recursively; their contents have to be
      named explicitly with a glob (e.g. ``dirname/**/*``).
    - The order of the patterns is the user's responsibility, so that a
      directory is already empty when its removal is attempted.
    - Every single file system object is confirmed interactively, unless
      ``yes`` is set.
    """
    ask_first = not yes
    for path_glob in glob_patterns:
        log.debug('Erase file system objects matching: %s', path_glob)
        delete_filesystem_objects(
            directory,
            path_glob,
            prompt=ask_first,
            dry_run=dry_run,
        )
293
294
295
def recursive_delete_debris(directory, patterns):
    """
    Delete debris matching any of the patterns, here and in all subdirectories.

    Every pattern is applied to ``directory`` first. Afterwards the function
    recurses into each subdirectory that does not match the ignore patterns.
    A directory that cannot be listed is reported with a warning and skipped.
    """
    for pattern in patterns:
        delete_filesystem_objects(directory, pattern)

    try:
        # Take a snapshot of the subdirectories and close the scandir handle
        # right away, instead of keeping it open during the recursion below.
        # Errors raised while iterating are caught here as well.
        with os.scandir(directory) as entries:
            subdirs = [Path(entry.path) for entry in entries if entry.is_dir()]
    except OSError as err:  # includes PermissionError
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for subdir in subdirs:
        if should_ignore(subdir, Runner.ignore):
            log.debug('Skipping %s', subdir)
        else:
            recursive_delete_debris(subdir, patterns)
318
319
320
def delete_filesystem_objects(directory, path_glob, prompt=False, dry_run=False):
    """
    Delete everything below ``directory`` that matches ``path_glob``.

    The matches are sorted in *reverse order* and handled in two passes:
    files and symlinks first, real directories second. That way the deepest
    directories are empty by the time their removal is attempted.

    With ``prompt`` set (and not in dry-run mode) every object is confirmed
    interactively. A declined object is counted as failed.
    """
    matches = sorted(directory.glob(path_glob), reverse=True)
    ask = prompt and not dry_run

    # Pass 1: files and symlinks (a symlink to a directory counts as a file).
    for candidate in matches:
        if candidate.is_dir() and not candidate.is_symlink():
            continue
        kind = 'symlink' if candidate.is_symlink() else 'file'
        if ask and not confirm('Delete %s %s' % (kind, candidate)):
            Runner.unlink_failed += 1
            continue
        Runner.unlink(candidate)

    # Pass 2: real directories, checked again after the files were handled.
    for candidate in matches:
        if not candidate.is_dir() or candidate.is_symlink():
            continue
        if ask and not confirm('Remove empty directory %s' % candidate):
            Runner.rmdir_failed += 1
            continue
        Runner.rmdir(candidate)
354
355
356
def confirm(message):
    """
    Ask ``message`` on the console and return True for 'y' or 'yes'.

    The reply is stripped and compared case-insensitively; anything else
    counts as a no. Ctrl+C exits the program with 'Aborted by user.'.
    """
    try:
        reply = input('%s? ' % message).strip().lower()
        return reply in ('y', 'yes')
    except KeyboardInterrupt:
        raise SystemExit('Aborted by user.')
364
365
366
def detect_debris_in_directory(directory):
    """
    Return the debris topics that have at least one match in ``directory``.

    Only the patterns without ``**`` are tried. A topic is reported once, as
    soon as its first pattern yields a match.
    """
    found = []

    for topic, patterns in DEBRIS_TOPICS.items():
        # Patterns containing ** are meant for the recursive cleanup only.
        shallow = (glob for glob in patterns if '**' not in glob)
        # any() stops at the first pattern that produces a non-empty list.
        if any(list(directory.glob(glob)) for glob in shallow):
            found.append(topic)

    return found
384
385
386
def suggest_debris_option(args):
    """
    Log a hint about the --debris option.

    When debris is detected in any of the existing ``args.directory`` entries
    the hint names the detected topics; otherwise a general hint is logged.
    """
    detected = set()
    for dir_name in args.directory:
        candidate = Path(dir_name)
        if not candidate.exists():
            continue
        detected.update(detect_debris_in_directory(candidate))

    if not detected:
        # Nothing specific found: general suggestion.
        log.info(
            'Hint: Use --debris to also clean up build artifacts '
            'from common Python development tools.',
        )
        return

    # Targeted suggestion, with the topics in alphabetical order.
    log.info(
        'Hint: Use --debris to also clean up build artifacts. Detected: %s',
        ' '.join(sorted(detected)),
    )
412
413
414
def build_git_clean_command(
    ignore_patterns: list[str],
    dry_run=False,
    force=False,
) -> list[str]:
    """
    Build the ``git clean`` argument list.

    ``-dx`` is always passed. Every ignore pattern is added as a separate
    ``-e <pattern>`` pair; ``None`` or an empty list adds no excludes. The
    last argument selects the mode: ``-n`` for ``dry_run``, otherwise ``-f``
    for ``force``, otherwise ``-i``. ``dry_run`` wins over ``force``.
    """
    command = ['git', 'clean', '-dx']
    # Tolerate a missing ignore list, like should_ignore() does.
    for pattern in ignore_patterns or ():
        command += ['-e', pattern]

    if dry_run:
        mode = '-n'
    elif force:
        mode = '-f'
    else:
        mode = '-i'
    command.append(mode)
    return command
423
424
425
def execute_git_clean(directory, args):
    """
    Run ``git clean`` with ``directory`` as the working directory.

    Exit status ``GIT_FATAL_ERROR`` is reported as "not under version
    control" with a warning. Any other non-zero status ends the program with
    that status via ``SystemExit``.
    """
    log.info('Executing git clean...')
    cmd = build_git_clean_command(args.ignore, dry_run=args.dry_run, force=args.yes)
    log.debug('Run: %s', ' '.join(cmd))

    status = subprocess.run(cmd, cwd=directory, check=False).returncode  # noqa: S603
    if not status:
        return

    if status != GIT_FATAL_ERROR:
        raise SystemExit(status)

    log.warning(
        'Directory %s is not under version control. Skipping git clean.',
        directory,
    )
442