Passed
Push — main ( af1065...15d22f )
by Douglas
04:30
created

pocketutils.misc.smartio.SmartIo.write()   B

Complexity

Conditions 7

Size

Total Lines 18
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 16
nop 6
dl 0
loc 18
rs 8
c 0
b 0
f 0
1
"""
2
Compression-aware reading and writing of files.
3
"""
4
from __future__ import annotations
5
6
import abc
7
import bz2
8
import gzip
9
import lzma
10
import os
11
from collections.abc import Callable
12
from dataclasses import dataclass
13
from datetime import datetime
14
from pathlib import Path, PurePath
15
from typing import Any, Self
0 ignored issues
show
Bug introduced by
The name Self does not seem to exist in module typing.
Loading history...
16
17
import brotli
0 ignored issues
show
introduced by
Unable to import 'brotli'
Loading history...
18
import lz4.frame
0 ignored issues
show
introduced by
Unable to import 'lz4.frame'
Loading history...
19
import snappy
0 ignored issues
show
introduced by
Unable to import 'snappy'
Loading history...
20
import zstandard
0 ignored issues
show
introduced by
Unable to import 'zstandard'
Loading history...
21
22
from pocketutils import WritePermissionsError
23
24
# Anything this module accepts as a filesystem path: a plain string or a pathlib path object.
PathLike = str | PurePath
25
26
27
@dataclass(frozen=True)
class Compression(metaclass=abc.ABCMeta):
    """
    A named compression codec.

    Attributes:
        name: Identifier of the codec (e.g. ``"gzip"``).
        suffixes: Filename suffixes, including the leading dot, associated with the codec.
            Any iterable of strings is accepted and stored as a ``frozenset``.
        compress: Function mapping raw bytes to compressed bytes.
        decompress: Function mapping compressed bytes back to raw bytes.
    """

    name: str
    suffixes: set[str] | frozenset[str]
    compress: Callable[[bytes], bytes]
    decompress: Callable[[bytes], bytes]

    def __post_init__(self) -> None:
        # A frozen dataclass derives ``__hash__`` from its fields, so a mutable ``set``
        # here would make every instance unhashable. Freeze the suffixes instead;
        # ``object.__setattr__`` is required because the dataclass itself is frozen.
        object.__setattr__(self, "suffixes", frozenset(self.suffixes))
33
34
35
def identity(x: Any) -> Any:
    """
    Returns the argument unchanged.

    Serves as a no-op stand-in wherever a ``bytes -> bytes`` transform is expected.
    """
    return x
37
38
39
@dataclass(frozen=True)
class CompressionSet:
    """
    An immutable registry of compression formats.

    ``mapping`` is keyed by both the name of each format and each of its suffixes,
    so a format can be looked up by either.
    The operators return new sets and never modify ``mapping`` in place.
    """

    mapping: dict[str, Compression]

    @classmethod
    def empty(cls) -> Self:
        """
        Returns a set containing only the no-op format, registered under the key ``""``.
        """
        return cls({"": Compression("", set(), identity, identity)})

    def __add__(self, fmt: Compression) -> CompressionSet:
        """
        Returns a new set that also maps the name and suffixes of ``fmt`` to ``fmt``.

        Raises:
            ValueError: If any of those keys is already mapped to a different format
        """
        new = {fmt.name: fmt} | {s: fmt for s in fmt.suffixes}
        # Compare by equality rather than collecting into a set of formats:
        # that does not require the formats to be hashable.
        already = []
        for key, existing in self.mapping.items():
            if key in new and existing != fmt and existing not in already:
                already.append(existing)
        if already:
            raise ValueError(f"Keys from {fmt} already mapped to {already}")
        return CompressionSet(self.mapping | new)

    def __sub__(self, fmt: Compression) -> CompressionSet:
        """
        Returns a new set without the keys for the name and suffixes of ``fmt``.
        """
        return CompressionSet(
            {k: v for k, v in self.mapping.items() if k != fmt.name and k not in fmt.suffixes}
        )

    def __or__(self, fmt: CompressionSet) -> CompressionSet:
        """
        Returns the union of two sets; on shared keys, the entries of ``fmt`` win.
        """
        return CompressionSet(self.mapping | fmt.mapping)

    def __getitem__(self, t: Compression | str) -> Compression:
        """
        Returns the format registered under a key, which is a name or a suffix
        (e.g. ``"gzip"`` or ``".gz"``).
        A :class:`Compression` instance is returned as-is.
        The lookup is an exact, case-sensitive match.

        Example:
            ``COMPRESSIONS["gzip"].suffixes  # {".gz", ".gzip"}``

        Raises:
            KeyError: If no format is registered under the key
        """
        if isinstance(t, Compression):
            return t
        return self.mapping[t]

    def guess(self, path: PathLike) -> Compression:
        """
        Returns the format registered for the final suffix of ``path``.
        Falls back to the format under ``""`` if the suffix is not registered.
        """
        # ``PathLike`` includes plain strings, which have no ``.suffix``.
        # A name without a dot has the suffix "", so it takes the same fallback.
        suffix = PurePath(path).suffix
        try:
            return self[suffix]
        except KeyError:
            return self[""]
