| Metric | Value |
| --- | --- |
| Total Complexity | 59 |
| Total Lines | 246 |
| Duplicated Lines | 26.83 % |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places.
Complex classes like `typeddfs.utils.io_utils` often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.
Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
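In this module, the two methods flagged as duplicated below (`verify_can_read_files` and `verify_can_write_files`) share the `verify_can_` prefix and differ only in the access flag, open mode, and error type. The following is a minimal sketch of one possible Extract Class refactoring against the code shown below; the class name `FilePermissionChecks`, the helper `_verify_files`, and its parameters are illustrative only and are not part of typed-dfs.

```python
from __future__ import annotations

import os
from pathlib import Path

from typeddfs.df_errors import ReadPermissionsError, WritePermissionsError


class FilePermissionChecks:
    """Hypothetical class extracted from IoUtils, holding the verify_can_* methods."""

    @classmethod
    def verify_can_read_files(cls, *paths: str | Path, missing_ok: bool = False, attempt: bool = False) -> None:
        cls._verify_files(*paths, flag=os.R_OK, open_mode="r", err=ReadPermissionsError,
                          verb="read", missing_ok=missing_ok, attempt=attempt)

    @classmethod
    def verify_can_write_files(cls, *paths: str | Path, missing_ok: bool = False, attempt: bool = False) -> None:
        cls._verify_files(*paths, flag=os.W_OK, open_mode="a", err=WritePermissionsError,
                          verb="write", missing_ok=missing_ok, attempt=attempt)

    @classmethod
    def _verify_files(cls, *paths: str | Path, flag: int, open_mode: str, err: type[Exception],
                      verb: str, missing_ok: bool, attempt: bool) -> None:
        # Shared body of the two near-duplicate methods flagged in this report.
        # The errors are constructed with key=..., mirroring the calls in the listing below.
        for path in (Path(p) for p in paths):
            if path.exists() and not path.is_file():
                raise err(f"Path {path} is not a file", key=str(path))
            if (not missing_ok or path.exists()) and not os.access(path, flag):
                raise err(f"Cannot {verb} {path}", key=str(path))
            if attempt:
                try:
                    with path.open(open_mode):
                        pass
                except OSError as e:
                    raise err(f"Failed to open {path} for {verb}", key=str(path)) from e
```

`IoUtils` could then delegate to this class (or re-export its methods) so existing call sites keep working.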
The source of `typeddfs.utils.io_utils` follows:

```python
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to typed-dfs
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/typed-dfs
# SPDX-License-Identifier: Apache-2.0
"""
Tools for IO.
"""
from __future__ import annotations

import os
import sys
from datetime import datetime
from pathlib import Path, PurePath
from typing import TYPE_CHECKING, Any

from pandas.io.common import get_handle

from typeddfs.df_errors import (
    ReadPermissionsError,
    UnsupportedOperationError,
    WritePermissionsError,
)
from typeddfs.file_formats import CompressionFormat, FileFormat
from typeddfs.utils._utils import PathLike

if TYPE_CHECKING:
    from pandas._typing import BaseBuffer, FilePath


class IoUtils:
    # NOTE: duplicated block flagged by this report
    @classmethod
    def verify_can_read_files(
        cls,
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be read, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening

        Raises:
            ReadPermissionsError: If a path is not a file (modulo existence) or doesn't have 'R' set
        """
        paths = [Path(p) for p in paths]
        for path in paths:
            if path.exists() and not path.is_file():
                msg = f"Path {path} is not a file"
                raise ReadPermissionsError(msg, key=str(path))
            if (not missing_ok or path.exists()) and not os.access(path, os.R_OK):
                msg = f"Cannot read from {path}"
                raise ReadPermissionsError(msg, key=str(path))
            if attempt:
                try:
                    with path.open():
                        pass
                except OSError as e:
                    msg = f"Failed to open {path} for read"
                    raise ReadPermissionsError(msg, key=str(path)) from e

    # NOTE: duplicated block flagged by this report
    @classmethod
    def verify_can_write_files(
        cls,
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be written to, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening

        Raises:
            WritePermissionsError: If a path is not a file (modulo existence) or doesn't have 'W' set
        """
        paths = [Path(p) for p in paths]
        for path in paths:
            if path.exists() and not path.is_file():
                msg = f"Path {path} is not a file"
                raise WritePermissionsError(msg, key=str(path))
            if (not missing_ok or path.exists()) and not os.access(path, os.W_OK):
                msg = f"Cannot write to {path}"
                raise WritePermissionsError(msg, key=str(path))
            if attempt:
                try:
                    with path.open("a"):  # or "w"
                        pass
                except OSError as e:
                    msg = f"Failed to open {path} for write"
                    raise WritePermissionsError(msg, key=str(path)) from e

    @classmethod
    def verify_can_write_dirs(cls, *paths: str | Path, missing_ok: bool = False) -> None:
        """
        Checks that all directories can be written to, to ensure atomicity before operations.

        Args:
            *paths: The directories
            missing_ok: Don't raise an error if a path doesn't exist

        Raises:
            WritePermissionsError: If a path is not a directory (modulo existence) or doesn't have 'W' set
        """
        paths = [Path(p) for p in paths]
        for path in paths:
            if path.exists() and not path.is_dir():
                msg = f"Path {path} is not a dir"
                raise WritePermissionsError(msg, key=str(path))
            if missing_ok and not path.exists():
                continue
            if not os.access(path, os.W_OK):
                msg = f"{path} lacks write permission"
                raise WritePermissionsError(msg, key=str(path))
            if not os.access(path, os.X_OK):
                msg = f"{path} lacks access permission"
                raise WritePermissionsError(msg, key=str(path))

    @classmethod
    def write(
        cls,
        path_or_buff: FilePath | BaseBuffer,
        content,
        *,
        mode: str = "w",
        atomic: bool = False,
        compression_kwargs: dict[str, Any] | None = None,
        **kwargs,
    ) -> str | None:
        """
        Writes using Pandas's ``get_handle``.
        By default (unless ``compression=`` is set), infers the compression type from the filename suffix
        (e.g. ``.csv.gz``).
        """
        if compression_kwargs is None:
            compression_kwargs = {}
        if atomic and "a" in mode:
            msg = "Can't append in atomic write"
            raise UnsupportedOperationError(msg)
        if path_or_buff is None:
            return content
        compression = cls.path_or_buff_compression(path_or_buff, kwargs)
        kwargs = {**kwargs, "compression": compression.pandas_value}
        if atomic and isinstance(path_or_buff, PathLike):
            # Atomic write: write to a temp file in the same directory, then swap it into place.
            path = Path(path_or_buff)
            tmp = cls.tmp_path(path)
            with get_handle(tmp, mode, **kwargs) as f:
                f.handle.write(content)
            os.replace(tmp, path)
            return None
        with get_handle(path_or_buff, mode, **kwargs) as f:
            f.handle.write(content)
        return None

    @classmethod
    def read(cls, path_or_buff, *, mode: str = "r", **kwargs) -> str:
        """
        Reads using Pandas's ``get_handle``.
        By default (unless ``compression=`` is set), infers the compression type from the filename suffix
        (e.g. ``.csv.gz``).
        """
        compression = cls.path_or_buff_compression(path_or_buff, kwargs)
        kwargs = {**kwargs, "compression": compression.pandas_value}
        with get_handle(path_or_buff, mode, **kwargs) as f:
            return f.handle.read()

    @classmethod
    def path_or_buff_compression(cls, path_or_buff, kwargs) -> CompressionFormat:
        if "compression" in kwargs:
            return CompressionFormat.of(kwargs["compression"])
        elif isinstance(path_or_buff, PurePath | str):
            return CompressionFormat.from_path(path_or_buff)
        return CompressionFormat.none

    @classmethod
    def is_binary(cls, path: PathLike) -> bool:
        path = Path(path)
        if CompressionFormat.from_path(path).is_compressed:
            return True
        return FileFormat.from_path(path).is_binary

    @classmethod
    def tmp_path(cls, path: PathLike, extra: str = "tmp") -> Path:
        # Timestamped hidden temp name; datetime.isoformat supports at most microsecond precision.
        now = datetime.now().isoformat(timespec="microseconds").replace(":", "").replace("-", "")
        path = Path(path)
        suffix = "".join(path.suffixes)
        return path.parent / (".__" + extra + "." + now + suffix)

    @classmethod
    def get_encoding(cls, encoding: str = "utf-8") -> str:
        """
        Returns a text encoding from a more flexible string.
        Ignores hyphens and lowercases the string.
        Permits these nonstandard shorthands:

          - ``"platform"``: use ``sys.getdefaultencoding()`` on the fly
          - ``"utf8(bom)"``: use ``"utf-8-sig"`` on Windows; ``"utf-8"`` otherwise
          - ``"utf16(bom)"``: use ``"utf-16-sig"`` on Windows; ``"utf-16"`` otherwise
          - ``"utf32(bom)"``: use ``"utf-32-sig"`` on Windows; ``"utf-32"`` otherwise
        """
        e = encoding.lower().replace("-", "")
        if e == "platform":
            return sys.getdefaultencoding()
        if e == "utf8(bom)":
            return "utf-8-sig" if os.name == "nt" else "utf-8"
        if e == "utf16(bom)":
            return "utf-16-sig" if os.name == "nt" else "utf-16"
        if e == "utf32(bom)":
            return "utf-32-sig" if os.name == "nt" else "utf-32"
        if e in {"utf8", "utf-8"}:
            return "utf-8"
        if e in {"utf16", "utf-16"}:
            return "utf-16"
        if e in {"utf32", "utf-32"}:
            return "utf-32"
        return encoding

    @classmethod
    def get_encoding_errors(cls, errors: str | None) -> str | None:
        """
        Returns the value passed as ``errors=`` in ``open``.

        Raises:
            ValueError: If invalid
        """
        if errors is None:
            return "strict"
        if errors in (
            "strict",
            "ignore",
            "replace",
            "xmlcharrefreplace",
            "backslashreplace",
            "namereplace",
            "surrogateescape",
            "surrogatepass",
        ):
            return errors
        msg = f"Invalid value {errors} for errors"
        raise ValueError(msg)


__all__ = ["IoUtils"]
```
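For orientation, here is a short usage sketch of the class above. The file name and contents are made up; it assumes typed-dfs and pandas are installed and that `PathLike` accepts `pathlib.Path` objects. Compression is inferred from the `.gz` suffix as described in the `write`/`read` docstrings.

```python
from pathlib import Path

from typeddfs.utils.io_utils import IoUtils

target = Path("example.txt.gz")  # hypothetical file; the .gz suffix implies gzip compression

# Check permissions up front, then write atomically (temp file + os.replace).
IoUtils.verify_can_write_files(target, missing_ok=True)
IoUtils.write(target, "hello, world\n", atomic=True)

# Read it back; the compression is again inferred from the suffix.
assert IoUtils.read(target) == "hello, world\n"

# Encoding helpers normalize flexible spellings.
print(IoUtils.get_encoding("UTF-8"))      # "utf-8"
print(IoUtils.get_encoding("utf8(bom)"))  # "utf-8-sig" on Windows, "utf-8" elsewhere
print(IoUtils.get_encoding_errors(None))  # "strict"
```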