ChecksumFile.delete()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to typed-dfs
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/typed-dfs
3
# SPDX-License-Identifier: Apache-2.0
4
"""
5
Models for shasum-like files.
6
"""
7
from __future__ import annotations
8
9
from dataclasses import dataclass
10
from pathlib import Path, PurePath
11
from typing import TYPE_CHECKING
12
13
import regex
14
15
from typeddfs.df_errors import (
16
    HashContradictsExistingError,
17
    HashDidNotValidateError,
18
    HashExistsError,
19
    HashFileMissingError,
20
    HashFilenameMissingError,
21
    PathNotRelativeError,
22
)
23
24
if TYPE_CHECKING:
25
    from collections.abc import Callable, Iterable, Mapping, Sequence, ValuesView
26
27
    from typeddfs.utils._utils import PathLike
28
29
# Matches a run of hexadecimal digits (0-9, a-f, A-F), i.e. a hex-encoded hash.
# (The previous class ``[A-Ha-h0-9]`` also accepted ``g``/``h``, which are not hex digits.)
_hex_pattern = regex.compile(r"[A-Fa-f0-9]+", flags=regex.V1)
# Separator between the hash and the filename in a shasum-style line:
# one space followed by either another space or ``*``
# (the mode indicator used by the coreutils ``*sum`` tools).
_hashsum_file_sep = regex.compile(r" [ *]", flags=regex.V1)
31
32
33
@dataclass(frozen=True, slots=True)
34
class _ChecksumMapping:
35
    hash_path: Path
36
    _dct: Mapping[Path, str]
37
38
    def __post_init__(self):
39
        try:
40
            for p in self._dct:
41
                # will error if it's not
42
                p.relative_to(self.directory)
43
        except ValueError as e:
44
            msg = f"{e}: Full contents are {self._dct}"
45
            raise PathNotRelativeError(msg) from e
46
47
    def lines(self) -> Sequence[str]:
48
        """
49
        Returns the text that would be written for this .shasum-like file.
50
        Calls :meth:`unresolve` first.
51
        """
52
        unresolved = self.unresolve()
53
        return [f"{v} *{p.name}" for p, v in unresolved._dct.items()]
54
55
    def line(self, path: PathLike) -> str:
56
        """
57
        Returns the text that would be written for a single path in a .shasum-like file.
58
        """
59
        path = Path(path)
60
        v = self._dct[path]
61
        return f"{v} *{path.name}"
62
63
    @property
64
    def directory(self) -> Path:
65
        return self.hash_path.parent
66
67
    def resolve(self) -> __qualname__:
68
        """
69
        Calls ``pathlib.Path.resolve()`` on all paths.
70
        This will follow symlinks, etc.
71
        """
72
        # noinspection PyArgumentList
73
        return self.__class__(
74
            self.hash_path.resolve(),
75
            {k.resolve(): v for k, v in self._dct.items()},
76
        )
77
78
    def unresolve(self) -> __qualname__:
79
        """
80
        Each path becomes its filename under ``self.directory``.
81
        This means that the parent nodes of each path are discarded in favor of ``self.directory``.
82
83
        Raises:
84
            ValueError: If two paths are "un-resolved" to the same path
85
86
        .. note::
87
            This will not work correctly with subdirectories
88
        """
89
        dct = {}
90
        for k, v in self._dct.items():
91
            k = self.directory / k.name
92
            if k in dct:
93
                msg = f"At least 2 paths resolve to {k}"
94
                raise ValueError(msg)
95
            dct[k] = v
96
        # noinspection PyArgumentList
97
        return self.__class__(self.hash_path, dct)
98
99
    @classmethod
100
    def _parse(
101
        cls,
102
        path: Path,
103
        *,
104
        lines: Sequence[str] | None = None,
105
        missing_ok: bool = False,
106
        subdirs: bool = False,
107
    ) -> __qualname__:
108
        path = Path(path)
109
        if lines is None and path.exists():
110
            lines = path.read_text(encoding="utf-8").splitlines()
111
        elif missing_ok and lines is None:
112
            lines = []
113
        elif lines is None:
114
            msg = f"Hash file {path} not found"
115
            raise HashFileMissingError(msg)
116
117
        # ignore spaces -- editors often add an extra line break, and it's probably fine anyway
118
        read = [_hashsum_file_sep.split(s, 1) for s in lines if len(s) > 0]
119
        # obviously this means that / can't appear in a node
120
        # this is consistent with the commonly accepted spec for shasum
121
        # does not handle root (beginning with /)
122
        kv = {
123
            Path(*[n for n in r[1].strip().split("/") if n != "."]): r[0]
124
            for r in read
125
            if len(r[0]) != 0
126
        }
127
        if not subdirs:
128
            slashed = {k for k in kv if len(k.parts) > 1}
