Passed
Pull Request — main (#105)
by
unknown
01:12
created

pyclean.modern   F

Complexity

Total Complexity 83

Size/Duplication

Total Lines 505
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 280
dl 0
loc 505
rs 2
c 0
b 0
f 0
wmc 83

19 Functions

Rating   Name   Duplication   Size   Complexity  
A remove_directory() 0 9 2
A print_filename() 0 4 1
A remove_file() 0 9 2
A print_dirname() 0 4 1
A normalize() 0 8 1
B should_ignore() 0 29 8
A confirm() 0 8 2
C delete_filesystem_objects() 0 34 10
A detect_debris_in_directory() 0 18 5
A recursive_delete_debris() 0 23 5
A build_git_clean_command() 0 25 4
A run_git_clean() 0 31 3
B descend_and_clean() 0 19 7
B remove_empty_directories() 0 25 6
A remove_freeform_targets() 0 19 2
B execute_git_clean() 0 36 5
A remove_debris_for() 0 8 1
C pyclean() 0 40 10
A suggest_debris_option() 0 24 4

2 Methods

Rating   Name   Duplication   Size   Complexity  
A CleanupRunner.configure() 0 9 3
A CleanupRunner.__init__() 0 9 1

How to fix   Complexity   

Complexity

Complex classes like pyclean.modern often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-FileCopyrightText: 2020 Peter Bittner <[email protected]>
2
#
3
# SPDX-License-Identifier: GPL-3.0-or-later
4
5
"""
6
Modern, cross-platform, pure-Python pyclean implementation.
7
"""
8
9
import logging
10
import os
11
import subprocess
12
from pathlib import Path
13
14
# File suffixes that identify compiled Python bytecode files.
BYTECODE_FILES = ['.pyc', '.pyo']
# Names of bytecode cache directories, removed after their content.
BYTECODE_DIRS = ['__pycache__']
# Glob patterns per debris topic. Within a topic the patterns are listed so
# that the content of a directory (``name/**/*``) precedes the directory
# itself (``name/``), because directories are only removed when empty.
DEBRIS_TOPICS = {
    'cache': [
        '.cache/**/*',
        '.cache/',
    ],
    'coverage': [
        '.coverage',
        'coverage.json',
        'coverage.lcov',
        'coverage.xml',
        'htmlcov/**/*',
        'htmlcov/',
    ],
    'jupyter': [
        '.ipynb_checkpoints/**/*',
        '.ipynb_checkpoints/',
    ],
    'mypy': [
        '.mypy_cache/**/*',
        '.mypy_cache/',
    ],
    'package': [
        'build/bdist.*/**/*',
        'build/bdist.*/',
        'build/lib/**/*',
        'build/lib/',
        'build/',
        'dist/**/*',
        'dist/',
        'sdist/**/*',
        'sdist/',
        '*.egg-info/**/*',
        '*.egg-info/',
    ],
    'pytest': [
        '.pytest_cache/**/*',
        '.pytest_cache/',
        'pytestdebug.log',
    ],
    'ruff': [
        '.ruff_cache/**/*',
        '.ruff_cache/',
    ],
    'tox': [
        '.tox/**/*',
        '.tox/',
    ],
}
64
65
66
class CleanupRunner:
    """Module-level configuration and value store."""

    def __init__(self):
        """Create an unconfigured runner; ``configure()`` fills in the values."""
        # Callables applied to a file / directory object (set by configure()).
        self.unlink = None
        self.rmdir = None
        # Ignore patterns taken from the command line arguments.
        self.ignore = None
        # Counters of successful and failed operations (zeroed by configure()).
        self.unlink_count = None
        self.unlink_failed = None
        self.rmdir_count = None
        self.rmdir_failed = None

    def configure(self, args):
        """
        Set up runner according to command line options.

        With ``args.dry_run`` the delete operations are replaced by functions
        that only log what would be removed. All counters are reset to zero.
        """
        self.unlink = print_filename if args.dry_run else remove_file
        self.rmdir = print_dirname if args.dry_run else remove_directory
        self.ignore = args.ignore
        self.unlink_count = 0
        self.unlink_failed = 0
        self.rmdir_count = 0
        self.rmdir_failed = 0
88
89
90
# Logger shared by all functions in this module.
log = logging.getLogger(__name__)
# Single runner instance holding the active configuration and the counters.
Runner = CleanupRunner()
92
93
94
def normalize(path_pattern: str) -> str:
    """
    Normalize path separators in a pattern for cross-platform support.

    On Windows, both forward slash and backslash are valid path separators.
    On Unix/Posix, only forward slash is valid (backslash can be part of filename).

    Every ``os.sep`` is replaced by ``os.altsep`` when the platform defines
    one; otherwise the pattern is returned unchanged.
    """
    return path_pattern.replace(os.sep, os.altsep or os.sep)
102
103
104
def should_ignore(path: Path, ignore_patterns: list[str]) -> bool:
    """
    Check if a path should be ignored based on ignore patterns.

    Patterns can be:
    - Simple names like 'bar': matches any directory with that name
      (a trailing separator or leading './', as in 'bar/', is tolerated)
    - Paths like 'foo/bar': matches 'bar' directory inside 'foo' directory
      and also ignores everything inside that directory

    Returns False when ``ignore_patterns`` is empty or None.
    """
    if not ignore_patterns:
        return False

    path_parts = path.parts
    for pattern in ignore_patterns:
        pattern_parts = Path(normalize(pattern)).parts
        width = len(pattern_parts)
        if width > 1:
            # Multi-component pattern: look for it as a contiguous run of
            # components anywhere in the path. The range is empty when the
            # path has fewer components than the pattern.
            if any(
                path_parts[start : start + width] == pattern_parts
                for start in range(len(path_parts) - width + 1)
            ):
                return True
        # Simple name - match the directory name anywhere. Comparing against
        # the parsed component as well makes 'bar/' behave like 'bar'.
        elif path.name == pattern or (width == 1 and path.name == pattern_parts[0]):
            return True
    return False
133
134
135
def remove_file(fileobj):
    """
    Attempt to delete a file object for real.

    Increments ``Runner.unlink_count`` on success. An ``OSError`` is logged
    at debug level and counted in ``Runner.unlink_failed`` instead of raised.
    """
    log.debug('Deleting file: %s', fileobj)
    try:
        fileobj.unlink()
        Runner.unlink_count += 1
    except OSError as err:
        log.debug('File not deleted. %s', err)
        Runner.unlink_failed += 1
144
145
146
def remove_directory(dirobj):
    """
    Attempt to remove a directory object for real.

    Increments ``Runner.rmdir_count`` on success. An ``OSError`` is logged
    at debug level and counted in ``Runner.rmdir_failed`` instead of raised.
    """
    log.debug('Removing directory: %s', dirobj)
    try:
        dirobj.rmdir()
        Runner.rmdir_count += 1
    except OSError as err:
        log.debug('Directory not removed. %s', err)
        Runner.rmdir_failed += 1
155
156
157
def print_filename(fileobj):
    """
    Only display the file name, used with --dry-run.

    The file is still counted in ``Runner.unlink_count`` so that the summary
    reports how many files would be removed.
    """
    log.debug('Would delete file: %s', fileobj)
    Runner.unlink_count += 1
161
162
163
def print_dirname(dirobj):
    """
    Only display the directory name, used with --dry-run.

    The directory is still counted in ``Runner.rmdir_count`` so that the
    summary reports how many directories would be removed.
    """
    log.debug('Would delete directory: %s', dirobj)
    Runner.rmdir_count += 1
167
168
169
def pyclean(args):
    """
    Cross-platform cleaning of Python bytecode.

    Configures the module-level ``Runner`` from ``args``, cleans every
    directory listed in ``args.directory``, logs a summary, and finally
    hints at the ``--debris`` option when it was not used.
    """
    Runner.configure(args)

    for dir_name in args.directory:
        _clean_directory(Path(dir_name), args)

    _log_summary(args.dry_run)

    # Suggest --debris option if it wasn't used
    if not args.debris:
        suggest_debris_option(args)


def _clean_directory(dir_path, args):
    """Run all cleanup steps selected in ``args`` on a single directory."""
    log.info('Cleaning directory %s', dir_path)
    descend_and_clean(dir_path, BYTECODE_FILES, BYTECODE_DIRS)

    for topic in args.debris:
        remove_debris_for(topic, dir_path)

    remove_freeform_targets(dir_path, args.erase, args.yes, args.dry_run)

    if args.folders:
        log.debug('Removing empty directories...')
        remove_empty_directories(dir_path)

    if args.git_clean:
        execute_git_clean(dir_path, args)


def _log_summary(dry_run):
    """Log the totals of removed and (if any) failed file system objects."""
    log.info(
        'Total %d files, %d directories %s.',
        Runner.unlink_count,
        Runner.rmdir_count,
        'would be removed' if dry_run else 'removed',
    )

    if Runner.unlink_failed or Runner.rmdir_failed:
        log.debug(
            '%d files, %d directories %s not be removed.',
            Runner.unlink_failed,
            Runner.rmdir_failed,
            'would' if dry_run else 'could',
        )
209
210
211
def descend_and_clean(directory, file_types, dir_names):
    """
    Walk and descend a directory tree, cleaning up files of a certain type
    along the way. Only delete directories if they are empty, in the end.

    A directory that cannot be listed is reported with a warning and
    skipped, as the other directory walkers in this module do.
    """
    try:
        children = sorted(directory.iterdir())
    except OSError as err:
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for child in children:
        if child.is_file():
            if child.suffix in file_types:
                Runner.unlink(child)
        elif child.is_dir():
            if should_ignore(child, Runner.ignore):
                log.debug('Skipping %s', child)
            else:
                descend_and_clean(child, file_types, dir_names)

            # NOTE(review): this also runs for ignored directories whose name
            # is in dir_names - confirm that this is intended.
            if child.name in dir_names:
                Runner.rmdir(child)
        else:
            log.debug('Ignoring %s (neither a file nor a folder)', child)
230
231
232
def remove_debris_for(topic, directory):
    """
    Clean up debris for a specific topic.

    ``topic`` must be a key of ``DEBRIS_TOPICS``; its patterns are applied
    to ``directory`` and, recursively, to its subdirectories.
    """
    log.debug('Scanning for debris of %s ...', topic.title())

    patterns = DEBRIS_TOPICS[topic]
    recursive_delete_debris(directory, patterns)
240
241
242
def remove_empty_directories(directory):
    """
    Recursively remove empty directories in the given directory tree.

    This walks the directory tree in post-order (bottom-up), attempting to
    remove directories that are empty. Directories matching the configured
    ignore patterns are skipped together with everything below them.
    ``directory`` itself is never removed.
    """
    try:
        # The context manager closes the directory handle before recursing.
        with os.scandir(directory) as entries:
            subdirs = [Path(entry.path) for entry in entries if entry.is_dir()]
    except OSError as err:
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for subdir in subdirs:
        if should_ignore(subdir, Runner.ignore):
            log.debug('Skipping %s', subdir)
        else:
            remove_empty_directories(subdir)  # recurse down the hierarchy
            try:
                if next(subdir.iterdir(), None) is None:
                    Runner.rmdir(subdir)
            except OSError as err:
                log.debug('Cannot check or remove directory %s: %s', subdir, err)
267
268
269
def remove_freeform_targets(directory, glob_patterns, yes, dry_run=False):
    """
    Remove free-form targets using globbing.

    This is **potentially dangerous** since users can delete everything
    anywhere in their file system, including the entire project they're
    working on. For this reason, the implementation imposes the following
    (user experience-related) restrictions:

    - Deleting (directories) is not recursive, directory contents must be
      explicitly specified using globbing (e.g. ``dirname/**/*``).
    - The user is responsible for the deletion order, so that a directory
      is empty when it is attempted to be deleted.
    - A confirmation prompt for the deletion of every single file system
      object is shown (unless the ``--yes`` option is used, in addition).
    """
    for path_glob in glob_patterns:
        log.debug('Erase file system objects matching: %s', path_glob)
        delete_filesystem_objects(directory, path_glob, prompt=not yes, dry_run=dry_run)
288
289
290
def recursive_delete_debris(directory, patterns):
    """
    Recursively delete debris matching any of the given patterns.

    This function walks the directory tree once and applies all patterns
    at each level, avoiding redundant directory scans. Subdirectories that
    match the configured ignore patterns are not descended into.
    """
    for pattern in patterns:
        delete_filesystem_objects(directory, pattern)

    try:
        # Collect the subdirectories eagerly: the directory handle is closed
        # before recursing, and errors raised while reading the directory
        # are caught here rather than inside the loop below.
        with os.scandir(directory) as entries:
            subdirs = [Path(entry.path) for entry in entries if entry.is_dir()]
    except OSError as err:
        log.warning('Cannot access directory %s: %s', directory, err)
        return

    for subdir in subdirs:
        if should_ignore(subdir, Runner.ignore):
            log.debug('Skipping %s', subdir)
        else:
            recursive_delete_debris(subdir, patterns)
313
314
315
def delete_filesystem_objects(directory, path_glob, prompt=False, dry_run=False):
    """
    Identifies all pathnames matching a specific glob pattern, and attempts
    to delete them in the proper order, optionally asking for confirmation.

    Implementation Note: We sort the file system objects in *reverse order*
    and first delete *all files* before removing directories. This way we
    make sure that the directories that are deepest down in the hierarchy
    are empty (for both files & directories) when we attempt to remove them.

    Symlinks, including symlinks to directories, are handled as files.
    Objects the user declines to delete are counted as failed.
    """
    files = []
    dirs = []
    for name in sorted(directory.glob(path_glob), reverse=True):
        is_real_dir = name.is_dir() and not name.is_symlink()
        (dirs if is_real_dir else files).append(name)

    for file_object in files:
        file_type = 'symlink' if file_object.is_symlink() else 'file'
        question = 'Delete %s %s' % (file_type, file_object)
        if _deletion_declined(question, prompt, dry_run):
            Runner.unlink_failed += 1
            continue
        Runner.unlink(file_object)

    for dir_object in dirs:
        question = 'Remove empty directory %s' % dir_object
        if _deletion_declined(question, prompt, dry_run):
            Runner.rmdir_failed += 1
            continue
        Runner.rmdir(dir_object)


def _deletion_declined(question, prompt, dry_run):
    """Return True if a confirmation is required and the user refuses it."""
    # No question is asked in dry-run mode or when prompting is disabled.
    return bool(not dry_run and prompt and not confirm(question))
349
350
351
def confirm(message):
    """
    An interactive confirmation prompt.

    Returns True only if the answer is 'y' or 'yes' (case-insensitive,
    surrounding whitespace ignored).

    Raises SystemExit when the prompt is interrupted (Ctrl-C) or when no
    input can be read (end of file on standard input).
    """
    try:
        answer = input('%s? ' % message)
    except (KeyboardInterrupt, EOFError):
        msg = 'Aborted by user.'
        raise SystemExit(msg)
    return answer.strip().lower() in ['y', 'yes']
359
360
361
def detect_debris_in_directory(directory):
    """
    Scan a directory for debris artifacts and return a list of detected topics.

    Only the non-recursive patterns of each topic (those without ``**``)
    are checked, relative to ``directory``. Every topic appears at most
    once, in the iteration order of ``DEBRIS_TOPICS``.
    """
    detected_topics = []

    for topic, patterns in DEBRIS_TOPICS.items():
        for pattern in patterns:
            # Skip patterns that are for recursive cleanup (contain **)
            if '**' in pattern:
                continue
            # A single hit is enough, so stop globbing at the first match.
            if next(directory.glob(pattern), None) is not None:
                detected_topics.append(topic)
                break  # Found at least one match for this topic, move to next

    return detected_topics
379
380
381
def suggest_debris_option(args):
    """
    Suggest using the --debris option when it wasn't used.
    Optionally provide targeted suggestions based on detected artifacts.

    Directories from ``args.directory`` that do not exist are skipped.
    """
    # Collect all detected debris topics across all directories
    all_detected = set()
    for dir_name in args.directory:
        dir_path = Path(dir_name)
        if dir_path.exists():
            all_detected.update(detect_debris_in_directory(dir_path))

    if all_detected:
        # Provide targeted suggestion
        log.info(
            'Hint: Use --debris to also clean up build artifacts. Detected: %s',
            ' '.join(sorted(all_detected)),
        )
    else:
        # Provide general suggestion
        log.info(
            'Hint: Use --debris to also clean up build artifacts '
            'from common Python development tools.',
        )
407
408
409
def build_git_clean_command(ignore_patterns, dry_run=False, force=False):
    """
    Build the git clean command with appropriate flags.

    Args:
        ignore_patterns: List of patterns to exclude from git clean
            (None is treated like an empty list)
        dry_run: If True, only show what would be deleted
        force: If True, delete without prompting (otherwise interactive)

    Returns:
        List of command arguments
    """
    cmd = ['git', 'clean', '-dx']

    for pattern in ignore_patterns or ():
        cmd.extend(['-e', pattern])

    # Exactly one mode flag is added; dry-run takes precedence over force.
    if dry_run:
        cmd.append('-n')
    elif force:
        cmd.append('-f')
    else:
        cmd.append('-i')

    return cmd
434
435
436
def execute_git_clean(directory, args):
    """
    Execute git clean in the specified directory.

    Args:
        directory: Path to the directory to clean
        args: Command line arguments with ignore, dry_run, yes flags

    Raises:
        SystemExit: With git's exit code, if it is neither 0 nor 128
    """
    returncode = run_git_clean(
        directory,
        args.ignore,
        dry_run=args.dry_run,
        force=args.yes,
    )

    # Exit code 128 is handled as "not a git repository": warn and carry on.
    if returncode == 128:
        log.warning(
            'Directory %s is not under version control. Skipping git clean.',
            directory,
        )
    elif returncode != 0:
        raise SystemExit(returncode)
472
473
474
def run_git_clean(directory, ignore_patterns, dry_run=False, force=False):
    """
    Run git clean in the specified directory with appropriate flags.

    Args:
        directory: Path to the directory to clean
        ignore_patterns: List of patterns to exclude from git clean
        dry_run: If True, only show what would be deleted
        force: If True, delete without prompting (otherwise interactive)

    Returns:
        Exit code from git clean
    """
    cmd = build_git_clean_command(ignore_patterns, dry_run, force)
    # Without dry_run or force the command runs interactively (-i): its
    # output must reach the terminal, otherwise the prompt stays invisible.
    interactive = not (dry_run or force)

    log.debug('Running: %s in %s', ' '.join(cmd), directory)

    result = subprocess.run(
        cmd,
        cwd=directory,
        capture_output=not interactive,
        text=True,
        check=False,
    )

    # stdout/stderr are None when the output was not captured.
    if result.stdout:
        log.info(result.stdout.rstrip())
    if result.stderr:
        log.warning(result.stderr.rstrip())

    return result.returncode
505