Passed
Push — main ( 6d4817...ffd182 )
by Douglas
01:37
created

CompressionSet.__getitem__()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 2
dl 0
loc 11
rs 10
c 0
b 0
f 0
1
"""
2
Compression-aware reading and writing of files.
3
"""
4
from __future__ import annotations
5
6
import abc
7
import bz2
8
import gzip
9
import lzma
10
import os
11
from collections.abc import Callable
12
from dataclasses import dataclass
13
from datetime import datetime
14
from pathlib import Path, PurePath
15
from typing import Any, Self
0 ignored issues
show
Bug introduced by
The name Self does not seem to exist in module typing.
Loading history...
16
17
from pocketutils import WritePermissionsError
18
19
# Anything accepted as a filesystem path by this module: a plain string or a pathlib path.
PathLike = str | PurePath
20
21
22
@dataclass(frozen=True)
class Compression(metaclass=abc.ABCMeta):
    """
    A named compression codec: a pair of bytes-to-bytes functions plus the
    filename suffixes that select it.

    Attributes:
        name: Identifier of the codec (e.g. ``"gzip"``).
        suffixes: Filename suffixes, including the leading dot (e.g. ``".gz"``).
                  Stored as a ``frozenset`` regardless of the iterable passed in.
        compress: Function that takes raw bytes and returns compressed bytes.
        decompress: Function that takes compressed bytes and returns raw bytes.
    """

    name: str
    suffixes: set[str] | frozenset[str]
    compress: Callable[[bytes], bytes]
    decompress: Callable[[bytes], bytes]

    def __post_init__(self) -> None:
        # The dataclass-generated __hash__ hashes every field; a plain ``set``
        # would make hash() raise TypeError, so freeze the suffixes.
        # object.__setattr__ is required because the dataclass is frozen.
        object.__setattr__(self, "suffixes", frozenset(self.suffixes))
28
29
30
def identity(x: Any) -> Any:
    """
    Returns the argument unchanged.

    Used as both the ``compress`` and ``decompress`` function of the no-op codec.
    """
    return x
32
33
34
@dataclass(frozen=True)
class CompressionSet:
    """
    An immutable registry that maps codec names and filename suffixes to codecs.

    ``mapping`` holds one entry per codec name and one per suffix. The operators
    (``+``, ``-``, ``|``) return a new set and never modify ``self``.
    """

    # Lookup table keyed by codec name (e.g. "gzip") and by suffix (e.g. ".gz").
    mapping: dict[str, Compression]

    @classmethod
    def empty(cls) -> Self:
        """
        Returns a set that contains only the no-op codec, registered under ``""``.
        """
        return cls({"": Compression("", set(), identity, identity)})

    def __add__(self, fmt: Compression) -> CompressionSet:
        """
        Returns a new set with ``fmt`` registered under its name and each suffix.

        Raises:
            ValueError: If any of those keys is already mapped to a different codec.
        """
        new = {fmt.name: fmt} | {s: fmt for s in fmt.suffixes}
        # Compare by equality and collect into a list rather than a set, so the
        # check also works when the codec objects are not hashable.
        clashes: list[Compression] = []
        for key, existing in self.mapping.items():
            if key in new and existing != fmt and existing not in clashes:
                clashes.append(existing)
        if clashes:
            raise ValueError(f"Keys from {fmt} already mapped to {clashes}")
        return CompressionSet(self.mapping | new)

    def __sub__(self, fmt: Compression) -> CompressionSet:
        """
        Returns a new set without the entries keyed by ``fmt``'s name or suffixes.
        """
        return CompressionSet(
            {k: v for k, v in self.mapping.items() if k != fmt.name and k not in fmt.suffixes}
        )

    def __or__(self, fmt: CompressionSet) -> CompressionSet:
        """
        Returns the union of both sets; entries of ``fmt`` win on duplicate keys.
        """
        return CompressionSet(self.mapping | fmt.mapping)

    def __getitem__(self, t: Compression | str) -> Compression:
        """
        Returns the codec registered under a name or suffix (e.g. ``"gzip"`` or ``".gz"``).

        An exact match is tried first; if that fails, the lower-cased key is tried,
        so lookups of lower-case keys are case-insensitive.
        A ``Compression`` instance is returned as-is.

        Example:
            ``compressions["gzip"].suffixes  # {".gz", ".gzip"}``

        Raises:
            KeyError: If no codec is registered under ``t``.
        """
        if isinstance(t, str):
            try:
                return self.mapping[t]
            except KeyError:
                folded = t.lower()
                if folded in self.mapping:
                    return self.mapping[folded]
                raise
        if isinstance(t, Compression):
            return t
        return self.mapping[t]

    def guess(self, path: PathLike) -> Compression:
        """
        Returns the codec selected by the final suffix of ``path``.

        Falls back to the entry registered under ``""`` when the filename has no
        dot or its suffix is not registered.
        """
        # PathLike includes plain strings, which have no .name / .suffix.
        path = PurePath(path)
        if "." not in path.name:
            return self[""]
        try:
            return self[path.suffix]
        except KeyError:
            return self[""]