81
82
83
# The default registry: the no-op format under "" plus one entry per supported codec,
# each reachable by its name and by every listed suffix.
COMPRESSIONS = (
    CompressionSet.empty()
    + Compression("gzip", {".gz", ".gzip"}, gzip.compress, gzip.decompress)
    + Compression("brotli", {".bro", ".brotli"}, brotli.compress, brotli.decompress)
    + Compression("zstandard", {".zst", ".zstd"}, zstandard.compress, zstandard.decompress)
    + Compression("lz4", {".lz4"}, lz4.frame.compress, lz4.frame.decompress)
    + Compression("snappy", {".snappy"}, snappy.compress, snappy.decompress)
    + Compression("bzip2", {".bz2", ".bzip2"}, bz2.compress, bz2.decompress)
    + Compression("xz", {".xz"}, lzma.compress, lzma.decompress)
)
93
94
95
@dataclass(slots=True, frozen=True)
0 ignored issues
show
introduced by
Missing class docstring
Loading history...
Bug introduced by
The keyword slots does not seem to exist for the function call.
Loading history...
96
class SmartIo:
97
    @classmethod
98
    def write(
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
99
        cls, data: Any, path: PathLike, *, atomic: bool = False, mkdirs: bool = False
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
100
    ) -> None:
101
        path = Path(path)
102
        compressed = COMPRESSIONS.guess(path).compress(data)
103
        if path.exists() and not path.is_file():
104
            raise WritePermissionsError(f"Path {path} is not a file", path=path)
105
        if path.exists() and not os.access(path, os.W_OK):
106
            raise WritePermissionsError(f"Cannot write to {path}", path=path)
107
        if mkdirs:
108
            path.parent.mkdir(parents=True, exist_ok=True)
109
        if atomic:
110
            tmp = cls.tmp_path(path)
111
            path.write_bytes(compressed)
112
            tmp.rename(path)
113
        else:
114
            path.write_bytes(compressed)
115
116
    @classmethod
117
    def read_text(cls, path: PathLike) -> str:
118
        """
119
        Similar to :meth:`read_bytes`, but then converts to UTF-8.
120
        """
121
        return cls.read_bytes(path).decode(encoding="utf-8")
122
123
    @classmethod
124
    def read_bytes(cls, path: PathLike) -> bytes:
125
        """
126
        Reads, decompressing according to the filename suffix.
127
        """
128
        data = Path(path).read_bytes()
129
        return COMPRESSIONS.guess(path).decompress(data)
130
131
    @classmethod
132
    def tmp_path(cls, path: PathLike, extra: str = "tmp") -> Path:
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
133
        now = datetime.now().isoformat(timespec="microsecond").replace(":", "").replace("-", "")
134
        path = Path(path)
135
        suffix = "".join(path.suffixes)
136
        return path.parent / f".part_{extra}.{now}{suffix}"
137
138
139
# Names exported by a star-import of this module.
__all__ = ["Compression", "CompressionSet", "COMPRESSIONS", "SmartIo"]
140