Total Complexity | 91 |
Total Lines | 428 |
Duplicated Lines | 14.02 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like pocketutils.tools.filesys_tools often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils |
||
2 | # SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils |
||
3 | # SPDX-License-Identifier: Apache-2.0 |
||
4 | """ |
||
5 | |||
6 | """ |
||
7 | |||
8 | import logging |
||
9 | import os |
||
10 | import pathlib |
||
11 | import shutil |
||
12 | import stat |
||
13 | import tempfile |
||
14 | from collections.abc import Generator, Mapping |
||
15 | from dataclasses import dataclass |
||
16 | from datetime import UTC, datetime |
||
17 | from pathlib import Path, PurePath |
||
18 | from typing import Any, Self, Unpack |
||
19 | |||
20 | from pocketutils.core.exceptions import PathMissingError, ReadFailedError, WriteFailedError |
||
21 | from pocketutils.core.input_output import Writeable |
||
22 | |||
# Names exported by `from ... import *`.
__all__ = ["FilesysUtils", "FilesysTools", "PathInfo"]

# Module-level logger, registered under the name "pocketutils".
logger = logging.getLogger("pocketutils")
||
26 | |||
27 | |||
28 | @dataclass(frozen=True, slots=True, kw_only=True) |
||
29 | class PathInfo: |
||
30 | """ |
||
31 | Info about an extant or nonexistent path as it was at some time. |
||
32 | Use this to avoid making repeated filesystem calls (e.g. `.is_dir()`): |
||
33 | None of the properties defined here make OS calls. |
||
34 | |||
35 | Attributes: |
||
36 | source: The original path used for lookup; may be a symlink |
||
37 | resolved: The fully resolved path, or None if it does not exist |
||
38 | as_of: A datetime immediately before the system calls (system timezone) |
||
39 | real_stat: `os.stat_result`, or None if the path does not exist |
||
40 | link_stat: `os.stat_result`, or None if the path is not a symlink |
||
41 | has_access: Path exists and has the 'a' flag set |
||
42 | has_read: Path exists and has the 'r' flag set |
||
43 | has_write: Path exists and has the 'w' flag set |
||
44 | |||
45 | All the additional properties refer to the resolved path, |
||
46 | except for [`is_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_symlink), |
||
47 | [`is_valid_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_valid_symlink), |
||
48 | and [`is_broken_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_broken_symlink). |
||
49 | """ |
||
50 | |||
51 | source: Path |
||
52 | resolved: Path | None |
||
53 | as_of: datetime |
||
54 | real_stat: os.stat_result | None |
||
55 | link_stat: os.stat_result | None |
||
56 | has_access: bool |
||
57 | has_read: bool |
||
58 | has_write: bool |
||
59 | |||
60 | @property |
||
61 | def mod_or_create_dt(self: Self) -> datetime | None: |
||
62 | """ |
||
63 | Returns the modification or access datetime. |
||
64 | Uses whichever is available: creation on Windows and modification on Unix-like. |
||
65 | """ |
||
66 | if os.name == "nt": |
||
67 | return self._get_dt("st_ctime") |
||
68 | # will work on posix; on java try anyway |
||
69 | return self._get_dt("st_mtime") |
||
70 | |||
71 | @property |
||
72 | def mod_dt(self: Self) -> datetime | None: |
||
73 | """ |
||
74 | Returns the modification datetime, if known. |
||
75 | Returns None on Windows or if the path does not exist. |
||
76 | """ |
||
77 | if os.name == "nt": |
||
78 | return None |
||
79 | return self._get_dt("st_mtime") |
||
80 | |||
81 | @property |
||
82 | def create_dt(self: Self) -> datetime | None: |
||
83 | """ |
||
84 | Returns the creation datetime, if known. |
||
85 | Returns None on Unix-like systems or if the path does not exist. |
||
86 | """ |
||
87 | if os.name == "posix": |
||
88 | return None |
||
89 | return self._get_dt("st_ctime") |
||
90 | |||
91 | @property |
||
92 | def access_dt(self: Self) -> datetime | None: |
||
93 | """ |
||
94 | Returns the access datetime. |
||
95 | *Should* never return None if the path exists, but not guaranteed. |
||
96 | """ |
||
97 | return self._get_dt("st_atime") |
||
98 | |||
99 | @property |
||
100 | def exists(self: Self) -> bool: |
||
101 | """ |
||
102 | Returns whether the resolved path exists. |
||
103 | """ |
||
104 | return self.real_stat is not None |
||
105 | |||
106 | @property |
||
107 | def is_file(self: Self) -> bool: |
||
108 | return self.exists and stat.S_ISREG(self.real_stat.st_mode) |
||
109 | |||
110 | @property |
||
111 | def is_dir(self: Self) -> bool: |
||
112 | return self.exists and stat.S_ISDIR(self.real_stat.st_mode) |
||
113 | |||
114 | @property |
||
115 | def is_readable_dir(self: Self) -> bool: |
||
116 | return self.is_file and self.has_access and self.has_read |
||
117 | |||
118 | @property |
||
119 | def is_writeable_dir(self: Self) -> bool: |
||
120 | return self.is_dir and self.has_access and self.has_write |
||
121 | |||
122 | @property |
||
123 | def is_readable_file(self: Self) -> bool: |
||
124 | return self.is_file and self.has_access and self.has_read |
||
125 | |||
126 | @property |
||
127 | def is_writeable_file(self: Self) -> bool: |
||
128 | return self.is_file and self.has_access and self.has_write |
||
129 | |||
130 | @property |
||
131 | def is_block_device(self: Self) -> bool: |
||
132 | return self.exists and stat.S_ISBLK(self.real_stat.st_mode) |
||
133 | |||
134 | @property |
||
135 | def is_char_device(self: Self) -> bool: |
||
136 | return self.exists and stat.S_ISCHR(self.real_stat.st_mode) |
||
137 | |||
138 | @property |
||
139 | def is_socket(self: Self) -> bool: |
||
140 | return self.exists and stat.S_ISSOCK(self.real_stat.st_mode) |
||
141 | |||
142 | @property |
||
143 | def is_fifo(self: Self) -> bool: |
||
144 | return self.exists and stat.S_ISFIFO(self.real_stat.st_mode) |
||
145 | |||
146 | @property |
||
147 | def is_symlink(self: Self) -> bool: |
||
148 | return self.link_stat is not None |
||
149 | |||
150 | @property |
||
151 | def is_valid_symlink(self: Self) -> bool: |
||
152 | return self.is_symlink and self.exists |
||
153 | |||
154 | @property |
||
155 | def is_broken_symlink(self: Self) -> bool: |
||
156 | return self.is_symlink and not self.exists |
||
157 | |||
158 | def _get_dt(self: Self, attr: str) -> datetime | None: |
||
159 | if self.real_stat is None: |
||
160 | return None |
||
161 | sec = getattr(self.real_stat, attr) |
||
162 | return datetime.fromtimestamp(sec).astimezone() |
||
163 | |||
164 | |||
@dataclass(slots=True, frozen=True)
class FilesysUtils:
    """
    Tools for file/directory creation, etc.

    Warning:
        Some functions may be insecure.
    """

    @classmethod
    def verify_can_read_files(
        cls: type[Self],
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be read from, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening

        Raises:
            ReadFailedError: If a path is not a file (modulo existence), doesn't have 'R' set,
                or (if `attempt` is set) cannot be opened for reading
        """
        for path in map(Path, paths):
            exists = path.exists()
            if exists and not path.is_file():
                raise ReadFailedError(f"Path {path} is not a file", filename=str(path))
            if missing_ok and not exists:
                # Nothing to read; opening it below would fail despite `missing_ok`.
                continue
            if not os.access(path, os.R_OK):
                raise ReadFailedError(f"Cannot read from {path}", filename=str(path))
            if attempt:
                try:
                    with open(path):
                        pass
                except OSError as e:
                    # A failed read must surface as ReadFailedError, not WriteFailedError.
                    raise ReadFailedError(f"Failed to open {path} for read", filename=str(path)) from e

    @classmethod
    def verify_can_write_files(
        cls: type[Self],
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be written to, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening, in append mode;
                note that this creates the file if it does not exist

        Raises:
            WriteFailedError: If a path is not a file (modulo existence), doesn't have 'W' set,
                or (if `attempt` is set) cannot be opened for writing
        """
        for path in map(Path, paths):
            if path.exists() and not path.is_file():
                raise WriteFailedError(f"Path {path} is not a file", filename=str(path))
            if (not missing_ok or path.exists()) and not os.access(path, os.W_OK):
                raise WriteFailedError(f"Cannot write to {path}", filename=str(path))
            if attempt:
                try:
                    with open(path, "a"):  # or w
                        pass
                except OSError as e:
                    raise WriteFailedError(f"Failed to open {path} for write", filename=str(path)) from e

    @classmethod
    def verify_can_write_dirs(
        cls: type[Self],
        *paths: str | PurePath,
        missing_ok: bool = False,
    ) -> None:
        """
        Checks that all directories can be written to, to ensure atomicity before operations.

        Args:
            *paths: The directories
            missing_ok: Don't raise an error if a path doesn't exist

        Raises:
            WriteFailedError: If a path is not a directory (modulo existence),
                or doesn't have both 'W' and 'X' set
        """
        for path in map(Path, paths):
            if path.exists() and not path.is_dir():
                raise WriteFailedError(f"Path {path} is not a dir", filename=str(path))
            if missing_ok and not path.exists():
                continue
            if not os.access(path, os.W_OK):
                raise WriteFailedError(f"{path} lacks write permission", filename=str(path))
            if not os.access(path, os.X_OK):
                raise WriteFailedError(f"{path} lacks access permission", filename=str(path))

    def get_info(self: Self, path: PurePath | str, *, expand_user: bool = False, strict: bool = False) -> PathInfo:
        """
        Collects a `PathInfo` snapshot for `path`.

        Args:
            path: The path to look up; may be a symlink
            expand_user: Call `expanduser()` before resolving
            strict: Passed to `Path.resolve`

        Returns:
            A `PathInfo`; its stat fields are None and its flags False if `path` could not be stat'ed
        """
        path = Path(path)
        # `pathlib._ignore_error` is a private helper that may be absent;
        # without it, `__stat_raw` swallows every OSError.
        has_ignore_error = hasattr(pathlib, "_ignore_error")
        if not has_ignore_error:
            logger.debug("No _ignore_error found; some OSErrors may be suppressed")
        resolved = None
        real_stat = None
        has_access = False
        has_read = False
        has_write = False
        link_stat = None
        as_of = datetime.now(tz=UTC).astimezone()
        if has_ignore_error or path.is_symlink() or path.exists():
            link_stat = self.__stat_raw(path)
        if link_stat is not None:
            resolved = path.expanduser().resolve(strict=strict) if expand_user else path.resolve(strict=strict)
            # `link_stat` came from lstat: if it describes a symlink, stat the target separately.
            real_stat = self.__stat_raw(resolved) if stat.S_ISLNK(link_stat.st_mode) else link_stat
            has_access = os.access(path, os.X_OK, follow_symlinks=True)
            has_read = os.access(path, os.R_OK, follow_symlinks=True)
            has_write = os.access(path, os.W_OK, follow_symlinks=True)
            if not stat.S_ISLNK(link_stat.st_mode):
                # `PathInfo.link_stat` is only set for symlinks.
                link_stat = None
        return PathInfo(
            source=path,
            resolved=resolved,
            as_of=as_of,
            real_stat=real_stat,
            link_stat=link_stat,
            has_access=has_access,
            has_read=has_read,
            has_write=has_write,
        )

    def prep_dir(self: Self, path: PurePath | str, *, exist_ok: bool = True) -> bool:
        """
        Prepares a directory by making it if it doesn't exist.
        If `exist_ok` is False, calls `logger.warning` if `path` already exists

        Returns:
            Whether `path` already existed

        Raises:
            PathMissingError: If `path` exists but is not a directory
        """
        path = Path(path)
        exists = path.exists()
        # On some platforms we get generic exceptions like permissions errors,
        # so these are better
        if exists and not path.is_dir():
            raise PathMissingError(filename=str(path))
        if exists and not exist_ok:
            logger.warning(f"Directory {path} already exists")
        if not exists:
            # NOTE! exist_ok in mkdir throws an error on Windows
            path.mkdir(parents=True)
        return exists

    def prep_file(self: Self, path: PurePath | str, *, exist_ok: bool = True) -> None:
        """
        Prepares a file path by making its parent directory.
        Same as `pathlib.Path.mkdir` but makes sure `path` is a file if it exists.

        Raises:
            PathMissingError: If `path` exists but is neither a file nor a symlink
        """
        # On some platforms we get generic exceptions like permissions errors, so these are better
        path = Path(path)
        # check for errors first; don't make the dirs and then fail
        if path.exists() and not path.is_file() and not path.is_symlink():
            raise PathMissingError(filename=str(path))
        Path(path.parent).mkdir(parents=True, exist_ok=exist_ok)

    def delete_surefire(self: Self, path: PurePath | str) -> Exception | None:
        """
        Deletes files or directories cross-platform, but working around multiple issues in Windows.

        Returns:
            None, or an Exception for minor warnings

        Raises:
            IOError: If it can't delete
        """
        # we need this because of Windows
        path = Path(path)
        logger.debug(f"Permanently deleting {path} ...")
        chmod_err = None
        try:
            os.chmod(str(path), stat.S_IRWXU)
        except Exception as e:
            chmod_err = e
        # another reason for returning exception:
        # We don't want to interrupt the current line being printed like in slow_delete
        if path.is_dir():
            shutil.rmtree(str(path), ignore_errors=True)  # ignore_errors because of Windows
            try:
                path.unlink(missing_ok=True)  # again, because of Windows
            except OSError:  # noqa: S110
                pass  # almost definitely because it doesn't exist
        else:
            path.unlink(missing_ok=True)
        logger.debug(f"Permanently deleted {path}")
        return chmod_err

    def trash(self: Self, path: PurePath | str, trash_dir: PurePath | str) -> None:
        """
        Trash a file or directory by moving it with `shutil.move`.

        Args:
            path: The path to move to the trash
            trash_dir: The destination; required (it is passed to `shutil.move` as-is)
        """
        logger.debug(f"Trashing {path} to {trash_dir} ...")
        shutil.move(str(path), str(trash_dir))
        logger.debug(f"Trashed {path} to {trash_dir}")

    def try_delete(self: Self, path: Path, *, bound: type[Exception] = PermissionError) -> None:
        """
        Try to delete a file (probably temp file), if it exists, and log any `PermissionError`.

        Args:
            path: The file to delete
            bound: The exception type to catch and log instead of propagating
        """
        path = Path(path)
        # noinspection PyBroadException
        try:
            path.unlink(missing_ok=True)
        except bound:
            logger.error(f"Permission error preventing deleting {path}")

    def temp_path(self: Self, path: PurePath | str | None = None, **kwargs) -> Generator[Path, None, None]:
        """
        Makes a temporary Path. Won't create `path` but will delete it at the end.
        If `path` is None, will use `tempfile.mkstemp`.

        Yields:
            The path, built as `Path(path, **kwargs)`
        """
        if path is None:
            fd, path = tempfile.mkstemp()
            # mkstemp returns an open OS-level handle; close it so it isn't leaked.
            os.close(fd)
        try:
            yield Path(path, **kwargs)
        finally:
            # The caller may never have created `path`, so a missing file is not an error.
            Path(path).unlink(missing_ok=True)

    def temp_file(
        self: Self,
        path: PurePath | str | None = None,
        *,
        spooled: bool = False,
        **kwargs: Unpack[Mapping[str, Any]],
    ) -> Generator[Writeable, None, None]:
        """
        Simple wrapper around `tempfile` functions.
        Wraps `TemporaryFile`, `NamedTemporaryFile`, and `SpooledTemporaryFile`.

        Args:
            path: If None (and not `spooled`), uses `TemporaryFile`; otherwise `NamedTemporaryFile`
            spooled: Use `SpooledTemporaryFile`, ignoring `path`
            **kwargs: Passed to the chosen `tempfile` function
        """
        if spooled:
            with tempfile.SpooledTemporaryFile(**kwargs) as x:
                yield x
        elif path is None:
            with tempfile.TemporaryFile(**kwargs) as x:
                yield x
        else:
            # NOTE(review): the first positional parameter of NamedTemporaryFile is `mode`,
            # so `str(path)` is interpreted as a file mode here -- confirm the intended
            # keyword (`dir=`? `prefix=`?) before changing.
            with tempfile.NamedTemporaryFile(str(path), **kwargs) as x:
                yield x

    def temp_dir(self: Self, **kwargs: Unpack[Mapping[str, Any]]) -> Generator[Path, None, None]:
        """
        Makes a `tempfile.TemporaryDirectory` and yields it as a `Path`.
        """
        with tempfile.TemporaryDirectory(**kwargs) as x:
            yield Path(x)

    def __stat_raw(self: Self, path: Path) -> os.stat_result | None:
        """
        Returns `path.lstat()`, or None on an ignorable OSError.
        Re-raises only if `pathlib._ignore_error` exists and rejects the error.
        """
        try:
            return path.lstat()
        except OSError as e:
            if hasattr(pathlib, "_ignore_error") and not pathlib._ignore_error(e):
                raise
        return None
||
425 | |||
426 | |||
# Module-level instance of FilesysUtils; exported via `__all__`.
FilesysTools = FilesysUtils()
||
428 |