typeddfs.utils.io_utils   D

Complexity

Total Complexity 59

Size/Duplication

Total Lines 246
Duplicated Lines 26.83 %

Importance

Changes 0
Metric Value
eloc 161
dl 66
loc 246
rs 4.08
c 0
b 0
f 0
wmc 59

10 Methods

Rating  Name                                Duplication  Size  Complexity
A       IoUtils.path_or_buff_compression()  0            7     3
C       IoUtils.write()                     0            33    9
A       IoUtils.read()                      0            11    2
A       IoUtils.is_binary()                 0            6     2
A       IoUtils.get_encoding_errors()       0            22    3
C       IoUtils.verify_can_read_files()     33           33    10
B       IoUtils.verify_can_write_dirs()     0            25    8
C       IoUtils.verify_can_write_files()    33           33    10
C       IoUtils.get_encoding()              0            28    11
A       IoUtils.tmp_path()                  0            6     1


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
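For this module, the two methods flagged as duplicates (`verify_can_read_files` and `verify_can_write_files`) differ only in the access flag, the open mode, and the wording of the messages. The following is a minimal sketch of one way to fold them into a shared helper. It is not the library's code: `PermissionsCheckError` and `verify_can_access_files` are hypothetical names introduced for illustration, and the sketch skips the open attempt for missing paths, which is an assumption about the intended behavior.

```python
import os
from pathlib import Path


class PermissionsCheckError(Exception):
    """Hypothetical stand-in for ReadPermissionsError / WritePermissionsError."""


def verify_can_access_files(
    *paths,
    access_flag: int,
    open_mode: str,
    verb: str,
    missing_ok: bool = False,
    attempt: bool = False,
) -> None:
    """Shared check; only the flag, mode, and message verb vary per caller."""
    for path in map(Path, paths):
        if path.exists() and not path.is_file():
            raise PermissionsCheckError(f"Path {path} is not a file")
        if missing_ok and not path.exists():
            continue  # nothing further to check for a missing path
        if not os.access(path, access_flag):
            raise PermissionsCheckError(f"Cannot {verb} {path}")
        if attempt:
            try:
                with path.open(open_mode):
                    pass
            except OSError as e:
                raise PermissionsCheckError(f"Failed to open {path} for {verb}") from e


def verify_can_read_files(*paths, **kwargs) -> None:
    verify_can_access_files(*paths, access_flag=os.R_OK, open_mode="r", verb="read", **kwargs)


def verify_can_write_files(*paths, **kwargs) -> None:
    verify_can_access_files(*paths, access_flag=os.W_OK, open_mode="a", verb="write", **kwargs)
```

With this shape, a fix made to the shared loop applies to both the read and the write variant.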

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like typeddfs.utils.io_utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
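In `IoUtils`, shared prefixes suggest at least two groups: the `verify_can_*` permission checks and the `get_encoding*` helpers. As a rough sketch of Extract Class applied to the second group, the encoding helpers could move to their own class. `EncodingUtils` is a hypothetical name, and the alias table below is a simplified version of the original branching, not the library's implementation.

```python
from __future__ import annotations

import os


class EncodingUtils:
    """Hypothetical extracted class holding only the encoding-related helpers."""

    # Values accepted by the ``errors=`` argument of ``open``
    _VALID_ERRORS = frozenset(
        {
            "strict",
            "ignore",
            "replace",
            "xmlcharrefreplace",
            "backslashreplace",
            "namereplace",
            "surrogateescape",
            "surrogatepass",
        }
    )
    # Hyphen-free, lowercase aliases mapped to canonical codec names
    _ALIASES = {"utf8": "utf-8", "utf16": "utf-16", "utf32": "utf-32"}

    @classmethod
    def get_encoding(cls, encoding: str = "utf-8") -> str:
        e = encoding.lower().replace("-", "")
        if e == "utf8(bom)":
            return "utf-8-sig" if os.name == "nt" else "utf-8"
        # Unknown names are passed through unchanged
        return cls._ALIASES.get(e, encoding)

    @classmethod
    def get_encoding_errors(cls, errors: str | None) -> str:
        if errors is None:
            return "strict"
        if errors in cls._VALID_ERRORS:
            return errors
        msg = f"Invalid value {errors} for errors"
        raise ValueError(msg)
```

To keep existing callers working, `IoUtils` could keep thin methods that delegate to the extracted class while the callers are migrated.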

# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to typed-dfs
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/typed-dfs
# SPDX-License-Identifier: Apache-2.0
"""
Tools for IO.
"""
from __future__ import annotations

import os
import sys
from datetime import datetime
from pathlib import Path, PurePath
from typing import TYPE_CHECKING, Any

from pandas.io.common import get_handle

from typeddfs.df_errors import (
    ReadPermissionsError,
    UnsupportedOperationError,
    WritePermissionsError,
)
from typeddfs.file_formats import CompressionFormat, FileFormat
from typeddfs.utils._utils import PathLike

if TYPE_CHECKING:
    from pandas._typing import BaseBuffer, FilePath


class IoUtils:
    @classmethod
    def verify_can_read_files(
        cls,
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be read, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening

        Raises:
            ReadPermissionsError: If a path is not a file (modulo existence) or doesn't have 'R' set
        """
        paths = [Path(p) for p in paths]
        for path in paths:
            if path.exists() and not path.is_file():
                msg = f"Path {path} is not a file"
                raise ReadPermissionsError(msg, key=str(path))
            if (not missing_ok or path.exists()) and not os.access(path, os.R_OK):
                msg = f"Cannot read from {path}"
                raise ReadPermissionsError(msg, key=str(path))
            # Skip the open attempt for a missing path; that case is only
            # reachable here when missing_ok is set.
            if attempt and path.exists():
                try:
                    with path.open():
                        pass
                except OSError as e:
                    msg = f"Failed to open {path} for read"
                    raise ReadPermissionsError(msg, key=str(path)) from e

    @classmethod
    def verify_can_write_files(
        cls,
        *paths: str | Path,
        missing_ok: bool = False,
        attempt: bool = False,
    ) -> None:
        """
        Checks that all files can be written to, to ensure atomicity before operations.

        Args:
            *paths: The files
            missing_ok: Don't raise an error if a path doesn't exist
            attempt: Actually try opening

        Raises:
            WritePermissionsError: If a path is not a file (modulo existence) or doesn't have 'W' set
        """
        paths = [Path(p) for p in paths]
        for path in paths:
            if path.exists() and not path.is_file():
                msg = f"Path {path} is not a file"
                raise WritePermissionsError(msg, key=str(path))
            if (not missing_ok or path.exists()) and not os.access(path, os.W_OK):
                msg = f"Cannot write to {path}"
                raise WritePermissionsError(msg, key=str(path))
            if attempt:
                try:
                    with path.open("a"):  # or w
                        pass
                except OSError as e:
                    msg = f"Failed to open {path} for write"
                    raise WritePermissionsError(msg, key=str(path)) from e

    @classmethod
    def verify_can_write_dirs(cls, *paths: str | Path, missing_ok: bool = False) -> None:
        """
        Checks that all directories can be written to, to ensure atomicity before operations.

        Args:
            *paths: The directories
            missing_ok: Don't raise an error if a path doesn't exist

        Raises:
            WritePermissionsError: If a path is not a directory (modulo existence) or doesn't have 'W' set
        """
        paths = [Path(p) for p in paths]
        for path in paths:
            if path.exists() and not path.is_dir():
                msg = f"Path {path} is not a dir"
                raise WritePermissionsError(msg, key=str(path))
            if missing_ok and not path.exists():
                continue
            if not os.access(path, os.W_OK):
                msg = f"{path} lacks write permission"
                raise WritePermissionsError(msg, key=str(path))
            if not os.access(path, os.X_OK):
                msg = f"{path} lacks access permission"
                raise WritePermissionsError(msg, key=str(path))

    @classmethod
    def write(
        cls,
        path_or_buff: FilePath | BaseBuffer,
        content,
        *,
        mode: str = "w",
        atomic: bool = False,
        compression_kwargs: dict[str, Any] | None = None,
        **kwargs,
    ) -> str | None:
        """
        Writes using Pandas's ``get_handle``.
        By default (unless ``compression=`` is set), infers the compression type from the filename suffix
        (e.g. ``.csv.gz``).
        """
        if compression_kwargs is None:
            compression_kwargs = {}
        if atomic and "a" in mode:
            msg = "Can't append in atomic write"
            raise UnsupportedOperationError(msg)
        if path_or_buff is None:
            return content
        compression = cls.path_or_buff_compression(path_or_buff, kwargs)
        kwargs = {**kwargs, "compression": compression.pandas_value}
        if atomic and isinstance(path_or_buff, PathLike):
            path = Path(path_or_buff)
            tmp = cls.tmp_path(path)
            with get_handle(tmp, mode, **kwargs) as f:
                f.handle.write(content)
            # Replace only after the handle is closed so that all data is flushed,
            # then return so the content is not written a second time.
            os.replace(tmp, path)
            return None
        with get_handle(path_or_buff, mode, **kwargs) as f:
            f.handle.write(content)
        return None

    @classmethod
    def read(cls, path_or_buff, *, mode: str = "r", **kwargs) -> str:
        """
        Reads using Pandas's ``get_handle``.
        By default (unless ``compression=`` is set), infers the compression type from the filename suffix
        (e.g. ``.csv.gz``).
        """
        compression = cls.path_or_buff_compression(path_or_buff, kwargs)
        kwargs = {**kwargs, "compression": compression.pandas_value}
        with get_handle(path_or_buff, mode, **kwargs) as f:
            return f.handle.read()

    @classmethod
    def path_or_buff_compression(cls, path_or_buff, kwargs) -> CompressionFormat:
        if "compression" in kwargs:
            return CompressionFormat.of(kwargs["compression"])
        elif isinstance(path_or_buff, PurePath | str):
            return CompressionFormat.from_path(path_or_buff)
        return CompressionFormat.none

    @classmethod
    def is_binary(cls, path: PathLike) -> bool:
        path = Path(path)
        if CompressionFormat.from_path(path).is_compressed:
            return True
        return FileFormat.from_path(path).is_binary

    @classmethod
    def tmp_path(cls, path: PathLike, extra: str = "tmp") -> Path:
        # ``isoformat`` has no "ns" timespec (it raises ValueError);
        # "microseconds" is the finest resolution it supports.
        now = datetime.now().isoformat(timespec="microseconds").replace(":", "").replace("-", "")
        path = Path(path)
        suffix = "".join(path.suffixes)
        return path.parent / (".__" + extra + "." + now + suffix)

    @classmethod
    def get_encoding(cls, encoding: str = "utf-8") -> str:
        """
        Returns a text encoding from a more flexible string.
        Ignores hyphens and lowercases the string.
        Permits these nonstandard shorthands:

          - ``"platform"``: use ``sys.getdefaultencoding()`` on the fly
          - ``"utf8(bom)"``: use ``"utf-8-sig"`` on Windows; ``"utf-8"`` otherwise
          - ``"utf16(bom)"``: use ``"utf-16"`` (the codec writes a BOM when encoding)
          - ``"utf32(bom)"``: use ``"utf-32"`` (the codec writes a BOM when encoding)
        """
        e = encoding.lower().replace("-", "")
        if e == "platform":
            return sys.getdefaultencoding()
        if e == "utf8(bom)":
            return "utf-8-sig" if os.name == "nt" else "utf-8"
        # Python has no "utf-16-sig" or "utf-32-sig" codec; "utf-16" and "utf-32"
        # already prepend a BOM when encoding.
        if e in {"utf16", "utf16(bom)"}:
            return "utf-16"
        if e in {"utf32", "utf32(bom)"}:
            return "utf-32"
        if e == "utf8":  # hyphens were already stripped above
            return "utf-8"
        return encoding

    @classmethod
    def get_encoding_errors(cls, errors: str | None) -> str | None:
        """
        Returns the value passed as ``errors=`` in ``open``.

        Raises:
            ValueError: If invalid
        """
        if errors is None:
            return "strict"
        if errors in (
            "strict",
            "ignore",
            "replace",
            "xmlcharrefreplace",
            "backslashreplace",
            "namereplace",
            "surrogateescape",
            "surrogatepass",
        ):
            return errors
        msg = f"Invalid value {errors} for errors"
        raise ValueError(msg)


__all__ = ["IoUtils"]
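
The `atomic=True` path of `IoUtils.write` follows a common pattern: write to a temporary file in the same directory, then swap it into place with `os.replace`. The following is a stand-alone sketch of that pattern using only the standard library. `atomic_write_text` is a hypothetical helper, not part of typed-dfs, and it uses `tempfile.mkstemp` instead of the timestamp-based `tmp_path` above.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(path, content: str, encoding: str = "utf-8") -> None:
    """Write ``content`` so that readers see the old file or the new one, never a partial one."""
    path = Path(path)
    # The temporary file must be on the same filesystem as the target
    # for os.replace to be a single rename.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=".__tmp.", suffix=path.suffix)
    try:
        with os.fdopen(fd, "w", encoding=encoding) as f:
            f.write(content)
        # The handle is closed (and flushed) before the swap.
        os.replace(tmp_name, path)
    except BaseException:
        # Clean up the temporary file if the write or the swap failed.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
```

The ordering is the important part: the temporary handle is closed before the replace, and nothing is written to the final path afterwards.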