76
77
78
def _get_compressions():
    """
    Builds the default registry: the no-op codec plus gzip, brotli, zstandard,
    lz4, snappy, bzip2, and xz.

    The third-party codec packages are imported inside this function, so they
    are only needed when the registry is actually built.
    """
    import brotli
    import lz4.frame
    import snappy
    import zstandard

    codecs = [
        Compression("gzip", {".gz", ".gzip"}, gzip.compress, gzip.decompress),
        Compression("brotli", {".bro", ".brotli"}, brotli.compress, brotli.decompress),
        Compression("zstandard", {".zst", ".zstd"}, zstandard.compress, zstandard.decompress),
        Compression("lz4", {".lz4"}, lz4.frame.compress, lz4.frame.decompress),
        Compression("snappy", {".snappy"}, snappy.compress, snappy.decompress),
        Compression("bzip2", {".bz2", ".bzip2"}, bz2.compress, bz2.decompress),
        Compression("xz", {".xz"}, lzma.compress, lzma.decompress),
    ]
    # Register one codec at a time, starting from the set that holds only the no-op codec.
    registry = CompressionSet.empty()
    for codec in codecs:
        registry = registry + codec
    return registry
94
95
96
@dataclass(slots=True, frozen=True)
0 ignored issues
show
introduced by
Missing class docstring
Loading history...
Bug introduced by
The keyword slots does not seem to exist for the function call.
Loading history...
97
class SmartIo:
98
    _COMPRESSIONS = None
0 ignored issues
show
Coding Style Naming introduced by
Attribute name "_COMPRESSIONS" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
99
100
    @classmethod
101
    def compressions(cls) -> CompressionSet:
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
102
        if cls._COMPRESSIONS is None:
103
            _COMPRESSIONS = _get_compressions()
0 ignored issues
show
Coding Style Naming introduced by
Variable name "_COMPRESSIONS" doesn't conform to snake_case naming style ('([^\\W\\dA-Z][^\\WA-Z]2,|_[^\\WA-Z]*|__[^\\WA-Z\\d_][^\\WA-Z]+__)$' pattern)

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
104
        return cls._COMPRESSIONS
105
106
    @classmethod
107
    def write(
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
108
        cls, data: Any, path: PathLike, *, atomic: bool = False, mkdirs: bool = False
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
109
    ) -> None:
110
        path = Path(path)
111
        compressed = cls.compressions().guess(path).compress(data)
112
        if path.exists() and not path.is_file():
113
            raise WritePermissionsError(f"Path {path} is not a file", path=path)
114
        if path.exists() and not os.access(path, os.W_OK):
115
            raise WritePermissionsError(f"Cannot write to {path}", path=path)
116
        if mkdirs:
117
            path.parent.mkdir(parents=True, exist_ok=True)
118
        if atomic:
119
            tmp = cls.tmp_path(path)
120
            path.write_bytes(compressed)
121
            tmp.rename(path)
122
        else:
123
            path.write_bytes(compressed)
124
125
    @classmethod
126
    def read_text(cls, path: PathLike) -> str:
127
        """
128
        Similar to :meth:`read_bytes`, but then converts to UTF-8.
129
        """
130
        return cls.read_bytes(path).decode(encoding="utf-8")
131
132
    @classmethod
133
    def read_bytes(cls, path: PathLike) -> bytes:
134
        """
135
        Reads, decompressing according to the filename suffix.
136
        """
137
        data = Path(path).read_bytes()
138
        return cls.compressions().guess(path).decompress(data)
139
140
    @classmethod
141
    def tmp_path(cls, path: PathLike, extra: str = "tmp") -> Path:
0 ignored issues
show
introduced by
Missing function or method docstring
Loading history...
142
        now = datetime.now().isoformat(timespec="microsecond").replace(":", "").replace("-", "")
143
        path = Path(path)
144
        suffix = "".join(path.suffixes)
145
        return path.parent / f".part_{extra}.{now}{suffix}"
146
147
148
# Public names of this module; identity() and _get_compressions() are not listed.
__all__ = ["Compression", "CompressionSet", "SmartIo"]
149