@@ 64-96 (lines=33) @@ | ||
61 | msg = f"Failed to open {path} for read" |
|
62 | raise WritePermissionsError(msg, key=str(path)) from e |
|
63 | ||
64 | @classmethod |
|
65 | def verify_can_write_files( |
|
66 | cls, |
|
67 | *paths: str | Path, |
|
68 | missing_ok: bool = False, |
|
69 | attempt: bool = False, |
|
70 | ) -> None: |
|
71 | """ |
|
72 | Checks that all files can be written to, to ensure atomicity before operations. |
|
73 | ||
74 | Args: |
|
75 | *paths: The files |
|
76 | missing_ok: Don't raise an error if a path doesn't exist |
|
77 | attempt: Actually try opening |
|
78 | ||
79 | Returns: |
|
80 | WritePermissionsError: If a path is not a file (modulo existence) or doesn't have 'W' set |
|
81 | """ |
|
82 | paths = [Path(p) for p in paths] |
|
83 | for path in paths: |
|
84 | if path.exists() and not path.is_file(): |
|
85 | msg = f"Path {path} is not a file" |
|
86 | raise WritePermissionsError(msg, key=str(path)) |
|
87 | if (not missing_ok or path.exists()) and not os.access(path, os.W_OK): |
|
88 | msg = f"Cannot write to {path}" |
|
89 | raise WritePermissionsError(msg, key=str(path)) |
|
90 | if attempt: |
|
91 | try: |
|
92 | with path.open("a"): # or w |
|
93 | pass |
|
94 | except OSError as e: |
|
95 | msg = f"Failed to open {path} for write" |
|
96 | raise WritePermissionsError(msg, key=str(path)) from e |
|
97 | ||
98 | @classmethod |
|
99 | def verify_can_write_dirs(cls, *paths: str | Path, missing_ok: bool = False) -> None: |
|
@@ 30-62 (lines=33) @@ | ||
27 | ||
28 | ||
29 | class IoUtils: |
|
30 | @classmethod |
|
31 | def verify_can_read_files( |
|
32 | cls, |
|
33 | *paths: str | Path, |
|
34 | missing_ok: bool = False, |
|
35 | attempt: bool = False, |
|
36 | ) -> None: |
|
37 | """ |
|
38 | Checks that all files can be written to, to ensure atomicity before operations. |
|
39 | ||
40 | Args: |
|
41 | *paths: The files |
|
42 | missing_ok: Don't raise an error if a path doesn't exist |
|
43 | attempt: Actually try opening |
|
44 | ||
45 | Returns: |
|
46 | ReadPermissionsError: If a path is not a file (modulo existence) or doesn't have 'W' set |
|
47 | """ |
|
48 | paths = [Path(p) for p in paths] |
|
49 | for path in paths: |
|
50 | if path.exists() and not path.is_file(): |
|
51 | msg = f"Path {path} is not a file" |
|
52 | raise ReadPermissionsError(msg, key=str(path)) |
|
53 | if (not missing_ok or path.exists()) and not os.access(path, os.R_OK): |
|
54 | msg = f"Cannot read from {path}" |
|
55 | raise ReadPermissionsError(msg, key=str(path)) |
|
56 | if attempt: |
|
57 | try: |
|
58 | with path.open(): |
|
59 | pass |
|
60 | except OSError as e: |
|
61 | msg = f"Failed to open {path} for read" |
|
62 | raise WritePermissionsError(msg, key=str(path)) from e |
|
63 | ||
64 | @classmethod |
|
65 | def verify_can_write_files( |