129
            if len(slashed) > 0:
130
                msg = f"Subdirectory (containing /): {slashed} in {path}"
131
                raise ValueError(msg)
132
        kv = {Path(path.parent, p): v for p, v in kv.items()}
133
        return cls(path, kv)
134
135
    def _get_updated(
136
        self,
137
        *,
138
        path: PathLike,
139
        new: str | None,
140
        overwrite: bool | None,
141
        missing_ok: bool,
142
    ) -> str | None:
143
        path = Path(path)
144
        z = self._dct.get(path)
145
        if z is None and not missing_ok:
146
            msg = f"{path} not found ({len(self._dct)} are)"
147
            raise HashFilenameMissingError(msg)
148
        if z is not None:
149
            err = None
150
            if overwrite is None and z != new:
151
                err = (HashContradictsExistingError, f"Hash for {path} exists but does not match")
152
            elif overwrite is False:
153
                err = (
154
                    HashExistsError,
155
                    f"Hash for {path} exists ({'matches' if z == new else 'differs'})",
156
                )
157
            if err is not None:
158
                raise err[0](err[1], key=str(path), original=z, new=new)
159
        return new
160
161
162
@dataclass(frozen=True, slots=True)
class ChecksumFile(_ChecksumMapping):
    """
    Model of a shasum-like file that is meant to list a single file.

    :attr:`file_path` and :attr:`hash_value` raise if the mapping
    does not contain exactly one entry.
    """

    def load(self) -> __qualname__:
        """
        Re-reads :attr:`hash_path` and returns the result as a new instance.
        """
        return type(self).parse(self.hash_path)

    @classmethod
    def parse(
        cls,
        path: Path,
        *,
        lines: Sequence[str] | None = None,
    ) -> __qualname__:
        """
        Reads hash file contents.
        A missing hash file and filenames in subdirectories are both errors.

        Args:
            path: The path of the checksum file; required to resolve paths relative to its parent
            lines: The lines in the checksum file; reads ``path`` if None

        Returns:
            A ChecksumFile
        """
        return cls._parse(path, lines=lines, subdirs=False, missing_ok=False)

    @classmethod
    def new(
        cls,
        hash_path: PathLike,
        file_path: PathLike,
        hash_value: str,
    ) -> ChecksumFile:
        """
        Use this as a constructor.

        Args:
            hash_path: The path of the checksum file
            file_path: The path of the single file it describes
            hash_value: The hash recorded for ``file_path``
        """
        single_entry = {Path(file_path): hash_value}
        return cls(Path(hash_path), single_entry)

    def rename(self, path: Path) -> __qualname__:
        """
        Returns a copy in which :attr:`file_path` is ``path``; the hash is kept.
        This will affect the filename written in a .shasum-like file.
        No OS operations are performed.
        """
        current_hash = self.hash_value
        return self.new(self.hash_path, file_path=path, hash_value=current_hash)

    def update(self, value: str, overwrite: bool | None = True) -> __qualname__:
        """
        Returns a copy with the hash replaced.

        Args:
            value: The new hex-encoded hash
            overwrite: If ``None``, requires that the value is the same as before
                       (no operation is performed).
                       If ``False``, this method will always raise an error.
        """
        target = self.file_path
        checked = self._get_updated(
            path=target,
            new=value,
            overwrite=overwrite,
            missing_ok=False,
        )
        return self.new(self.hash_path, file_path=target, hash_value=checked)

    def delete(self) -> None:
        """
        Deletes the hash file by calling ``pathlib.Path.unlink`` on :attr:`hash_path`.
        A hash file that does not exist is ignored (``missing_ok=True``).

        Raises:
            OSError: Accordingly
        """
        self.hash_path.unlink(missing_ok=True)

    def write(self) -> None:
        """
        Writes the hash file as UTF-8, creating the parent directory if needed.

        Raises:
            OSError: Accordingly
        """
        self.directory.mkdir(parents=True, exist_ok=True)
        text = "\n".join(self.lines())
        self.hash_path.write_text(text, encoding="utf-8")

    @property
    def file_path(self) -> Path:
        """
        The path of the single listed file.

        Raises:
            AssertionError: If the mapping does not contain exactly one entry
        """
        if len(self._dct) == 1:
            return next(iter(self._dct))
        msg = f"{self.hash_path} contains {len(self._dct)} (!= 1) items"
        raise AssertionError(msg)

    @property
    def hash_value(self) -> str:
        """
        The hash of the single listed file.

        Raises:
            AssertionError: If the mapping does not contain exactly one entry
        """
        if len(self._dct) == 1:
            return next(iter(self._dct.values()))
        msg = f"{self.hash_path} contains {len(self._dct)} (!= 1) items"
        raise AssertionError(msg)

    def verify(self, computed: str) -> None:
        """
        Verifies the checksum.

        Args:
            computed: A pre-computed hex-encoded hash

        Raises:
            HashDidNotValidateError: If the hashes are not equal
        """
        if computed == self.hash_value:
            return
        msg = f"Hash for {self.file_path}: calculated {computed} != expected {self.hash_value}"
        raise HashDidNotValidateError(
            msg,
            actual=computed,
            expected=self.hash_value,
        )
