| Total Complexity | 91 |
| Total Lines | 428 |
| Duplicated Lines | 14.02 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like those in pocketutils.tools.filesys_tools often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils |
||
| 2 | # SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils |
||
| 3 | # SPDX-License-Identifier: Apache-2.0 |
||
| 4 | """ |
||
| 5 | |||
| 6 | """ |
||
| 7 | |||
| 8 | import logging |
||
| 9 | import os |
||
| 10 | import pathlib |
||
| 11 | import shutil |
||
| 12 | import stat |
||
| 13 | import tempfile |
||
| 14 | from collections.abc import Generator, Mapping |
||
| 15 | from dataclasses import dataclass |
||
| 16 | from datetime import UTC, datetime |
||
| 17 | from pathlib import Path, PurePath |
||
| 18 | from typing import Any, Self, Unpack |
||
| 19 | |||
| 20 | from pocketutils.core.exceptions import PathMissingError, ReadFailedError, WriteFailedError |
||
| 21 | from pocketutils.core.input_output import Writeable |
||
| 22 | |||
# Names exported by a star-import of this module.
__all__ = ["FilesysUtils", "FilesysTools", "PathInfo"]

# Logger registered under the package-level name "pocketutils" rather than `__name__`.
logger = logging.getLogger("pocketutils")
||
| 26 | |||
| 27 | |||
| 28 | @dataclass(frozen=True, slots=True, kw_only=True) |
||
| 29 | class PathInfo: |
||
| 30 | """ |
||
| 31 | Info about an extant or nonexistent path as it was at some time. |
||
| 32 | Use this to avoid making repeated filesystem calls (e.g. `.is_dir()`): |
||
| 33 | None of the properties defined here make OS calls. |
||
| 34 | |||
| 35 | Attributes: |
||
| 36 | source: The original path used for lookup; may be a symlink |
||
| 37 | resolved: The fully resolved path, or None if it does not exist |
||
| 38 | as_of: A datetime immediately before the system calls (system timezone) |
||
| 39 | real_stat: `os.stat_result`, or None if the path does not exist |
||
| 40 | link_stat: `os.stat_result`, or None if the path is not a symlink |
||
| 41 | has_access: Path exists and has the 'a' flag set |
||
| 42 | has_read: Path exists and has the 'r' flag set |
||
| 43 | has_write: Path exists and has the 'w' flag set |
||
| 44 | |||
| 45 | All the additional properties refer to the resolved path, |
||
| 46 | except for [`is_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_symlink), |
||
| 47 | [`is_valid_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_valid_symlink), |
||
| 48 | and [`is_broken_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_broken_symlink). |
||
| 49 | """ |
||
| 50 | |||
| 51 | source: Path |
||
| 52 | resolved: Path | None |
||
| 53 | as_of: datetime |
||
| 54 | real_stat: os.stat_result | None |
||
| 55 | link_stat: os.stat_result | None |
||
| 56 | has_access: bool |
||
| 57 | has_read: bool |
||
| 58 | has_write: bool |
||
| 59 | |||
| 60 | @property |
||
| 61 | def mod_or_create_dt(self: Self) -> datetime | None: |
||
| 62 | """ |
||
| 63 | Returns the modification or access datetime. |
||
| 64 | Uses whichever is available: creation on Windows and modification on Unix-like. |
||
| 65 | """ |
||
| 66 | if os.name == "nt": |
||
| 67 | return self._get_dt("st_ctime") |
||
| 68 | # will work on posix; on java try anyway |
||
| 69 | return self._get_dt("st_mtime") |
||
| 70 | |||
| 71 | @property |
||
| 72 | def mod_dt(self: Self) -> datetime | None: |
||
| 73 | """ |
||
| 74 | Returns the modification datetime, if known. |
||
| 75 | Returns None on Windows or if the path does not exist. |
||
| 76 | """ |
||
| 77 | if os.name == "nt": |
||
| 78 | return None |
||
| 79 | return self._get_dt("st_mtime") |
||
| 80 | |||
| 81 | @property |
||
| 82 | def create_dt(self: Self) -> datetime | None: |
||
| 83 | """ |
||
| 84 | Returns the creation datetime, if known. |
||
| 85 | Returns None on Unix-like systems or if the path does not exist. |
||
| 86 | """ |
||
| 87 | if os.name == "posix": |
||
| 88 | return None |
||
| 89 | return self._get_dt("st_ctime") |
||
| 90 | |||
| 91 | @property |
||
| 92 | def access_dt(self: Self) -> datetime | None: |
||
| 93 | """ |
||
| 94 | Returns the access datetime. |
||
| 95 | *Should* never return None if the path exists, but not guaranteed. |
||
| 96 | """ |
||
| 97 | return self._get_dt("st_atime") |
||
| 98 | |||
| 99 | @property |
||
| 100 | def exists(self: Self) -> bool: |
||
| 101 | """ |
||
| 102 | Returns whether the resolved path exists. |
||
| 103 | """ |
||
| 104 | return self.real_stat is not None |
||
| 105 | |||
| 106 | @property |
||
| 107 | def is_file(self: Self) -> bool: |
||
| 108 | return self.exists and stat.S_ISREG(self.real_stat.st_mode) |
||
| 109 | |||
| 110 | @property |
||
| 111 | def is_dir(self: Self) -> bool: |
||
| 112 | return self.exists and stat.S_ISDIR(self.real_stat.st_mode) |
||
| 113 | |||
| 114 | @property |
||
| 115 | def is_readable_dir(self: Self) -> bool: |
||
| 116 | return self.is_file and self.has_access and self.has_read |
||
| 117 | |||
| 118 | @property |
||
| 119 | def is_writeable_dir(self: Self) -> bool: |
||
| 120 | return self.is_dir and self.has_access and self.has_write |
||
| 121 | |||
| 122 | @property |
||
| 123 | def is_readable_file(self: Self) -> bool: |
||
| 124 | return self.is_file and self.has_access and self.has_read |
||
| 125 | |||
| 126 | @property |
||
| 127 | def is_writeable_file(self: Self) -> bool: |
||
| 128 | return self.is_file and self.has_access and self.has_write |
||
| 129 | |||
| 130 | @property |
||
| 131 | def is_block_device(self: Self) -> bool: |
||
| 132 | return self.exists and stat.S_ISBLK(self.real_stat.st_mode) |
||
| 133 | |||
| 134 | @property |
||
| 135 | def is_char_device(self: Self) -> bool: |
||
| 136 | return self.exists and stat.S_ISCHR(self.real_stat.st_mode) |
||
| 137 | |||
| 138 | @property |
||
| 139 | def is_socket(self: Self) -> bool: |
||
| 140 | return self.exists and stat.S_ISSOCK(self.real_stat.st_mode) |
||
| 141 | |||
| 142 | @property |
||
| 143 | def is_fifo(self: Self) -> bool: |
||
| 144 | return self.exists and stat.S_ISFIFO(self.real_stat.st_mode) |
||
| 145 | |||
| 146 | @property |
||
| 147 | def is_symlink(self: Self) -> bool: |
||
| 148 | return self.link_stat is not None |
||
| 149 | |||
| 150 | @property |
||
| 151 | def is_valid_symlink(self: Self) -> bool: |
||
| 152 | return self.is_symlink and self.exists |
||
| 153 | |||
| 154 | @property |
||
| 155 | def is_broken_symlink(self: Self) -> bool: |
||
| 156 | return self.is_symlink and not self.exists |
||
| 157 | |||
| 158 | def _get_dt(self: Self, attr: str) -> datetime | None: |
||
| 159 | if self.real_stat is None: |
||
| 160 | return None |
||
| 161 | sec = getattr(self.real_stat, attr) |
||
| 162 | return datetime.fromtimestamp(sec).astimezone() |
||
| 163 | |||
| 164 | |||
@dataclass(slots=True, frozen=True)
class FilesysUtils:
    """
    Tools for file/directory creation, etc.

    Warning:
        Some functions may be insecure.
    """

    @classmethod
    def verify_can_read_files(
        cls: type[Self],
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be read, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening

        Raises:
            ReadFailedError: If a path is not a file (modulo existence), doesn't have 'R' set,
                             or (with `attempt`) cannot be opened for reading
        """
        for path in map(Path, paths):
            exists = path.exists()
            if exists and not path.is_file():
                raise ReadFailedError(f"Path {path} is not a file", filename=str(path))
            if missing_ok and not exists:
                # Nothing to read; also skip the open attempt, which could only fail.
                continue
            if not os.access(path, os.R_OK):
                raise ReadFailedError(f"Cannot read from {path}", filename=str(path))
            if attempt:
                try:
                    with open(path):
                        pass
                except OSError as e:
                    # Fixed: this previously raised WriteFailedError for a failed *read*.
                    raise ReadFailedError(f"Failed to open {path} for read", filename=str(path)) from e

    @classmethod
    def verify_can_write_files(
        cls: type[Self],
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be written to, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening (in append mode)

        Raises:
            WriteFailedError: If a path is not a file (modulo existence), doesn't have 'W' set,
                              or (with `attempt`) cannot be opened for writing
        """
        for path in map(Path, paths):
            exists = path.exists()
            if exists and not path.is_file():
                raise WriteFailedError(f"Path {path} is not a file", filename=str(path))
            if (exists or not missing_ok) and not os.access(path, os.W_OK):
                raise WriteFailedError(f"Cannot write to {path}", filename=str(path))
            if attempt:
                # NOTE: append mode leaves existing content untouched,
                # but it creates the file if it is missing (possible with `missing_ok`).
                try:
                    with open(path, "a"):  # or w
                        pass
                except OSError as e:
                    raise WriteFailedError(f"Failed to open {path} for write", filename=str(path)) from e

    @classmethod
    def verify_can_write_dirs(
        cls: type[Self],
        *paths: str | PurePath,
        missing_ok: bool = False,
    ) -> None:
        """
        Checks that all directories can be written to, to ensure atomicity before operations.

        Args:
            *paths: The directories
            missing_ok: Don't raise an error if a path doesn't exist

        Raises:
            WriteFailedError: If a path is not a directory (modulo existence),
                              or lacks write ('W') or access ('X') permission
        """
        for path in map(Path, paths):
            exists = path.exists()
            if exists and not path.is_dir():
                raise WriteFailedError(f"Path {path} is not a dir", filename=str(path))
            if missing_ok and not exists:
                continue
            if not os.access(path, os.W_OK):
                raise WriteFailedError(f"{path} lacks write permission", filename=str(path))
            if not os.access(path, os.X_OK):
                raise WriteFailedError(f"{path} lacks access permission", filename=str(path))

    def get_info(self: Self, path: PurePath | str, *, expand_user: bool = False, strict: bool = False) -> PathInfo:
        """
        Gathers stat and permission info about `path` into a `PathInfo` snapshot.

        Args:
            path: The path to look up; may be a symlink or may not exist
            expand_user: Call `expanduser()` before resolving
            strict: Passed to `Path.resolve`

        Returns:
            A `PathInfo`; if `path` could not be stat-ed, its stats and `resolved` are None
            and all permission flags are False
        """
        path = Path(path)
        has_ignore_error = hasattr(pathlib, "_ignore_error")
        if not has_ignore_error:
            logger.debug("No _ignore_error found; some OSErrors may be suppressed")
        resolved = None
        real_stat = None
        has_access = False
        has_read = False
        has_write = False
        link_stat = None
        # Taken before any stat call so the snapshot time is a lower bound.
        as_of = datetime.now(tz=UTC).astimezone()
        # With `_ignore_error` available, always try lstat (unexpected errors propagate);
        # without it, only stat paths that appear to exist.
        if has_ignore_error or path.is_symlink() or path.exists():
            link_stat = self.__stat_raw(path)
        if link_stat is not None:
            resolved = path.expanduser().resolve(strict=strict) if expand_user else path.resolve(strict=strict)
            # For a symlink, stat the target separately; otherwise the lstat result is the real stat.
            real_stat = self.__stat_raw(resolved) if stat.S_ISLNK(link_stat.st_mode) else link_stat
            has_access = os.access(path, os.X_OK, follow_symlinks=True)
            has_read = os.access(path, os.R_OK, follow_symlinks=True)
            has_write = os.access(path, os.W_OK, follow_symlinks=True)
            if not stat.S_ISLNK(link_stat.st_mode):
                # `PathInfo.link_stat` is only set for symlinks.
                link_stat = None
        return PathInfo(
            source=path,
            resolved=resolved,
            as_of=as_of,
            real_stat=real_stat,
            link_stat=link_stat,
            has_access=has_access,
            has_read=has_read,
            has_write=has_write,
        )

    def prep_dir(self: Self, path: PurePath | str, *, exist_ok: bool = True) -> bool:
        """
        Prepares a directory by making it if it doesn't exist.
        If `exist_ok` is False, calls `logger.warning` if `path` already exists

        Returns:
            Whether `path` already existed

        Raises:
            PathMissingError: If `path` exists but is not a directory
        """
        path = Path(path)
        exists = path.exists()
        # On some platforms we get generic exceptions like permissions errors,
        # so these are better
        if exists and not path.is_dir():
            raise PathMissingError(filename=str(path))
        if exists and not exist_ok:
            logger.warning(f"Directory {path} already exists")
        if not exists:
            # NOTE! exist_ok in mkdir throws an error on Windows
            path.mkdir(parents=True)
        return exists

    def prep_file(self: Self, path: PurePath | str, *, exist_ok: bool = True) -> None:
        """
        Prepares a file path by making its parent directory.
        Same as `pathlib.Path.mkdir` but makes sure `path` is a file if it exists.

        Args:
            path: The file path whose parent directory should exist
            exist_ok: Passed to `mkdir` for the parent directory

        Raises:
            PathMissingError: If `path` exists but is neither a file nor a symlink
        """
        # On some platforms we get generic exceptions like permissions errors, so these are better
        path = Path(path)
        # check for errors first; don't make the dirs and then fail
        if path.exists() and not path.is_file() and not path.is_symlink():
            raise PathMissingError(filename=str(path))
        Path(path.parent).mkdir(parents=True, exist_ok=exist_ok)

    def delete_surefire(self: Self, path: PurePath | str) -> Exception | None:
        """
        Deletes files or directories cross-platform, but working around multiple issues in Windows.

        Returns:
            None, or an Exception for minor warnings
            (the error from the preliminary `chmod`, if it failed)

        Raises:
            IOError: If it can't delete
        """
        # we need this because of Windows
        path = Path(path)
        logger.debug(f"Permanently deleting {path} ...")
        chmod_err = None
        try:
            os.chmod(str(path), stat.S_IRWXU)
        except Exception as e:
            # Best-effort: remember the failure and report it via the return value.
            chmod_err = e
        # another reason for returning exception:
        # We don't want to interrupt the current line being printed like in slow_delete
        if path.is_dir():
            shutil.rmtree(str(path), ignore_errors=True)  # ignore_errors because of Windows
            try:
                path.unlink(missing_ok=True)  # again, because of Windows
            except OSError:  # noqa: S110
                pass  # almost definitely because it doesn't exist
        else:
            path.unlink(missing_ok=True)
        logger.debug(f"Permanently deleted {path}")
        return chmod_err

    def trash(self: Self, path: PurePath | str, trash_dir: PurePath | str) -> None:
        """
        Trash a file or directory by moving it with `shutil.move`.

        Args:
            path: The path to move to the trash
            trash_dir: The destination passed to `shutil.move`; required.
                       To find a suitable directory, see
                       [`guess_trash`](pocketutils.tools.path_tools.PathTools.guess_trash).
        """
        logger.debug(f"Trashing {path} to {trash_dir} ...")
        shutil.move(str(path), str(trash_dir))
        logger.debug(f"Trashed {path} to {trash_dir}")

    def try_delete(self: Self, path: Path, *, bound: type[Exception] = PermissionError) -> None:
        """
        Try to delete a file (probably temp file), if it exists, and log any `PermissionError`.

        Args:
            path: The file to delete; a missing file is not an error
            bound: The exception type to catch and log instead of raising
        """
        path = Path(path)
        # noinspection PyBroadException
        try:
            path.unlink(missing_ok=True)
        except bound:
            logger.error(f"Permission error preventing deleting {path}")

    def temp_path(self: Self, path: PurePath | str | None = None, **kwargs) -> Generator[Path, None, None]:
        """
        Makes a temporary Path. Won't create `path` but will delete it at the end.
        If `path` is None, will use `tempfile.mkstemp` (which does create the file).

        NOTE: This is a plain generator function, not a context manager.
        The path is deleted when the generator is exhausted or closed.

        Args:
            path: The path to yield and later delete, or None to make one
            **kwargs: Passed to the `Path` constructor

        Yields:
            The temporary path
        """
        if path is None:
            fd, path = tempfile.mkstemp()
            # mkstemp returns an open OS-level descriptor; close it so it isn't leaked.
            os.close(fd)
        try:
            yield Path(path, **kwargs)
        finally:
            # The caller may never have created the file, so tolerate its absence.
            Path(path).unlink(missing_ok=True)

    def temp_file(
        self: Self,
        path: PurePath | str | None = None,
        *,
        spooled: bool = False,
        **kwargs: Unpack[Mapping[str, Any]],
    ) -> Generator[Writeable, None, None]:
        """
        Simple wrapper around `tempfile` functions.
        Wraps `TemporaryFile`, `NamedTemporaryFile`, and `SpooledTemporaryFile`.

        NOTE: This is a plain generator function, not a context manager.

        Args:
            path: If given (and not `spooled`), a `NamedTemporaryFile` is made in `path`'s
                  parent directory, using `path`'s name as the file name prefix
            spooled: Use `SpooledTemporaryFile`; `path` is ignored
            **kwargs: Passed to the chosen `tempfile` function

        Yields:
            The open temporary file
        """
        if spooled:
            with tempfile.SpooledTemporaryFile(**kwargs) as x:
                yield x
        elif path is None:
            with tempfile.TemporaryFile(**kwargs) as x:
                yield x
        else:
            # NOTE(review): the previous code passed `str(path)` positionally, which
            # `NamedTemporaryFile` interprets as `mode` (and rejects). `NamedTemporaryFile`
            # cannot take an exact path, so use its directory and name as `dir`/`prefix`;
            # explicit `dir`/`prefix` in kwargs take precedence. Confirm this is the intent.
            target = Path(path)
            named_kwargs = {"dir": str(target.parent), "prefix": target.name, **kwargs}
            with tempfile.NamedTemporaryFile(**named_kwargs) as x:
                yield x

    def temp_dir(self: Self, **kwargs: Unpack[Mapping[str, Any]]) -> Generator[Path, None, None]:
        """
        Makes a temporary directory with `tempfile.TemporaryDirectory` and yields its path.

        NOTE: This is a plain generator function, not a context manager.
        The directory is cleaned up when the generator is exhausted or closed.

        Args:
            **kwargs: Passed to `tempfile.TemporaryDirectory`
        """
        with tempfile.TemporaryDirectory(**kwargs) as x:
            yield Path(x)

    def __stat_raw(self: Self, path: Path) -> os.stat_result | None:
        """
        Returns `path.lstat()`, or None if it fails with an ignorable `OSError`.
        If `pathlib._ignore_error` is unavailable, every `OSError` is treated as ignorable.
        """
        try:
            return path.lstat()
        except OSError as e:
            if hasattr(pathlib, "_ignore_error") and not pathlib._ignore_error(e):
                raise  # bare raise keeps the original traceback
            return None
||
| 425 | |||
| 426 | |||
# Ready-made `FilesysUtils` instance; `FilesysTools` is listed in `__all__`.
FilesysTools = FilesysUtils()
||
| 428 |