PathInfo.is_writeable_file()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
3
# SPDX-License-Identifier: Apache-2.0
4
"""
5
6
"""
7
8
import logging
9
import os
10
import pathlib
11
import shutil
12
import stat
13
import tempfile
14
from collections.abc import Generator, Mapping
15
from dataclasses import dataclass
16
from datetime import UTC, datetime
17
from pathlib import Path, PurePath
18
from typing import Any, Self, Unpack
19
20
from pocketutils.core.exceptions import PathMissingError, ReadFailedError, WriteFailedError
21
from pocketutils.core.input_output import Writeable
22
23
# Public API of this module: the names picked up by `from ... import *`.
__all__ = ["FilesysUtils", "FilesysTools", "PathInfo"]
24
25
# Module-level logger; uses the fixed name "pocketutils" rather than `__name__`.
logger = logging.getLogger("pocketutils")
26
27
28
@dataclass(frozen=True, slots=True, kw_only=True)
29
class PathInfo:
30
    """
31
    Info about an extant or nonexistent path as it was at some time.
32
    Use this to avoid making repeated filesystem calls (e.g. `.is_dir()`):
33
    None of the properties defined here make OS calls.
34
35
    Attributes:
36
        source: The original path used for lookup; may be a symlink
37
        resolved: The fully resolved path, or None if it does not exist
38
        as_of: A datetime immediately before the system calls (system timezone)
39
        real_stat: `os.stat_result`, or None if the path does not exist
40
        link_stat: `os.stat_result`, or None if the path is not a symlink
41
        has_access: Path exists and has the 'a' flag set
42
        has_read: Path exists and has the 'r' flag set
43
        has_write: Path exists and has the 'w' flag set
44
45
    All the additional properties refer to the resolved path,
46
    except for [`is_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_symlink),
47
    [`is_valid_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_valid_symlink),
48
    and [`is_broken_symlink`](pocketutils.tools.filesys_tools.PathInfo.is_broken_symlink).
49
    """
50
51
    source: Path
52
    resolved: Path | None
53
    as_of: datetime
54
    real_stat: os.stat_result | None
55
    link_stat: os.stat_result | None
56
    has_access: bool
57
    has_read: bool
58
    has_write: bool
59
60
    @property
61
    def mod_or_create_dt(self: Self) -> datetime | None:
62
        """
63
        Returns the modification or access datetime.
64
        Uses whichever is available: creation on Windows and modification on Unix-like.
65
        """
66
        if os.name == "nt":
67
            return self._get_dt("st_ctime")
68
        # will work on posix; on java try anyway
69
        return self._get_dt("st_mtime")
70
71
    @property
72
    def mod_dt(self: Self) -> datetime | None:
73
        """
74
        Returns the modification datetime, if known.
75
        Returns None on Windows or if the path does not exist.
76
        """
77
        if os.name == "nt":
78
            return None
79
        return self._get_dt("st_mtime")
80
81
    @property
82
    def create_dt(self: Self) -> datetime | None:
83
        """
84
        Returns the creation datetime, if known.
85
        Returns None on Unix-like systems or if the path does not exist.
86
        """
87
        if os.name == "posix":
88
            return None
89
        return self._get_dt("st_ctime")
90
91
    @property
92
    def access_dt(self: Self) -> datetime | None:
93
        """
94
        Returns the access datetime.
95
        *Should* never return None if the path exists, but not guaranteed.
96
        """
97
        return self._get_dt("st_atime")
98
99
    @property
100
    def exists(self: Self) -> bool:
101
        """
102
        Returns whether the resolved path exists.
103
        """
104
        return self.real_stat is not None
105
106
    @property
107
    def is_file(self: Self) -> bool:
108
        return self.exists and stat.S_ISREG(self.real_stat.st_mode)
109
110
    @property
111
    def is_dir(self: Self) -> bool:
112
        return self.exists and stat.S_ISDIR(self.real_stat.st_mode)
113
114
    @property
115
    def is_readable_dir(self: Self) -> bool:
116
        return self.is_file and self.has_access and self.has_read
117
118
    @property
