Total Complexity | 162 |
Total Lines | 724 |
Duplicated Lines | 8.29 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like pocketutils.tools.filesys_tools often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import bz2 |
||
|
|||
2 | import csv |
||
3 | import gzip |
||
4 | import logging |
||
5 | import os |
||
6 | import pathlib |
||
7 | import shutil |
||
8 | import stat |
||
9 | import sys |
||
10 | import tempfile |
||
11 | from collections.abc import Callable, Generator, Iterable, Mapping, Sequence |
||
12 | from contextlib import contextmanager |
||
13 | from datetime import datetime, timedelta |
||
14 | from pathlib import Path, PurePath |
||
15 | from typing import Any |
||
16 | |||
17 | import orjson |
||
18 | import regex |
||
19 | from defusedxml import ElementTree |
||
20 | |||
21 | from pocketutils.core.chars import Chars |
||
22 | from pocketutils.core.exceptions import ( |
||
23 | AlreadyUsedError, |
||
24 | DirDoesNotExistError, |
||
25 | FileDoesNotExistError, |
||
26 | ParsingError, |
||
27 | ReadPermissionsError, |
||
28 | WritePermissionsError, |
||
29 | ) |
||
30 | from pocketutils.core.input_output import OpenMode, PathLike, Writeable |
||
31 | from pocketutils.tools.path_info import PathInfo |
||
32 | from pocketutils.tools.path_tools import PathTools |
||
33 | from pocketutils.tools.sys_tools import SystemTools |
||
34 | from pocketutils.tools.unit_tools import UnitTools |
||
35 | |||
logger = logging.getLogger("pocketutils")
COMPRESS_LEVEL = 9


