typeddfs.utils.checksums (rating: B)

Complexity

Total Complexity: 51

Size/Duplication

Total Lines: 277
Duplicated Lines: 0%

Importance

Changes: 0

Metric  Value
eloc    171
dl      0
loc     277
rs      7.92
c       0
b       0
f       0
wmc     51

17 Methods

Rating  Name                              Duplication  Size  Complexity
A       Checksums.default_algorithm()     0            3     1
A       Checksums.load_filesum_of_file()  0            3     1
A       Checksums.get_dirsum_of_file()    0            9     1
A       Checksums.load_dirsum_of_file()   0            3     1
A       Checksums.get_filesum_of_file()   0            9     1
A       Checksums.calc_hash()             0            10    4
A       Checksums.load_dirsum_of_dir()    0            3     1
F       Checksums.write_any()             0            68    19
A       Checksums.load_dirsum_exact()     0            2     1
A       Checksums.load_filesum_exact()    0            2     1
C       Checksums.verify_any()            0            31    10
A       Checksums.resolve_algorithm()     0            16    2
A       Checksums.delete_any()            0            12    2
A       Checksums.generate_dirsum()       0            15    1
A       Checksums.guess_algorithm()       0            19    2
A       Checksums.get_dirsum_of_dir()     0            9     1
A       Checksums.verify_hex()            0            14    2

How to fix: Complexity

Complex classes like typeddfs.utils.checksums often do a lot of different things. To break such a class down, we need to identify a cohesive component within the class. A common approach is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
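
For example, the path-resolution methods in this class share the get_/load_ prefixes and the filesum/dirsum naming. A minimal sketch of Extract Class along those lines, assuming a hypothetical ChecksumPaths component (the name and the exact split are illustrative, not part of typed-dfs):

from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True, slots=True)
class ChecksumPaths:
    """Hypothetical component: resolves where hash files live for one algorithm."""

    alg: str

    def filesum_of_file(self, path: Path) -> Path:
        # per-file hash: my_file.txt.gz -> my_file.txt.gz.sha256
        return path.with_suffix(path.suffix + "." + self.alg)

    def dirsum_of_file(self, path: Path) -> Path:
        # per-directory hash listing a file: my_dir/f.txt -> my_dir/my_dir.sha256
        return path.parent / (path.parent.name + "." + self.alg)

    def dirsum_of_dir(self, path: Path) -> Path:
        # per-directory hash of a directory itself: my_dir -> my_dir/my_dir.sha256
        return path / (path.name + "." + self.alg)

Checksums would keep its public get_*/load_* methods and forward to this component, leaving write_any and verify_any with a single collaborator for path logic.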

# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to typed-dfs
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/typed-dfs
# SPDX-License-Identifier: Apache-2.0
"""
Tools for shasum-like files.
"""
from __future__ import annotations

import hashlib
from dataclasses import dataclass
from pathlib import Path

from typeddfs.df_errors import (
    HashAlgorithmMissingError,
    HashDidNotValidateError,
    HashFileExistsError,
    HashFileMissingError,
    HashFilenameMissingError,
    MultipleHashFilenamesError,
    PathNotRelativeError,
)
from typeddfs.utils._utils import _DEFAULT_HASH_ALG, PathLike
from typeddfs.utils.checksum_models import ChecksumFile, ChecksumMapping


@dataclass(frozen=True, slots=True, order=True)
class Checksums:
    alg: str = _DEFAULT_HASH_ALG

    @classmethod
    def default_algorithm(cls) -> str:
        return _DEFAULT_HASH_ALG

    def write_any(
        self,
        path: PathLike,
        *,
        to_file: bool,
        to_dir: bool,
        overwrite: bool | None = True,
    ) -> str | None:
        """
        Adds and/or appends the hex hash of ``path``.

        Args:
            path: Path to the file to hash
            to_file: Whether to save a per-file hash
            to_dir: Whether to save a per-dir hash
            overwrite: If True, overwrite the file hash and any entry in the dir hash.
                       If False, never overwrite either.
                       If None, never overwrite, but ignore if equal to any existing entries.
        """
        if not to_file and not to_dir:
            return None
        fh, dh = None, None
        x, y = None, None
        path = Path(path)
        hash_file_path = self.get_filesum_of_file(path)
        if to_file:
            if hash_file_path.exists():
                fh = self.load_filesum_exact(hash_file_path)
            else:
                fh = ChecksumFile.new(hash_file_path, file_path=path, hash_value="")
            y = fh.hash_value
            if y != "" and overwrite is False:  # check first -- save time
                msg = f"Hash file of {path} already exists"
                raise HashFileExistsError(msg, key=str(path))
        hash_dir_path = self.get_dirsum_of_file(path)
        if to_dir:
            dh = self.load_dirsum_exact(hash_dir_path)
            x = dh.get(path)
            if x is not None and overwrite is False:
                msg = f"Path {path} listed in {hash_dir_path}"
                raise MultipleHashFilenamesError(
                    msg,
                    key=str(path),
                )
        digest = self.calc_hash(path)
        if overwrite is None:
            if x is not None and x != digest:
                msg = f"Path {path} listed in {hash_dir_path}"
                raise MultipleHashFilenamesError(
                    msg,
                    key=str(path),
                )
            # ignore the "" placeholder from a newly created hash file:
            if y is not None and y != "" and y != digest:
                msg = f"Hash file of {path} already exists with a different hash"
                raise HashFileExistsError(
                    msg,
                    key=str(path),
                )
        if to_file:
            fh = fh.update(digest, overwrite=overwrite)
        if to_dir:
            dh = dh.update({path: digest})
        # write only at the very end:
        if to_file:
            fh.write()
        if to_dir:
            dh.write()
        return digest

    def verify_any(
        self,
        path: PathLike,
        *,
        file_hash: bool,
        dir_hash: bool,
        computed: str | None,
    ) -> str | None:
        """
        Verifies the hash of ``path`` against its per-file and/or per-directory
        hash file, and/or against a pre-computed hex digest.
        """
        path = Path(path)
        if computed is not None:
            self.verify_hex(path, computed)
        hash_file_path = self.get_filesum_of_file(path)
        hash_dir_path = self.get_dirsum_of_file(path)
        # check first to save time:
        if file_hash and not hash_file_path.exists():
            msg = f"File hash of {path} not found"
            raise HashFileMissingError(msg, key=str(path))
        if dir_hash and not hash_dir_path.exists():
            msg = f"Hash of {path} not in {hash_dir_path}"
            raise HashFilenameMissingError(msg, key=str(path))
        # now calculate the actual hash for comparison
        if file_hash or dir_hash:
            computed = self.calc_hash(path)
        # check it:
        if file_hash:
            fh = self.load_filesum_exact(hash_file_path)
            fh.verify(computed)
        if dir_hash:
            dh = self.load_dirsum_exact(hash_dir_path)
            dh.verify(path, computed)
        return computed

    def delete_any(self, path: PathLike, *, rm_if_empty: bool = False) -> None:
        """
        Deletes the filesum and removes ``path`` from the dirsum.
        Ignores missing files.
        """
        path = Path(path)
        self.get_filesum_of_file(path).unlink(missing_ok=True)
        try:
            ds = self.load_dirsum_of_file(path, missing_ok=True)
            ds.remove(path, missing_ok=True).write(rm_if_empty=rm_if_empty)
        except PathNotRelativeError:
            pass

    def verify_hex(self, path: PathLike, expected: str) -> str | None:
        """
        Verifies a hash directly from a hex string.
        """
        path = Path(path)
        actual = self.calc_hash(path)
        if actual != expected:
            msg = f"Hash for {path}: calculated {actual} != expected {expected}"
            raise HashDidNotValidateError(
                msg,
                actual=actual,
                expected=expected,
            )
        return actual

    def calc_hash(self, path: PathLike) -> str:
        """
        Calculates the hash of a file and returns it, hex-encoded.
        """
        path = Path(path)
        alg = getattr(hashlib, self.alg)()
        with path.open("rb") as f:
            for chunk in iter(lambda: f.read(16 * 1024), b""):
                alg.update(chunk)
        return alg.hexdigest()

    def generate_dirsum(self, directory: PathLike, glob: str = "*") -> ChecksumMapping:
        """
        Generates a new hash mapping, calculating hashes for extant files.

        Args:
            directory: Base directory
            glob: Glob pattern under ``directory`` (cannot be recursive)

        Returns:
            A ChecksumMapping; use ``.write`` to write it
        """
        directory = Path(directory)
        path = self.get_dirsum_of_dir(directory)
        sums = {p: self.calc_hash(p) for p in directory.glob(glob)}
        return ChecksumMapping(path, sums)

    def load_filesum_of_file(self, path: PathLike) -> ChecksumFile:
        hash_file = self.get_filesum_of_file(path)
        return ChecksumFile.parse(hash_file)

    def load_dirsum_of_file(self, path: PathLike, *, missing_ok: bool = True) -> ChecksumMapping:
        hash_dir = self.get_dirsum_of_file(path)
        return ChecksumMapping.parse(hash_dir, missing_ok=missing_ok)

    def load_dirsum_of_dir(self, path: PathLike, *, missing_ok: bool = True) -> ChecksumMapping:
        hash_dir = self.get_dirsum_of_dir(path)
        return ChecksumMapping.parse(hash_dir, missing_ok=missing_ok)

    def load_dirsum_exact(self, path: PathLike, *, missing_ok: bool = True) -> ChecksumMapping:
        return ChecksumMapping.parse(Path(path), missing_ok=missing_ok)

    def load_filesum_exact(self, path: PathLike) -> ChecksumFile:
        return ChecksumFile.parse(Path(path))

    def get_filesum_of_file(self, path: PathLike) -> Path:
        """
        Returns the path required for the per-file hash of ``path``.

        Example:
            ``Checksums().get_filesum_of_file("my_file.txt.gz")  # Path("my_file.txt.gz.sha256")``
        """
        path = Path(path)
        return path.with_suffix(path.suffix + "." + self.alg)

    def get_dirsum_of_file(self, path: PathLike) -> Path:
        """
        Returns the path required for the per-directory hash of the directory containing ``path``.

        Example:
            ``Checksums().get_dirsum_of_file(Path("my_dir", "my_file.txt.gz"))  # Path("my_dir", "my_dir.sha256")``
        """
        path = Path(path)
        return path.parent / (path.parent.name + "." + self.alg)

    def get_dirsum_of_dir(self, path: PathLike) -> Path:
        """
        Returns the path required for the per-directory hash of ``path``.

        Example:
            ``Checksums().get_dirsum_of_dir("my_dir")  # Path("my_dir", "my_dir.sha256")``
        """
        path = Path(path)
        return path / (path.name + "." + self.alg)

    @classmethod
    def guess_algorithm(cls, path: PathLike) -> str:
        """
        Guesses the hashlib algorithm used from a hash file.

        Args:
            path: The hash file (e.g. my-file.sha256)

        Example:
            ``Checksums.guess_algorithm("my_file.sha1")  # "sha1"``
        """
        path = Path(path)
        alg = path.suffix.lstrip(".").lower().replace("-", "")
        try:
            getattr(hashlib, alg)
        except AttributeError:
            msg = f"No hashlib algorithm {alg}"
            raise HashAlgorithmMissingError(msg, key=alg) from None
        return alg

    @classmethod
    def resolve_algorithm(cls, alg: str) -> str:
        """
        Finds a hash algorithm by name in :mod:`hashlib`.
        Converts to lowercase and removes hyphens.

        Raises:
            HashAlgorithmMissingError: If not found
        """
        alg = alg.lower().replace("-", "")
        try:
            getattr(hashlib, alg)
        except AttributeError:
            msg = f"No hashlib algorithm {alg}"
            raise HashAlgorithmMissingError(msg, key=alg) from None
        return alg


__all__ = ["Checksums"]
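
For reference, a minimal usage sketch of the class above, assuming a file data.csv exists in the working directory (the filename is illustrative):

from pathlib import Path

from typeddfs.utils.checksums import Checksums

checksums = Checksums()  # uses the default algorithm (_DEFAULT_HASH_ALG)
# write a per-file hash next to data.csv and get the hex digest back:
digest = checksums.write_any(Path("data.csv"), to_file=True, to_dir=False)
# later, re-hash the file and compare it against the stored per-file hash:
checksums.verify_any(Path("data.csv"), file_hash=True, dir_hash=False, computed=None)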