pocketutils.core.smartio.AbstractSmartIo.write()   C
last analyzed

Complexity

Conditions 9

Size

Total Lines 25
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 23
dl 0
loc 25
rs 6.6666
c 0
b 0
f 0
cc 9
nop 7
1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
3
# SPDX-License-Identifier: Apache-2.0
4
"""
5
Compression-aware reading and writing of files.
6
"""
7
8
from __future__ import annotations
9
10
import abc
11
import bz2
12
import gzip
13
import lzma
14
import os
15
from dataclasses import dataclass
16
from datetime import UTC, datetime
17
from pathlib import Path, PurePath
18
from typing import TYPE_CHECKING, Any, Self, TypeVar
19
20
from pocketutils.core.exceptions import AccessDeniedError, KeyReusedError, PathExistsError
21
22
if TYPE_CHECKING:
23
    from collections.abc import Callable, Iterable, Mapping
24
25
26
# Type alias for path arguments accepted by this module: a plain string or any pathlib path.
PathLike = str | PurePath
27
# Generic type variable; used to annotate the pass-through function `identity`.
T = TypeVar("T")
28
29
30
@dataclass(frozen=True, slots=True)
31
class CompressedPath:
32
    parent: Path
33
    stem: str
34
    suffix: str
35
36
37
@dataclass(frozen=True, slots=True)
38
class Compression:
39
    name: str
40
    suffixes: list[str]
41
    compress: Callable[[bytes], bytes]
0 ignored issues
show
introduced by
The variable Callable does not seem to be defined in case TYPE_CHECKING on line 22 is False. Are you sure this can never be the case?
Loading history...
42
    decompress: Callable[[bytes], bytes]
43
44
    def split_path(self: Self, path: PurePath | str) -> CompressedPath:
45
        path = Path(path)
46
        for suffix in self.suffixes:
47
            if path.suffix == suffix:
48
                return CompressedPath(path.parent, path.stem, suffix)
49
        return CompressedPath(path.parent, path.stem, "")
50
51
    def compress_file(self: Self, source: PurePath | str, dest: PurePath | str, atomic: bool = False) -> None:
52
        source = Path(source)
53
        dest = Path(dest)
54
        temp = dest.parent / ("~" + dest.name + ".part") if atomic else dest
55
        data = self.compress(source.read_bytes())
56
        temp.write_bytes(data)
57
        if atomic:
58
            temp.rename(dest)
59
60
    def decompress_file(self: Self, source: PurePath | str, dest: PurePath | str, atomic: bool = False) -> None:
61
        source = Path(source)
62
        dest = Path(dest)
63
        temp = dest.parent / ("~" + dest.name + ".part") if atomic else dest
64
        data = self.decompress(source.read_bytes())
65
        temp.write_bytes(data)
66
        if atomic:
67
            temp.rename(dest)
68
69
70
def identity(x: T) -> T:
    """
    Returns the argument unchanged.

    Serves as the compress/decompress function for the "no compression" format.
    """
    return x
72
73
74
@dataclass(frozen=True, slots=True)
75
class CompressionSet:
76
    mapping: dict[str, Compression]
77
78
    @classmethod
79
    def empty(cls: type[Self]) -> Self:
80
        return CompressionSet({"": Compression("", [], identity, identity)})
81
82
    def __add__(self: Self, fmt: Compression):
83
        new = {fmt.name: fmt} | {s: fmt for s in fmt.suffixes}
84
        already = {v for k, v in self.mapping.items() if k in new}
85
        if len(already) > 1 or len(already) == 1 and already != {fmt}:
86
            msg = f"Keys from {fmt} already mapped to {already}"
87
            raise KeyReusedError(msg, key=fmt.name, original_value=already)
88
        return CompressionSet(self.mapping | new)
89
90
    def __sub__(self: Self, fmt: Compression) -> CompressionSet:
91
        return CompressionSet(
92
            {k: v for k, v in self.mapping.items() if k != fmt.name and k not in fmt.suffixes},
93
        )
94
95
    def __or__(self: Self, fmt: CompressionSet) -> CompressionSet:
96
        return CompressionSet(self.mapping | fmt.mapping)
97
98
    def __getitem__(self: Self, t: Compression | str) -> Compression:
99
        """
100
        Returns a FileFormat from a name (e.g. "gz" or "gzip").
101
        Case-insensitive.
102
103
        Example:
104
105
            Compression.of("gzip").suffix  # ".gz"
106
        """
107
        if isinstance(t, Compression):
108
            return t
109
        return self.mapping[t]