class FilesysTools:
    """
    Tools for file/directory creation, etc.

    .. caution::
        Some functions may be insecure.
    """

    @classmethod
    def get_encoding(cls, encoding: str = "utf-8") -> str:
        """
        Returns a text encoding from a more flexible string.
        Ignores hyphens and lowercases the string.
        Permits these nonstandard shorthands:

          - ``"platform"``: use ``sys.getdefaultencoding()`` on the fly
          - ``"utf8(bom)"``: use ``"utf-8-sig"`` on Windows; ``"utf-8"`` otherwise
          - ``"utf16(bom)"``: use ``"utf-16"`` (which writes and consumes a BOM)
          - ``"utf32(bom)"``: use ``"utf-32"`` (which writes and consumes a BOM)

        Args:
            encoding: An encoding name or one of the shorthands above

        Returns:
            The resolved name; anything that is not a shorthand is returned lowercased
            and with hyphens removed
        """
        normalized = encoding.lower().replace("-", "")
        if normalized == "platform":
            return sys.getdefaultencoding()
        if normalized == "utf8(bom)":
            return "utf-8-sig" if os.name == "nt" else "utf-8"
        # Python ships no "utf-16-sig" / "utf-32-sig" codecs (only "utf-8-sig"),
        # so returning those names made every later open() fail with LookupError.
        if normalized == "utf16(bom)":
            return "utf-16"
        if normalized == "utf32(bom)":
            return "utf-32"
        return normalized

    @classmethod
    def get_encoding_errors(cls, errors: str | None) -> str | None:
        """
        Returns the value passed as ``errors=`` in ``open``.

        Args:
            errors: An error-handler name, or None to get ``"strict"``

        Raises:
            ValueError: If invalid
        """
        if errors is None:
            return "strict"
        known_handlers = (
            "strict",
            "ignore",
            "replace",
            "xmlcharrefreplace",
            "backslashreplace",
            "namereplace",
            "surrogateescape",
            "surrogatepass",
        )
        if errors in known_handlers:
            return errors
        raise ValueError(f"Invalid value {errors} for errors")

    @classmethod
    def read_compressed_text(cls, path: PathLike) -> str:
        """
        Reads UTF-8 text from a text file, optionally gzipped or bz2-ed.
        Recognized suffixes for compression are ``.gz``, ``.gzip``, ``.bz2``, and ``.bzip2``.
        """
        path = Path(path)
        if path.name.endswith((".bz2", ".bzip2")):
            return bz2.decompress(path.read_bytes()).decode(encoding="utf-8")
        if path.name.endswith((".gz", ".gzip")):
            return gzip.decompress(path.read_bytes()).decode(encoding="utf-8")
        return path.read_text(encoding="utf-8")

    @classmethod
    def write_compressed_text(cls, txt: str, path: PathLike, *, mkdirs: bool = False) -> None:
        """
        Writes UTF-8 text to a text file, optionally gzipped or bz2-ed.
        Recognized suffixes for compression are ``.gz``, ``.gzip``, ``.bz2``, and ``.bzip2``.

        Args:
            txt: The text to write
            path: The destination file
            mkdirs: Create the parent directory (and its parents) first
        """
        path = Path(path)
        if mkdirs:
            path.parent.mkdir(parents=True, exist_ok=True)
        if path.name.endswith((".bz2", ".bzip2")):
            path.write_bytes(bz2.compress(txt.encode(encoding="utf-8")))
        elif path.name.endswith((".gz", ".gzip")):
            path.write_bytes(gzip.compress(txt.encode(encoding="utf-8")))
        else:
            # Must be explicit: read_compressed_text always decodes as UTF-8,
            # but write_text() without an encoding uses the platform default.
            path.write_text(txt, encoding="utf-8")

    @classmethod
    def get_info(
        cls, path: PathLike, *, expand_user: bool = False, strict: bool = False
    ) -> PathInfo:
        """
        Collects stat and permission information about ``path`` into a ``PathInfo``.

        Args:
            path: The path to inspect
            expand_user: Call ``expanduser()`` before resolving
            strict: Passed to ``Path.resolve``

        Returns:
            A ``PathInfo``; if ``lstat`` yielded nothing, the stats and ``resolved`` are
            None and the three access flags are False
        """
        path = Path(path)
        has_ignore_error = hasattr(pathlib, "_ignore_error")
        if not has_ignore_error:
            logger.debug("No _ignore_error found; some OSErrors may be suppressed")
        resolved = None
        real_stat = None
        has_access = False
        has_read = False
        has_write = False
        link_stat = None
        as_of = datetime.now().astimezone()
        if has_ignore_error or path.is_symlink() or path.exists():
            link_stat = cls.__stat_raw(path)
        if link_stat is not None:
            if expand_user:
                resolved = path.expanduser().resolve(strict=strict)
            else:
                resolved = path.resolve(strict=strict)
            if stat.S_ISLNK(link_stat.st_mode):
                # path is a symlink: stat the resolved target separately
                real_stat = cls.__stat_raw(resolved)
            else:
                real_stat = link_stat
            has_access = os.access(path, os.X_OK, follow_symlinks=True)
            has_read = os.access(path, os.R_OK, follow_symlinks=True)
            has_write = os.access(path, os.W_OK, follow_symlinks=True)
            if not stat.S_ISLNK(link_stat.st_mode):
                # link_stat is only reported when path really is a symlink
                link_stat = None
        return PathInfo(
            source=path,
            resolved=resolved,
            as_of=as_of,
            real_stat=real_stat,
            link_stat=link_stat,
            has_access=has_access,
            has_read=has_read,
            has_write=has_write,
        )

    @classmethod
    def prep_dir(cls, path: PathLike, *, exist_ok: bool = True) -> bool:
        """
        Prepares a directory by making it if it doesn't exist.
        If exist_ok is False, calls ``logger.warning`` if ``path`` already exists

        Returns:
            Whether the path already existed

        Raises:
            DirDoesNotExistError: If ``path`` exists but is not a directory
        """
        path = Path(path)
        exists = path.exists()
        # On some platforms we get generic exceptions like permissions errors,
        # so these are better
        if exists and not path.is_dir():
            raise DirDoesNotExistError(f"Path {path} exists but is not a directory")
        if exists and not exist_ok:
            logger.warning(f"Directory {path} already exists")
        if not exists:
            # NOTE! exist_ok in mkdir throws an error on Windows
            path.mkdir(parents=True)
        return exists

    @classmethod
    def prep_file(cls, path: PathLike, *, exist_ok: bool = True) -> None:
        """
        Prepares a file path by making its parent directory.

        Args:
            path: The file that is about to be written
            exist_ok: If False, refuse to continue when ``path`` already exists

        Raises:
            FileDoesNotExistError: If ``path`` exists but is not a file
            FileExistsError: If ``path`` exists and ``exist_ok`` is False
        """
        # On some platforms we get generic exceptions like permissions errors, so these are better
        path = Path(path)
        # check for errors first; don't make the dirs and then fail
        if path.exists() and not path.is_file() and not path.is_symlink():
            raise FileDoesNotExistError(f"Path {path} exists but is not a file")
        # exist_ok is about the file. It used to be forwarded to the parent mkdir,
        # which raised FileExistsError whenever the parent directory was already there.
        if not exist_ok and path.exists():
            raise FileExistsError(f"Path {path} already exists")
        path.parent.mkdir(parents=True, exist_ok=True)

    @classmethod
    def dump_error(cls, e: BaseException | None, path: PathLike | datetime | None = None) -> Path:
        """
        Writes a .json file containing the error message, stack trace, and sys info.
        System info is from :meth:`get_env_info`.

        Args:
            e: The exception to serialize
            path: The output file; if None or a datetime, a name of the form
                  ``err-dump-<timestamp>.json`` is generated

        Returns:
            The path that was written
        """
        if path is None:
            path = f"err-dump-{cls.dt_for_filesys()}.json"
        elif isinstance(path, datetime):
            path = f"err-dump-{cls.dt_for_filesys(path)}.json"
        path = Path(path)
        payload = orjson.dumps(cls.dump_error_as_dict(e), option=orjson.OPT_INDENT_2)
        path.write_bytes(payload)
        return path

    @classmethod
    def dump_error_as_dict(cls, e: BaseException | None) -> Mapping[str, Any]:
        """
        Builds a dict with keys ``message``, ``stacktrace``, and ``system`` describing ``e``.
        If the system info cannot be collected, ``system`` is a placeholder string.
        """
        try:
            system = SystemTools.get_env_info()
        except Exception as e2:
            # Best-effort only, but do not swallow KeyboardInterrupt / SystemExit
            system = f"UNKNOWN << {e2} >>"
        msg, tb = SystemTools.serialize_exception(e)
        tb = [t.as_dict() for t in tb]
        return dict(message=msg, stacktrace=tb, system=system)

    @classmethod
    def dt_for_filesys(cls, dt: datetime | None = None) -> str:
        """
        Formats ``dt`` (default: ``datetime.now()``) as ``YYYY-mm-dd_HH-MM-SS`` for use in filenames.
        """
        if dt is None:
            dt = datetime.now()
        return dt.strftime("%Y-%m-%d_%H-%M-%S")

    @classmethod
    def verify_can_read_files(
        cls,
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be read from, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening

        Raises:
            ReadPermissionsError: If a path is not a file (modulo existence), doesn't have 'R' set,
                                  or (with ``attempt``) could not be opened
        """
        for path in map(Path, paths):
            exists = path.exists()
            if exists and not path.is_file():
                raise ReadPermissionsError(f"Path {path} is not a file", path=path)
            if missing_ok and not exists:
                # nothing to check; previously attempt=True still tried to open it
                continue
            if not os.access(path, os.R_OK):
                raise ReadPermissionsError(f"Cannot read from {path}", path=path)
            if attempt:
                try:
                    with open(path, "rb"):
                        pass
                except OSError as e:
                    raise ReadPermissionsError(
                        f"Failed to open {path} for read", path=path
                    ) from e

    @classmethod
    def verify_can_write_files(
        cls,
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be written to, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening (in append mode, which creates a missing file)

        Raises:
            WritePermissionsError: If a path is not a file (modulo existence), doesn't have 'W' set,
                                   or (with ``attempt``) could not be opened
        """
        for path in map(Path, paths):
            exists = path.exists()
            if exists and not path.is_file():
                raise WritePermissionsError(f"Path {path} is not a file", path=path)
            if (exists or not missing_ok) and not os.access(path, os.W_OK):
                raise WritePermissionsError(f"Cannot write to {path}", path=path)
            if attempt:
                try:
                    with open(path, "a"):  # or w
                        pass
                except OSError as e:
                    raise WritePermissionsError(
                        f"Failed to open {path} for write", path=path
                    ) from e

    @classmethod
    def verify_can_write_dirs(cls, *paths: str | Path, missing_ok: bool = False) -> None:
        """
        Checks that all directories can be written to, to ensure atomicity before operations.

        Args:
            *paths: The directories
            missing_ok: Don't raise an error if a path doesn't exist

        Raises:
            WritePermissionsError: If a path is not a directory (modulo existence)
                                   or doesn't have 'W' and 'X' set
        """
        for path in map(Path, paths):
            exists = path.exists()
            if exists and not path.is_dir():
                raise WritePermissionsError(f"Path {path} is not a dir", path=path)
            if missing_ok and not exists:
                continue
            if not os.access(path, os.W_OK):
                raise WritePermissionsError(f"{path} lacks write permission", path=path)
            if not os.access(path, os.X_OK):
                raise WritePermissionsError(f"{path} lacks access permission", path=path)

    @classmethod
    def delete_surefire(cls, path: PathLike) -> Exception | None:
        """
        Deletes files or directories cross-platform, but working around multiple issues in Windows.

        Returns:
            None, or the Exception raised by the preliminary ``chmod`` (a minor warning)

        Raises:
            IOError: If it can't delete
        """
        # we need this because of Windows
        path = Path(path)
        logger.debug(f"Permanently deleting {path} ...")
        chmod_err = None
        try:
            os.chmod(str(path), stat.S_IRWXU)
        except Exception as e:
            chmod_err = e
        # another reason for returning exception:
        # We don't want to interrupt the current line being printed like in slow_delete
        if path.is_dir():
            shutil.rmtree(str(path), ignore_errors=True)  # ignore_errors because of Windows
            try:
                path.unlink(missing_ok=True)  # again, because of Windows
            except OSError:
                pass  # almost definitely because it doesn't exist
        else:
            path.unlink(missing_ok=True)
        logger.debug(f"Permanently deleted {path}")
        return chmod_err

    @classmethod
    def trash(cls, path: PathLike, trash_dir: PathLike | None = None) -> None:
        """
        Trash a file or directory.

        Args:
            path: The path to move to the trash
            trash_dir: If None, uses :meth:`pocketutils.tools.path_tools.PathTools.guess_trash`
        """
        if trash_dir is None:
            trash_dir = PathTools.guess_trash()
        logger.debug(f"Trashing {path} to {trash_dir} ...")
        shutil.move(str(path), str(trash_dir))
        logger.debug(f"Trashed {path} to {trash_dir}")

    @classmethod
    def try_cleanup(cls, path: Path, *, bound: type[Exception] = PermissionError) -> None:
        """
        Try to delete a file (probably temp file), if it exists, and log any ``PermissionError``.

        Args:
            path: The file to delete
            bound: The exception type to catch and log instead of raising
        """
        path = Path(path)
        # noinspection PyBroadException
        try:
            path.unlink(missing_ok=True)
        except bound:
            logger.error(f"Permission error preventing deleting {path}")

    @classmethod
    def read_lines_file(cls, path: PathLike, *, ignore_comments: bool = False) -> Sequence[str]:
        """
        Returns a list of lines in the file, each stripped of surrounding whitespace.
        If ``ignore_comments`` is set, skips lines starting with ``#`` or that only contain whitespace.
        """
        lines = []
        with cls.open_file(path, "r") as f:
            for raw in f:
                line = raw.strip()
                if ignore_comments and (not line or line.startswith("#")):
                    continue
                lines.append(line)
        return lines

    @classmethod
    def read_properties_file(cls, path: PathLike) -> Mapping[str, str]:
        """
        Reads a .properties file.
        A list of lines with key=value pairs (with an equals sign).
        Lines beginning with # are ignored.
        Each line must contain exactly 1 equals sign.

        .. caution::
            The escaping is not compliant with the standard

        Args:
            path: Read the file at this local path

        Returns:
            A dict mapping keys to values, both with surrounding whitespace stripped

        Raises:
            ParsingError: If a line does not contain exactly one ``=``
            AlreadyUsedError: If a key appears more than once
        """
        dct = {}
        with cls.open_file(path, "r") as f:
            # 1-based so the reported number matches what an editor shows
            for i, raw in enumerate(f, start=1):
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                if line.count("=") != 1:
                    raise ParsingError(f"Bad line {i} in {path}", resource=path)
                k, v = line.split("=")
                k, v = k.strip(), v.strip()
                if k in dct:
                    raise AlreadyUsedError(f"Duplicate property {k} (line {i})", key=k)
                dct[k] = v
        return dct

    @classmethod
    def write_properties_file(
        cls, properties: Mapping[Any, Any], path: str | PurePath, mode: str = "o"
    ) -> None:
        """
        Writes a .properties file, one ``key=value`` line per item.
        Keys and values are converted with ``str``; ``=`` becomes ``--`` and a newline
        becomes a literal ``\\n``. A warning is logged listing any keys affected.

        .. caution::
            The escaping is not compliant with the standard
        """

        def escape(text: str) -> str:
            return text.replace("=", "--").replace("\n", "\\n")

        bad_keys = []
        bad_values = []
        with cls.open_file(path, mode) as f:
            for k, v in properties.items():
                # stringify first: the mapping may hold non-str keys/values,
                # on which `"=" in k` would raise TypeError
                key, value = str(k), str(v)
                if "=" in key or "\n" in key:
                    bad_keys.append(key)
                if "=" in value or "\n" in value:
                    bad_values.append(key)
                f.write(escape(key) + "=" + escape(value) + "\n")
        if bad_keys:
            logger.warning(
                f"These keys containing '=' or \\n were escaped: {', '.join(bad_keys)}"
            )
        if bad_values:
            logger.warning(
                f"The values of these keys containing '=' or \\n were escaped: {', '.join(bad_values)}"
            )

    @classmethod
    def save_json(cls, data: Any, path: PathLike, mode: str = "w") -> None:
        """
        Serializes ``data`` with ``orjson`` and writes the bytes to ``path``.
        ``mode`` is forced to binary (any ``t`` is dropped and ``b`` is added).
        """
        mode = mode.replace("t", "")
        if "b" not in mode:
            mode += "b"
        with cls.open_file(path, mode) as f:
            f.write(orjson.dumps(data))

    @classmethod
    def load_json(cls, path: PathLike) -> dict | list:
        """
        Reads ``path`` as UTF-8 text and parses it with ``orjson``.
        """
        return orjson.loads(Path(path).read_text(encoding="utf-8"))

    @classmethod
    def read_any(
        cls, path: PathLike
    ) -> (
        str
        | bytes
        | Sequence[str]
        | Sequence[int]
        | Sequence[float]
        | Sequence[Mapping[str, str]]
        | Mapping[str, str]
        | Any
    ):
        """
        Reads a variety of simple formats based on filename extension.
        Includes '.txt', '.csv', '.xml', '.properties', '.json'.
        Also reads '.bytes' or '.data' (binary), '.lines' (text lines).
        And formatted lists: '.strings', '.floats', and '.ints' (ex: "[1, 2, 3]").

        Returns:
            The parsed content; '.csv' gives a list of row dicts and '.xml' gives the root element

        Raises:
            TypeError: If the extension is not recognized
        """
        path = Path(path)
        ext = path.suffix.lstrip(".")

        def load_list(dtype):
            # Only the first line is parsed; spaces and square brackets are dropped
            lines = cls.read_lines_file(path)
            content = lines[0] if lines else ""
            content = content.replace(" ", "").replace("[", "").replace("]", "")
            if not content:
                # empty file or "[]": previously an IndexError / ValueError
                return []
            return [dtype(s) for s in content.split(",")]

        if ext == "lines":
            return cls.read_lines_file(path)
        if ext == "txt":
            return path.read_text(encoding="utf-8")
        if ext in ("bytes", "data"):
            return path.read_bytes()
        if ext == "json":
            return cls.load_json(path)
        if ext == "properties":
            return cls.read_properties_file(path)
        if ext == "csv":
            with path.open(encoding="utf-8") as f:
                return list(csv.DictReader(f))
        if ext == "ints":
            return load_list(int)
        if ext == "floats":
            return load_list(float)
        if ext == "strings":
            return load_list(str)
        if ext == "xml":
            # the original parsed the document but forgot to return it
            return ElementTree.parse(path).getroot()
        raise TypeError(f"Did not recognize resource file type for file {path}")

    @classmethod
    @contextmanager
    def open_file(cls, path: PathLike, mode: OpenMode | str, *, mkdir: bool = False):
        """
        Opens a file, optionally gzipped, and closes it when the ``with`` block exits.
        Text is always utf-8; binary modes are opened without an encoding.
        Unless the mode is a read mode, :meth:`prep_file` is called first.

        Args:
            path: The file to open; ``.gz`` / ``.gzip`` suffixes go through ``gzip.open``
            mode: Converted with ``OpenMode``
            mkdir: For write modes, create the parent directory first

        See Also:
            :class:`pocketutils.core.input_output.OpenMode`
        """
        path = Path(path)
        mode = OpenMode(mode)
        if mode.write and mkdir:
            path.parent.mkdir(exist_ok=True, parents=True)
        if not mode.read:
            cls.prep_file(path, exist_ok=mode.overwrite or mode.append)
        if path.suffix == ".gz" or path.suffix == ".gzip":
            if mode.binary:
                handle = gzip.open(path, mode, compresslevel=COMPRESS_LEVEL)
            else:
                # gzip.open only accepts encoding= when the mode contains "t"
                # NOTE(review): assumes OpenMode behaves like the str passed to open() -- confirm
                text_mode = mode if "t" in mode else mode + "t"
                handle = gzip.open(
                    path, text_mode, compresslevel=COMPRESS_LEVEL, encoding="utf-8"
                )
        elif mode.binary:
            # open() raises ValueError if encoding= is given in binary mode
            handle = open(path, mode)
        else:
            handle = open(path, mode, encoding="utf-8")
        # previously the handle was yielded bare and never closed
        with handle:
            yield handle

    @classmethod
    def write_lines(cls, iterable: Iterable[Any], path: PathLike, mode: str = "w") -> int:
        r"""
        Just writes an iterable line-by-line to a file, using '\n'.
        Each item is converted with ``str``.

        The file is opened through :meth:`open_file`, which prepares the parent directory.
        Note that a plain string is iterated character by character.

        Returns:
            The number of lines written (the same as len(iterable) if iterable has a length)

        Raises:
            FileExistsError: If the path exists and the mode neither overwrites nor appends
            FileDoesNotExistError: If the path exists but is not a file
        """
        n = 0
        with cls.open_file(path, mode) as f:
            for x in iterable:
                f.write(str(x) + "\n")
                n += 1
        return n

    @classmethod
    def replace_in_file(cls, path: PathLike, changes: Mapping[str, str]) -> None:
        """
        Uses ``regex.sub`` repeatedly to modify (AND REPLACE) a file's content.

        Args:
            path: The UTF-8 text file to rewrite in place
            changes: Maps each pattern to its replacement; applied in iteration order
        """
        path = Path(path)
        data = path.read_text(encoding="utf-8")
        for key, value in changes.items():
            data = regex.sub(key, value, data, flags=regex.V1 | regex.MULTILINE | regex.DOTALL)
        path.write_text(data, encoding="utf-8")

    @classmethod
    def tmp_path(cls, path: PathLike | None = None, **kwargs) -> Generator[Path, None, None]:
        """
        Yields a temporary Path and deletes it (if it exists) when the generator finishes.
        Won't create ``path`` itself. If ``path`` is None, uses ``tempfile.mkstemp``,
        which does create the file.

        Args:
            path: The path to yield and later delete
            kwargs: Passed to ``tempfile.mkstemp`` when ``path`` is None

        NOTE(review): this is a plain generator function, not a context manager;
        it was possibly meant to carry ``@contextmanager`` -- confirm with callers.
        """
        if path is None:
            fd, path = tempfile.mkstemp(**kwargs)
            # mkstemp hands back an open descriptor; close it so it is not leaked
            os.close(fd)
        target = Path(path)
        try:
            yield target
        finally:
            # the caller may never have created the file
            target.unlink(missing_ok=True)

    @classmethod
    def tmp_file(
        cls, path: PathLike | None = None, *, spooled: bool = False, **kwargs
    ) -> Generator[Writeable, None, None]:
        """
        Simple wrapper around tempfile functions.
        Wraps ``TemporaryFile``, ``NamedTemporaryFile``, and ``SpooledTemporaryFile``.

        Args:
            path: If given (and not ``spooled``), a ``NamedTemporaryFile`` is created in
                  ``path``'s parent directory with ``path``'s name as the prefix
            spooled: Use ``SpooledTemporaryFile`` and ignore ``path``
            kwargs: Passed to the tempfile constructor

        NOTE(review): this is a plain generator function, not a context manager;
        it was possibly meant to carry ``@contextmanager`` -- confirm with callers.
        """
        if spooled:
            with tempfile.SpooledTemporaryFile(**kwargs) as x:
                yield x
        elif path is None:
            with tempfile.TemporaryFile(**kwargs) as x:
                yield x
        else:
            # The path used to be passed positionally, i.e. as NamedTemporaryFile's
            # ``mode``, which always raised ValueError.
            # NOTE(review): dir + prefix is the closest NamedTemporaryFile offers to
            # "a temp file at this path" -- confirm this is the intended meaning.
            target = Path(path)
            kwargs.setdefault("dir", str(target.parent))
            kwargs.setdefault("prefix", target.name)
            with tempfile.NamedTemporaryFile(**kwargs) as x:
                yield x

    @classmethod
    def tmp_dir(cls, **kwargs) -> Generator[Path, None, None]:
        """
        Yields the Path of a ``tempfile.TemporaryDirectory``, which is removed afterward.
        ``kwargs`` are passed to ``TemporaryDirectory``.
        """
        with tempfile.TemporaryDirectory(**kwargs) as x:
            yield Path(x)

    @classmethod
    def check_expired(
        cls,
        path: PathLike,
        max_sec: timedelta | float,
        *,
        parent: PathLike | None = None,
        warn_expired_fmt: str = "{path_rel} is {delta} out of date [{mod_rel}]",
        warn_unknown_fmt: str = "{path_rel} mod date is unknown [created: {create_rel}]",
        log: Callable[[str], Any] | None = logger.warning,
    ) -> bool | None:
        """
        Warns and returns True if ``path`` mod date is more than ``max_sec`` in the past.
        Returns None if it could not be determined.

        The formatting strings can refer to any of these (will be empty if unknown):
        - path: Full path
        - name: File/dir name
        - path_rel: Path relative to ``parent``, or full path if not under it
        - now: formatted current datetime
        - [mod/create]_dt: Formatted mod/creation datetime
        - [mod/create]_rel: Mod/create datetime in terms of offset from now
        - [mod/create]_delta: Formatted timedelta from now
        - [mod/create]_delta_sec: Number of seconds from now (negative if now < mod/create dt);
          also available as [mod/create]_sec

        Args:
            path: A specific path to check
            max_sec: Max seconds, or a timedelta
            parent: If provided, path_rel is relative to this directory (to simplify warnings)
            warn_expired_fmt: Formatting string in terms of the variables listed above
            warn_unknown_fmt: Formatting string in terms of the variables listed above
            log: Log about problems

        Returns:
            Whether it is expired, or None if it could not be determined
        """
        path = Path(path)
        if log is None:

            def log(_):
                return None

        limit = max_sec if isinstance(max_sec, timedelta) else timedelta(seconds=max_sec)
        now = datetime.now().astimezone()
        info = cls.get_info(path)
        if info.mod_dt and now - info.mod_dt > limit:
            cls._warn_expired(now, info.mod_dt, info.create_dt, path, parent, warn_expired_fmt, log)
            return True
        elif not info.mod_dt and (not info.create_dt or (now - info.create_dt) > limit):
            cls._warn_expired(now, info.mod_dt, info.create_dt, path, parent, warn_unknown_fmt, log)
            return None
        return False

    @classmethod
    def _warn_expired(
        cls,
        now: datetime,
        mod: datetime | None,
        created: datetime | None,
        path: Path,
        parent: Path | None,
        fmt: str | None,
        log: Callable[[str], Any],
    ):
        """
        Formats ``fmt`` with the variables documented on :meth:`check_expired` and passes it to ``log``.
        Does nothing if ``fmt`` is None.
        """
        if fmt is None:
            # the annotation allows None; calling it would raise TypeError
            return
        if isinstance(fmt, str):
            fmt = fmt.format
        if parent is not None and path.is_relative_to(parent):
            path_rel = str(path.relative_to(parent))
        else:
            path_rel = str(path)
        now_str, mod_str, mod_rel, mod_delta, mod_delta_sec = cls._expire_warning_info(now, mod)
        _, create_str, create_rel, create_delta, create_delta_sec = cls._expire_warning_info(
            now, created
        )
        msg = fmt(
            path=path,
            path_rel=path_rel,
            name=path.name,
            now=now_str,
            mod_dt=mod_str,
            mod_rel=mod_rel,
            mod_delta=mod_delta,
            mod_sec=mod_delta_sec,
            # documented name; previously missing, so "{mod_delta_sec}" raised KeyError
            mod_delta_sec=mod_delta_sec,
            create_dt=create_str,
            create_rel=create_rel,
            create_delta=create_delta,
            create_sec=create_delta_sec,
            create_delta_sec=create_delta_sec,
        )
        log(msg)

    @classmethod
    def _expire_warning_info(
        cls, now: datetime, then: datetime | None
    ) -> tuple[str, str, str, str, str]:
        """
        Returns ``(now, then, then relative to now, delta, delta in seconds)`` as strings.
        All but the first are empty strings if ``then`` is None.
        """
        now_str = now.strftime("%Y-%m-%d %H:%M:%S")
        if then is None:
            return now_str, "", "", "", ""
        delta = now - then
        then_str = then.strftime("%Y-%m-%d %H:%M:%S")
        then_rel = UnitTools.approx_time_wrt(now, then)
        delta_str = UnitTools.delta_time_to_str(delta, space=Chars.narrownbsp)
        return now_str, then_str, then_rel, delta_str, str(delta.total_seconds())

    @classmethod
    def __stat_raw(cls, path: Path) -> os.stat_result | None:
        """
        Returns ``path.lstat()``, or None if it raised an ``OSError``.
        The error is re-raised only when ``pathlib._ignore_error`` exists and rejects it.
        """
        try:
            return path.lstat()
        except OSError as e:
            if hasattr(pathlib, "_ignore_error") and not pathlib._ignore_error(e):
                raise
        return None
||
721 | |||
722 | |||
# Names exported by `from ... import *`; PathInfo is imported above from
# pocketutils.tools.path_info and re-exported here.
__all__ = ["FilesysTools", "PathInfo"]
||
724 |