119
    def is_writeable_dir(self: Self) -> bool:
120
        return self.is_dir and self.has_access and self.has_write
121
122
    @property
123
    def is_readable_file(self: Self) -> bool:
124
        return self.is_file and self.has_access and self.has_read
125
126
    @property
127
    def is_writeable_file(self: Self) -> bool:
128
        return self.is_file and self.has_access and self.has_write
129
130
    @property
131
    def is_block_device(self: Self) -> bool:
132
        return self.exists and stat.S_ISBLK(self.real_stat.st_mode)
133
134
    @property
135
    def is_char_device(self: Self) -> bool:
136
        return self.exists and stat.S_ISCHR(self.real_stat.st_mode)
137
138
    @property
139
    def is_socket(self: Self) -> bool:
140
        return self.exists and stat.S_ISSOCK(self.real_stat.st_mode)
141
142
    @property
143
    def is_fifo(self: Self) -> bool:
144
        return self.exists and stat.S_ISFIFO(self.real_stat.st_mode)
145
146
    @property
147
    def is_symlink(self: Self) -> bool:
148
        return self.link_stat is not None
149
150
    @property
151
    def is_valid_symlink(self: Self) -> bool:
152
        return self.is_symlink and self.exists
153
154
    @property
155
    def is_broken_symlink(self: Self) -> bool:
156
        return self.is_symlink and not self.exists
157
158
    def _get_dt(self: Self, attr: str) -> datetime | None:
159
        if self.real_stat is None:
160
            return None
161
        sec = getattr(self.real_stat, attr)
162
        return datetime.fromtimestamp(sec).astimezone()
163
164
165
@dataclass(slots=True, frozen=True)
166
class FilesysUtils:
167
    """
168
    Tools for file/directory creation, etc.
169
170
    Warning:
171
        Some functions may be insecure.
172
    """
173
174 View Code Duplication
    @classmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
175
    def verify_can_read_files(
176
        cls: type[Self],
177
        *paths: str | Path,
178
        missing_ok: bool = False,
179
        attempt: bool = False,
180
    ) -> None:
181
        """
182
        Checks that all files can be written to, to ensure atomicity before operations.
183
184
        Args:
185
            *paths: The files
186
            missing_ok: Don't raise an error if a path doesn't exist
187
            attempt: Actually try opening
188
189
        Returns:
190
            ReadFailedError: If a path is not a file (modulo existence) or doesn't have 'W' set
191
        """
192
        paths = [Path(p) for p in paths]
193
        for path in paths:
194
            if path.exists() and not path.is_file():
195
                raise ReadFailedError(f"Path {path} is not a file", filename=str(path))
196
            if (not missing_ok or path.exists()) and not os.access(path, os.R_OK):
197
                raise ReadFailedError(f"Cannot read from {path}", filename=str(path))
198
            if attempt:
199
                try:
200
                    with open(path):
201
                        pass
202
                except OSError:
203
                    raise WriteFailedError(f"Failed to open {path} for read", filename=str(path))
204
205 View Code Duplication
    @classmethod
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
206
    def verify_can_write_files(
207
        cls: type[Self],
208
        *paths: str | Path,
209
        missing_ok: bool = False,
210
        attempt: bool = False,
211
    ) -> None:
212
        """
213
        Checks that all files can be written to, to ensure atomicity before operations.
214
215
        Args:
216
            *paths: The files
217
            missing_ok: Don't raise an error if a path doesn't exist
218
            attempt: Actually try opening
219
220
        Returns:
221
            WriteFailedError: If a path is not a file (modulo existence) or doesn't have 'W' set
222
        """
223
        paths = [Path(p) for p in paths]
224
        for path in paths:
225
            if path.exists() and not path.is_file():
226
                raise WriteFailedError(f"Path {path} is not a file", filename=str(path))
227
            if (not missing_ok or path.exists()) and not os.access(path, os.W_OK):
228
                raise WriteFailedError(f"Cannot write to {path}", filename=str(path))
229
            if attempt:
230
                try:
231
                    with open(path, "a"):  # or w
232
                        pass
233
                except OSError:
234
                    raise WriteFailedError(f"Failed to open {path} for write", filename=str(path))
235
236
    @classmethod
237
    def verify_can_write_dirs(
238
        cls: type[Self],
239
        *paths: str | PurePath,
240
        missing_ok: bool = False,
241
    ) -> None:
242
        """
243
        Checks that all directories can be written to, to ensure atomicity before operations.
244
245
        Args:
246
            *paths: The directories
247
            missing_ok: Don't raise an error if a path doesn't exist
248
249
        Returns:
250
            WriteFailedError: If a path is not a directory (modulo existence) or doesn't have 'W' set
251
        """
252
        paths = [Path(p) for p in paths]
253
        for path in paths:
254
            if path.exists() and not path.is_dir():
255
                raise WriteFailedError(f"Path {path} is not a dir", filename=str(path))
256
            if missing_ok and not path.exists():
257
                continue
258
            if not os.access(path, os.W_OK):
259
                raise WriteFailedError(f"{path} lacks write permission", filename=str(path))
260
            if not os.access(path, os.X_OK):
261
                raise WriteFailedError(f"{path} lacks access permission", filename=str(path))
262
263
    def get_info(self: Self, path: PurePath | str, *, expand_user: bool = False, strict: bool = False) -> PathInfo:
264
        path = Path(path)
265
        has_ignore_error = hasattr(pathlib, "_ignore_error")
266
        if not has_ignore_error:
267
            logger.debug("No _ignore_error found; some OSErrors may be suppressed")
268
        resolved = None
269
        real_stat = None
270
        has_access = False
271
        has_read = False
272
        has_write = False
273
        link_stat = None
274
        as_of = datetime.now(tz=UTC).astimezone()
275
        if has_ignore_error or path.is_symlink() or path.exists():
276
            link_stat = self.__stat_raw(path)
277
        if link_stat is not None:
278
            resolved = path.expanduser().resolve(strict=strict) if expand_user else path.resolve(strict=strict)
279
            real_stat = self.__stat_raw(resolved) if stat.S_ISLNK(link_stat.st_mode) else link_stat
280
            has_access = os.access(path, os.X_OK, follow_symlinks=True)
281
            has_read = os.access(path, os.R_OK, follow_symlinks=True)
282
            has_write = os.access(path, os.W_OK, follow_symlinks=True)
283
            if not stat.S_ISLNK(link_stat.st_mode):
284
                link_stat = None
285
        return PathInfo(
286
            source=path,
287
            resolved=resolved,
288
            as_of=as_of,
289
            real_stat=real_stat,
290
            link_stat=link_stat,
291
            has_access=has_access,
292
            has_read=has_read,
293
            has_write=has_write,
294
        )
295
296
    def prep_dir(self: Self, path: PurePath | str, *, exist_ok: bool = True) -> bool:
297
        """
298
        Prepares a directory by making it if it doesn't exist.
299
        If `exist_ok` is False, calls `logger.warning` if `path` already exists
300
        """
301
        path = Path(path)
302
        exists = path.exists()
303
        # On some platforms we get generic exceptions like permissions errors,
304
        # so these are better
305
        if exists and not path.is_dir():
306
            raise PathMissingError(filename=str(path))
307
        if exists and not exist_ok:
308
            logger.warning(f"Directory {path} already exists")
309
        if not exists:
310
            # NOTE! exist_ok in mkdir throws an error on Windows
311
            path.mkdir(parents=True)
312
        return exists
313
314
    def prep_file(self: Self, path: PurePath | str, *, exist_ok: bool = True) -> None:
315
        """
316
        Prepares a file path by making its parent directory.
317
        Same as `pathlib.Path.mkdir` but makes sure `path` is a file if it exists.
318
        """
319
        # On some platforms we get generic exceptions like permissions errors, so these are better
320
        path = Path(path)
321
        # check for errors first; don't make the dirs and then fail
322
        if path.exists() and not path.is_file() and not path.is_symlink():
323
            raise PathMissingError(filename=str(path))
324
        Path(path.parent).mkdir(parents=True, exist_ok=exist_ok)