110
111
    def guess(self: Self, path: PathLike) -> Compression:
112
        if "." not in path.name:
113
            return self[""]
114
        try:
115
            return self[path.suffix]
116
        except KeyError:
117
            return self[""]
118
119
120
@dataclass(frozen=True, slots=True)
121
class AbstractSmartIo(metaclass=abc.ABCMeta):
122
    _compressions: CompressionSet | None = None
123
124
    @property
125
    def mapping(self: Self) -> Mapping[str, Compression]:
126
        return self.compressions.mapping
127
128
    @property
129
    def compressions(self: Self) -> CompressionSet:
130
        if self._compressions is None:
131
            self._compressions = self._new_compression_list()
132
        return self._compressions
133
134
    @property
135
    def all_suffixes(self: Self) -> Iterable[str]:
136
        for c in self.compressions:
137
            yield from c.suffixes
138
139
    def _new_compression_list(self: Self) -> CompressionSet:
140
        raise NotImplementedError()
141
142
    def write(
143
        self: Self,
144
        data: Any,
145
        path: PathLike,
146
        *,
147
        atomic: bool = False,
148
        mkdirs: bool = False,
149
        exist_ok: bool = False,
150
    ) -> None:
151
        path = Path(path)
152
        compressed = self.compressions.guess(path).compress(data)
153
        if path.exists() and not path.is_file():
154
            raise PathExistsError(filename=str(path))
155
        if path.exists() and not exist_ok:
156
            raise PathExistsError(filename=str(path))
157
        if path.exists() and not os.access(path, os.W_OK):
158
            raise AccessDeniedError(filename=str(path))
159
        if mkdirs:
160
            path.parent.mkdir(parents=True, exist_ok=True)
161
        if atomic:
162
            tmp = self.tmp_path(path)
163
            path.write_bytes(compressed)
164
            tmp.rename(path)
165
        else:
166
            path.write_bytes(compressed)
167
168
    def read_text(self: Self, path: PathLike, encoding: str = "utf-8") -> str:
169
        """
170
        Similar to :meth:`read_bytes`, but then converts to UTF-8.
171
        """
172
        return self.read_bytes(path).decode(encoding=encoding)
173
174
    def read_bytes(self: Self, path: PathLike) -> bytes:
175
        """
176
        Reads, decompressing according to the filename suffix.
177
        """
178
        data = Path(path).read_bytes()
179
        return self.compressions.guess(path).decompress(data)
180
181
    def tmp_path(self: Self, path: PathLike, extra: str = "tmp") -> Path:
182
        now = datetime.now(tz=UTC).isoformat(timespec="microsecond")
183
        now = now.replace(":", "").replace("-", "")
184
        path = Path(path)
185
        suffix = "".join(path.suffixes)
186
        return path.parent / f".part_{extra}.{now}{suffix}"
187
188
189
@dataclass(frozen=True, slots=True)
class SmartIoUtil(AbstractSmartIo, metaclass=abc.ABCMeta):
    """
    Smart I/O with a fixed table of formats: gzip, brotli, zstandard, lz4, snappy, bzip2, xz, and lzma.
    """

    def _new_compression_list(self: Self) -> CompressionSet:
        """
        Builds the table of supported formats.

        The third-party codecs are imported here, inside the method,
        rather than at module import time.
        """
        import brotli
        import lz4.frame
        import snappy
        import zstandard

        formats = (
            Compression("gzip", [".gz", ".gzip"], gzip.compress, gzip.decompress),
            Compression("brotli", [".br", ".brotli"], brotli.compress, brotli.decompress),
            Compression("zstandard", [".zst", ".zstd"], zstandard.compress, zstandard.decompress),
            Compression("lz4", [".lz4"], lz4.frame.compress, lz4.frame.decompress),
            Compression("snappy", [".snappy"], snappy.compress, snappy.decompress),
            Compression("bzip2", [".bz2", ".bzip2"], bz2.compress, bz2.decompress),
            Compression("xz", [".xz"], lzma.compress, lzma.decompress),
            Compression("lzma", [".lzma"], lzma.compress, lzma.decompress),
        )
        # Start from the pass-through entry and register each format in order.
        registry = CompressionSet.empty()
        for fmt in formats:
            registry = registry + fmt
        return registry
208
209
210
# Module-level default instance of SmartIoUtil, exported for direct use.
SmartIo = SmartIoUtil()
211
212
# Names exported by `from ... import *`.
__all__ = ["Compression", "CompressionSet", "SmartIo", "SmartIoUtil"]
213