1
|
|
|
# SPDX-License-Identifier: LGPL-3.0-only |
2
|
|
|
|
3
|
1 |
|
"""Abstract interface to version control systems.""" |
4
|
1 |
|
|
5
|
1 |
|
import fnmatch |
6
|
1 |
|
import os |
7
|
|
|
import subprocess |
8
|
1 |
|
from abc import ABCMeta, abstractmethod |
9
|
1 |
|
from typing import List, Optional, Tuple |
10
|
|
|
|
11
|
1 |
|
from doorstop import common, settings |
12
|
|
|
|
13
|
|
|
log = common.logger(__name__) |
14
|
1 |
|
|
15
|
|
|
|
16
|
|
|
class BaseWorkingCopy(metaclass=ABCMeta): |
17
|
1 |
|
"""Abstract base class for VCS working copies.""" |
18
|
1 |
|
|
19
|
|
|
DIRECTORY: Optional[str] = None # special hidden directory for the working copy |
20
|
1 |
|
IGNORES: Tuple = () # hidden filenames containing ignore patterns |
21
|
1 |
|
|
22
|
1 |
|
def __init__(self, path): |
23
|
1 |
|
self.path = path |
24
|
1 |
|
self._ignores_cache: Optional[List[str]] = None |
25
|
|
|
self._path_cache: Optional[List[Tuple[str, str, str]]] = None |
26
|
1 |
|
|
27
|
|
|
@staticmethod |
28
|
|
|
def relpath(path): |
29
|
1 |
|
"""Get a relative path to the working copy root for commands.""" |
30
|
|
|
return os.path.relpath(path).replace('\\', '/') |
31
|
1 |
|
|
32
|
|
|
@staticmethod |
33
|
|
|
def call(*args, return_stdout=False): # pragma: no cover (abstract method) |
34
|
|
|
"""Call a command with string arguments.""" |
35
|
|
|
log.debug("$ %s", ' '.join(args)) |
36
|
|
|
try: |
37
|
|
|
if return_stdout: |
38
|
|
|
return subprocess.check_output(args).decode('utf-8') |
39
|
|
|
else: |
40
|
|
|
return subprocess.call(args) |
41
|
|
|
except FileNotFoundError: |
42
|
|
|
raise common.DoorstopError("Command not found: {}".format(args[0])) |
43
|
1 |
|
|
44
|
|
|
@abstractmethod |
45
|
|
|
def lock(self, path): # pragma: no cover (abstract method) |
46
|
|
|
"""Pull, update, and lock a file for editing.""" |
47
|
|
|
raise NotImplementedError |
48
|
1 |
|
|
49
|
|
|
@abstractmethod |
50
|
|
|
def edit(self, path): # pragma: no cover (abstract method) |
51
|
|
|
"""Mark a file as modified.""" |
52
|
|
|
raise NotImplementedError |
53
|
1 |
|
|
54
|
|
|
@abstractmethod |
55
|
|
|
def add(self, path): # pragma: no cover (abstract method) |
56
|
|
|
"""Start tracking a file.""" |
57
|
|
|
raise NotImplementedError |
58
|
1 |
|
|
59
|
|
|
@abstractmethod |
60
|
|
|
def delete(self, path): # pragma: no cover (abstract method) |
61
|
|
|
"""Stop tracking a file.""" |
62
|
|
|
raise NotImplementedError |
63
|
1 |
|
|
64
|
|
|
@abstractmethod |
65
|
|
|
def commit(self, message=None): # pragma: no cover (abstract method) |
66
|
|
|
"""Unlock files, commit, and push.""" |
67
|
|
|
raise NotImplementedError |
68
|
1 |
|
|
69
|
|
|
@property |
70
|
|
|
def ignores(self): |
71
|
1 |
|
"""Yield glob expressions to ignore.""" |
72
|
1 |
|
if self._ignores_cache is None: |
73
|
1 |
|
self._ignores_cache = [] |
74
|
1 |
|
log.debug("reading and caching the ignore patterns...") |
75
|
1 |
|
for filename in self.IGNORES: |
76
|
1 |
|
path = os.path.join(self.path, filename) |
77
|
1 |
|
if os.path.isfile(path): |
78
|
1 |
|
for line in common.read_lines(path): |
79
|
1 |
|
pattern = line.strip(" @\\/*\n") |
80
|
1 |
|
if pattern and not pattern.startswith('#'): |
81
|
1 |
|
self._ignores_cache.append('*' + pattern + '*') |
82
|
|
|
yield from self._ignores_cache |
83
|
1 |
|
|
84
|
|
|
@property |
85
|
|
|
def paths(self): |
86
|
1 |
|
"""Yield non-ignored paths in the working copy.""" |
87
|
1 |
|
if self._path_cache is None or not settings.CACHE_PATHS: |
88
|
1 |
|
log.debug("reading and caching all file paths...") |
89
|
1 |
|
self._path_cache = [] |
90
|
1 |
|
for dirpath, _, filenames in os.walk(self.path): |
91
|
1 |
|
for filename in filenames: |
92
|
|
|
path = os.path.join(dirpath, filename) |
93
|
1 |
|
relpath = os.path.relpath(path, self.path) |
94
|
1 |
|
# Skip ignored paths |
95
|
|
|
if self.ignored(relpath): |
96
|
1 |
|
continue |
97
|
1 |
|
# Skip hidden paths |
98
|
1 |
|
if os.path.sep + '.' in os.path.sep + relpath: |
99
|
1 |
|
continue |
100
|
1 |
|
self._path_cache.append((path, filename, relpath)) |
101
|
|
|
yield from self._path_cache |
102
|
1 |
|
|
103
|
|
|
def ignored(self, path): |
104
|
1 |
|
"""Determine if a path matches an ignored pattern.""" |
105
|
1 |
|
for pattern in self.ignores: |
106
|
1 |
|
if fnmatch.fnmatch(path, pattern): |
107
|
1 |
|
return True |
108
|
|
|
return False |
109
|
|
|
|