278
279
280
@dataclass(frozen=True, slots=True)
class ChecksumMapping(_ChecksumMapping):
    """
    Model of a shasum-like file that lists any number of files and their hashes.

    Instances are frozen; methods such as :meth:`update`, :meth:`append`, and
    :meth:`remove` return a new instance instead of modifying ``self``.
    """

    def load(self, missing_ok: bool = False) -> __qualname__:
        """
        Returns a new map read from the hash file at :attr:`hash_path`.

        Args:
            missing_ok: If the hash path does not exist, treat it as having no items
        """
        return self.__class__.parse(self.hash_path, missing_ok=missing_ok)

    @classmethod
    def parse(
        cls,
        path: Path,
        *,
        lines: Sequence[str] | None = None,
        missing_ok: bool = False,
        subdirs: bool = False,
    ) -> __qualname__:
        """
        Reads hash file contents.

        Args:
            path: The path of the checksum file; required to resolve paths relative to its parent
            lines: The lines in the checksum file; reads ``path`` if None
            missing_ok: If ``path`` does not exist, assume it contains no items
            subdirs: Permit files within subdirectories specified with ``/``
                     Most tools do not support these.

        Returns:
            A mapping from file paths (under the parent of ``path``) to their hex hashes.
            Any node called ``.`` in a listed path is stripped.
        """
        return cls._parse(path, lines=lines, missing_ok=missing_ok, subdirs=subdirs)

    @classmethod
    def new(
        cls,
        hash_path: PathLike,
        dct: Mapping[PathLike, str],
    ) -> ChecksumMapping:
        """
        Use this as the constructor.

        Args:
            hash_path: The path of the checksum file
            dct: Maps file paths to their hashes; keys are converted to ``Path``
        """
        hash_path = Path(hash_path)
        return cls(hash_path, {Path(k): v for k, v in dct.items()})

    def write(
        self,
        *,
        sort: bool | Callable[[Sequence[str]], Sequence[str]] = False,
        rm_if_empty: bool = False,
    ) -> None:
        """
        Writes to the hash (.shasum-like) file.

        Args:
            sort: Sort with this function, or ``sorted`` if True.
                  The function is applied to the formatted text lines, not to the paths.
            rm_if_empty: Delete with ``pathlib.Path.unlink`` if this contains no items

        Raises:
            OSError: Accordingly
        """
        if sort is True:
            sort = sorted
        if rm_if_empty and len(self._dct) == 0:
            self.hash_path.unlink(missing_ok=True)
        else:
            lines = self.lines()
            if callable(sort):
                lines = sort(lines)
            self.directory.mkdir(exist_ok=True, parents=True)
            self.hash_path.write_text("\n".join(lines), encoding="utf-8")

    @property
    def entries(self) -> Mapping[Path, str]:
        """
        A copy of the path-to-hash mapping as a new ``dict``.
        """
        return dict(self._dct)

    def keys(self) -> set[Path]:
        """
        Returns the keys view of the underlying mapping (the listed paths).
        """
        return self._dct.keys()

    def values(self) -> ValuesView[str]:
        """
        Returns the values view of the underlying mapping (the hashes).
        """
        return self._dct.values()

    def items(self) -> set[tuple[Path, str]]:
        """
        Returns the items view of the underlying mapping (path, hash pairs).
        """
        return self._dct.items()

    def get(self, key: Path, default: str | None = None) -> str | None:
        """
        Returns the hash listed for ``key``, or ``default`` if it is not listed.
        """
        return self._dct.get(key, default)

    def __contains__(self, path: Path) -> bool:
        return path in self._dct

    def __getitem__(self, path: Path) -> str:
        return self._dct[path]

    def __len__(self) -> int:
        return len(self._dct)

    def __add__(
        self,
        other: ChecksumMapping | Mapping[PathLike, str] | __qualname__,
    ) -> __qualname__:
        """
        Performs a symmetric addition.
        The result keeps :attr:`hash_path` of ``self``.

        Raises:
            ValueError: If ``other`` intersects (shares keys) with ``self``

        See Also:
            :meth:`append`
        """
        if isinstance(other, ChecksumMapping):
            other = other._dct
        other = {Path(k): v for k, v in other.items()}
        intersection = set(self._dct).intersection(other)
        if len(intersection) > 0:
            msg = f"Cannot merge with intersection: {intersection}"
            raise ValueError(msg)
        # build via ``new`` (like ``__sub__`` and ``update``) so subclasses keep their type
        return self.new(self.hash_path, {**self._dct, **other})

    def __sub__(self, other: PathLike | Iterable[PathLike] | ChecksumMapping) -> __qualname__:
        """
        Removes entries.
        Paths in ``other`` that are not listed are ignored.

        See Also:
            :meth:`remove`
        """
        if isinstance(other, ChecksumMapping):
            other = other._dct
        # a str is itself iterable, so a single path must be wrapped explicitly
        if isinstance(other, PurePath | str):
            other = {other}
        other = {Path(p) for p in other}
        return self.new(self.hash_path, {k: v for k, v in self.items() if k not in other})

    def remove(
        self,
        remove: PathLike | Iterable[PathLike],
        *,
        missing_ok: bool = False,
    ) -> __qualname__:
        """
        Strips paths from this hash collection.
        Like :meth:`update` but less flexible and only for removing paths.

        Args:
            remove: A single path or an iterable of paths to remove
            missing_ok: If ``False``, every path to remove must currently be listed

        Raises:
            :class:`typeddfs.df_errors.PathNotRelativeError`: To avoid, try calling ``resolve`` first
        """
        if isinstance(remove, str | PurePath):
            remove = [remove]
        # a value of None marks an entry for removal in ``update``
        return self.update({p: None for p in remove}, missing_ok=missing_ok, overwrite=True)

    def append(
        self,
        append: Mapping[PathLike, str],
        *,
        overwrite: bool | None = False,
    ) -> __qualname__:
        """
        Append paths to a dir hash file.
        Like :meth:`update` but less flexible and only for adding paths.

        Args:
            append: Maps the paths to add to their hashes
            overwrite: Passed to :meth:`update`; by default an already-listed path is an error
        """
        return self.update(append, missing_ok=True, overwrite=overwrite)

    def update(
        self,
        update: Callable[[Path], str | None] | Mapping[PathLike, str | None],
        *,
        missing_ok: bool = True,
        overwrite: bool | None = True,
    ) -> __qualname__:
        """
        Returns updated hashes from a dir hash file.

        Args:
            update: Values to overwrite.
                    May be a function or a dictionary from paths to values.
                    If ``None`` is returned, the entry will be removed;
                    otherwise, updates with the returned hex hash.
                    A function is called once per currently listed path.
            missing_ok: Allow paths in a dictionary ``update`` that are not already listed.
                        If ``False``, such a path is an error.
            overwrite: Allow overwriting an existing value.
                       If ``None``, only allow if the hash is the same.

        Returns:
            A new instance; ``self`` is not modified
        """
        fixed = {}
        # update existing items:
        for p, v in self.items():
            v_new = update(p) if callable(update) else update.get(p, v)
            if v == v_new:
                # avoid an error about overwriting if we're not changing values
                fixed[p] = v
            else:
                fixed[p] = self._get_updated(
                    path=p,
                    new=v_new,
                    missing_ok=missing_ok,
                    overwrite=overwrite,
                )
        # add new items:
        if not callable(update):
            for p, v in update.items():
                p = Path(p)
                fixed[p] = self._get_updated(
                    path=p,
                    new=v,
                    missing_ok=missing_ok,
                    overwrite=overwrite,
                )
        # entries mapped to None are the ones being removed
        fixed = {k: v for k, v in fixed.items() if v is not None}
        return self.new(self.hash_path, fixed)

    def verify(
        self,
        path: PathLike,
        computed: str,
        *,
        resolve: bool = False,
        exist: bool = False,
    ) -> None:
        """
        Verifies a checksum.
        The file ``path`` must be listed.

        Args:
            path: The file to look for; if it is not absolute and ``resolve`` is False,
                  it is taken relative to :attr:`directory`
            computed: A pre-computed hex-encoded hash to compare against the listed one
            resolve: Resolve ``path`` with ``pathlib.Path.resolve`` before the lookup
            exist: Require that ``path`` exists

        Raises:
            FileNotFoundError: If ``exist`` is set and ``path`` does not exist,
                               or if ``path`` is not listed in this mapping
            HashDidNotValidateError: If the hashes are not equal
        """
        path = Path(path)
        if resolve:
            path = path.resolve()
        elif not path.is_absolute():
            path = self.directory / path
        if exist and not path.exists():
            msg = f"Path {path} does not exist"
            raise FileNotFoundError(msg)
        found = self.get(path)
        if found is None:
            msg = f"Path {path} not listed in {self.hash_path}"
            raise FileNotFoundError(msg)
        if computed != found:
            msg = f"Hash for {path}: calculated {computed} != expected {found}"
            raise HashDidNotValidateError(
                msg,
                actual=computed,
                expected=found,
            )
535
536
537
__all__ = ["ChecksumFile", "ChecksumMapping"]
538