325
326
    def delete_surefire(self: Self, path: PurePath | str) -> Exception | None:
327
        """
328
        Deletes files or directories cross-platform, but working around multiple issues in Windows.
329
330
        Returns:
331
            None, or an Exception for minor warnings
332
333
        Raises:
334
            IOError: If it can't delete
335
        """
336
        # we need this because of Windows
337
        path = Path(path)
338
        logger.debug(f"Permanently deleting {path} ...")
339
        chmod_err = None
340
        try:
341
            os.chmod(str(path), stat.S_IRWXU)
342
        except Exception as e:
343
            chmod_err = e
344
        # another reason for returning exception:
345
        # We don't want to interrupt the current line being printed like in slow_delete
346
        if path.is_dir():
347
            shutil.rmtree(str(path), ignore_errors=True)  # ignore_errors because of Windows
348
            try:
349
                path.unlink(missing_ok=True)  # again, because of Windows
350
            except OSError:  # noqa: S110
351
                pass  # almost definitely because it doesn't exist
352
        else:
353
            path.unlink(missing_ok=True)
354
        logger.debug(f"Permanently deleted {path}")
355
        return chmod_err
356
357
    def trash(self: Self, path: PurePath | str, trash_dir: PurePath | str) -> None:
358
        """
359
        Trash a file or directory.
360
361
        Args:
362
            path: The path to move to the trash
363
            trash_dir: If None, uses
364
            [`guess_trash`](pocketutils.tools.path_tools.PathTools.guess_trash).
365
        """
366
        logger.debug(f"Trashing {path} to {trash_dir} ...")
367
        shutil.move(str(path), str(trash_dir))
368
        logger.debug(f"Trashed {path} to {trash_dir}")
369
370
    def try_delete(self: Self, path: Path, *, bound: type[Exception] = PermissionError) -> None:
371
        """
372
        Try to delete a file (probably temp file), if it exists, and log any `PermissionError`.
373
        """
374
        path = Path(path)
375
        # noinspection PyBroadException
376
        try:
377
            path.unlink(missing_ok=True)
378
        except bound:
379
            logger.error(f"Permission error preventing deleting {path}")
380
381
    def temp_path(self: Self, path: PurePath | str | None = None, **kwargs) -> Generator[Path, None, None]:
382
        """
383
        Makes a temporary Path. Won't create `path` but will delete it at the end.
384
        If `path` is None, will use `tempfile.mkstemp`.
385
        """
386
        if path is None:
387
            _, path = tempfile.mkstemp()
388
        try:
389
            yield Path(path, **kwargs)
390
        finally:
391
            Path(path).unlink()
392
393
    def temp_file(
394
        self: Self,
395
        path: PurePath | str | None = None,
396
        *,
397
        spooled: bool = False,
398
        **kwargs: Unpack[Mapping[str, Any]],
399
    ) -> Generator[Writeable, None, None]:
400
        """
401
        Simple wrapper around `tempfile` functions.
402
        Wraps `TemporaryFile`, `NamedTemporaryFile`, and `SpooledTemporaryFile`.
403
        """
404
        if spooled:
405
            with tempfile.SpooledTemporaryFile(**kwargs) as x:
406
                yield x
407
        elif path is None:
408
            with tempfile.TemporaryFile(**kwargs) as x:
409
                yield x
410
        else:
411
            with tempfile.NamedTemporaryFile(str(path), **kwargs) as x:
412
                yield x
413
414
    def temp_dir(self: Self, **kwargs: Unpack[Mapping[str, Any]]) -> Generator[Path, None, None]:
415
        with tempfile.TemporaryDirectory(**kwargs) as x:
416
            yield Path(x)
417
418
    def __stat_raw(self: Self, path: Path) -> os.stat_result | None:
419
        try:
420
            return path.lstat()
421
        except OSError as e:
422
            if hasattr(pathlib, "_ignore_error") and not pathlib._ignore_error(e):
423
                raise e
424
        return None
425
426
427
# Ready-made module-level instance of `FilesysUtils`, exported via `__all__`.
FilesysTools = FilesysUtils()
428