Completed
Push — develop ( cb0c0c...a02596 )
by Jace
12s
created

gitman.models.source.Source.update_files()   C

Complexity

Conditions 9

Size

Total Lines 37
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 9.4259

Importance

Changes 0
Metric Value
cc 9
eloc 25
nop 5
dl 0
loc 37
ccs 19
cts 23
cp 0.8261
crap 9.4259
rs 6.6666
c 0
b 0
f 0
1 1
import logging
2 1
import os
3 1
import warnings
4
5 1
import yorm
6 1
from yorm.types import AttributeDictionary, List, NullableString, String
7
8 1
from .. import common, exceptions, git, shell
9
10
11 1
# Module-level logger, named after this module via `__name__`
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(name=String)
@yorm.attr(type=String)
@yorm.attr(repo=String)
@yorm.attr(sparse_paths=List.of_type(String))
@yorm.attr(rev=String)
@yorm.attr(link=NullableString)
@yorm.attr(scripts=List.of_type(String))
class Source(AttributeDictionary):
    """A dictionary of `git` and `ln` arguments.

    Each instance describes one dependency: the repository to fetch
    (`type`, `repo`, `rev`, `sparse_paths`), the directory it is checked
    out into (`name`), an optional symbolic link to create (`link`) and
    optional commands executed by `run_scripts()` (`scripts`).

    Instances are compared and ordered by `name` only.
    """

    # Placeholder revision returned by `identify()` for a dirty working tree
    DIRTY = '<dirty>'
    # Placeholder revision returned by `identify()` for a missing directory
    UNKNOWN = '<unknown>'

    def __init__(self, type, repo, name=None, rev='master',
                 link=None, scripts=None, sparse_paths=None):
        """Create a source, filling in defaults for omitted values.

        :param type: repository type; falls back to ``'git'`` when falsy
        :param repo: repository location handed to the `git` helpers
        :param name: checkout directory; inferred from `repo` when ``None``
        :param rev: revision to check out
        :param link: optional link location, joined to the root directory
        :param scripts: optional list of commands for `run_scripts()`
        :param sparse_paths: optional list of paths forwarded to the clone
        :raises exceptions.InvalidConfig: if `name`, `repo` or `rev` is empty
        """
        super().__init__()
        self.type = type or 'git'
        self.repo = repo
        self.name = self._infer_name(repo) if name is None else name
        self.rev = rev
        self.link = link
        self.scripts = scripts or []
        self.sparse_paths = sparse_paths or []

        for key in ['name', 'repo', 'rev']:
            if not self[key]:
                msg = "'{}' required for {}".format(key, repr(self))
                raise exceptions.InvalidConfig(msg)

    def _on_post_load(self):
        """Restore the default type when it is empty.

        NOTE(review): presumably a hook called by `yorm` after the object
        is loaded from file -- confirm against the yorm documentation.
        """
        self.type = self.type or 'git'

    def __repr__(self):
        return "<source {}>".format(self)

    def __str__(self):
        pattern = "['{t}'] '{r}' @ '{v}' in '{d}'"
        if self.link:
            pattern += " <- '{s}'"
        return pattern.format(t=self.type, r=self.repo,
                              v=self.rev, d=self.name, s=self.link)

    def __eq__(self, other):
        # Defer to the other operand instead of raising `AttributeError`
        # when it has no `name` to compare against.
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name == other.name

    def __ne__(self, other):
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name != other.name

    def __lt__(self, other):
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name < other.name

    def update_files(self, force=False, fetch=False, clean=True,
                     skip_changes=False):
        """Ensure the source matches the specified revision.

        :param force: skip the check for uncommitted changes
        :param fetch: fetch even if `git.is_fetch_required()` is false
        :param clean: treat untracked files as changes; also forwarded
            to `git.update()`
        :param skip_changes: on uncommitted changes, show a message and
            return instead of raising
        :raises exceptions.InvalidRepository: if the working tree is not
            a valid repository
        :raises exceptions.UncommittedChanges: if changes are found and
            neither `force` nor `skip_changes` is set
        """
        log.info("Updating source files...")

        # Clone the repository if needed
        if not os.path.exists(self.name):
            git.clone(self.type, self.repo, self.name,
                      sparse_paths=self.sparse_paths, rev=self.rev)

        # Enter the working tree
        self._enter_working_tree()

        # Check for uncommitted changes
        if not force:
            log.debug("Confirming there are no uncommitted changes...")
            if skip_changes:
                if git.changes(self.type, include_untracked=clean,
                               display_status=False):
                    msg = ("Skipped update due to uncommitted changes "
                           "in {}").format(os.getcwd())
                    common.show(msg, color='git_changes')
                    return
            else:
                if git.changes(self.type, include_untracked=clean):
                    msg = "Uncommitted changes in {}".format(os.getcwd())
                    raise exceptions.UncommittedChanges(msg)

        # Fetch the desired revision
        if fetch or git.is_fetch_required(self.type, self.rev):
            git.fetch(self.type, self.repo, self.name, rev=self.rev)

        # Update the working tree to the desired revision
        git.update(self.type, self.repo, self.name,
                   fetch=fetch, clean=clean, rev=self.rev)

    def create_link(self, root, force=False):
        """Create a link from the target name to the current directory.

        Does nothing when no `link` is configured, and only warns on
        Windows.

        :param root: directory that `link` is joined to
        :param force: remove whatever already exists at the link location
        :raises exceptions.UncommittedChanges: if the link location is
            occupied by something other than a link and `force` is unset
        """
        if not self.link:
            return

        log.info("Creating a symbolic link...")

        if os.name == 'nt':
            warnings.warn("Symbolic links are not supported on Windows")
            return

        target = os.path.join(root, self.link)
        # The link points at the current directory, expressed relative to
        # the directory that will contain the link.
        source = os.path.relpath(os.getcwd(), os.path.dirname(target))

        if os.path.islink(target):
            os.remove(target)
        elif os.path.exists(target):
            if force:
                shell.rm(target)
            else:
                msg = "Preexisting link location at {}".format(target)
                raise exceptions.UncommittedChanges(msg)

        shell.ln(source, target)

    def run_scripts(self, force=False):
        """Run each configured script inside the working tree.

        :param force: log and ignore a failing script instead of raising
        :raises exceptions.InvalidRepository: if the working tree is not
            a valid repository
        :raises exceptions.ScriptFailure: if a script fails and `force`
            is unset
        """
        log.info("Running install scripts...")

        # Enter the working tree
        self._enter_working_tree()

        # Check for scripts
        if not self.scripts:
            common.show("(no scripts to run)", color='shell_info')
            common.newline()
            return

        # Run all scripts
        for script in self.scripts:
            try:
                lines = shell.call(script, _shell=True)
            except exceptions.ShellError as exc:
                common.show(*exc.output, color='shell_error')
                cmd = exc.program
                if force:
                    log.debug("Ignored error from call to '%s'", cmd)
                else:
                    msg = "Command '{}' failed in {}".format(cmd, os.getcwd())
                    raise exceptions.ScriptFailure(msg)
            else:
                common.show(*lines, color='shell_output')
        common.newline()

    def identify(self, allow_dirty=True, allow_missing=True,
                 skip_changes=False):
        """Get the path and current repository URL and hash.

        :param allow_dirty: report `DIRTY` as the revision when there are
            uncommitted changes instead of raising
        :param allow_missing: report `UNKNOWN` when the directory does
            not exist instead of raising
        :param skip_changes: when dirty is not allowed, show a message
            and report `DIRTY` instead of raising
        :return: a ``(path, url, rev)`` tuple
        :raises exceptions.InvalidRepository: if the directory is missing
            (and that is not allowed) or is not a valid repository
        :raises exceptions.UncommittedChanges: if changes are found and
            neither `allow_dirty` nor `skip_changes` is set
        """
        if os.path.isdir(self.name):

            self._enter_working_tree()

            path = os.getcwd()
            url = git.get_url(self.type)
            if git.changes(self.type,
                           display_status=not allow_dirty and not skip_changes,
                           _show=not skip_changes):

                if allow_dirty:
                    common.show(self.DIRTY, color='git_dirty', log=False)
                    common.newline()
                    return path, url, self.DIRTY

                if skip_changes:
                    msg = ("Skipped lock due to uncommitted changes "
                           "in {}").format(os.getcwd())
                    common.show(msg, color='git_changes')
                    common.newline()
                    return path, url, self.DIRTY

                msg = "Uncommitted changes in {}".format(os.getcwd())
                raise exceptions.UncommittedChanges(msg)

            rev = git.get_hash(self.type, _show=True)
            common.show(rev, color='git_rev', log=False)
            common.newline()
            return path, url, rev

        if allow_missing:
            return os.getcwd(), '<missing>', self.UNKNOWN

        raise self._invalid_repository

    def lock(self, rev=None, allow_dirty=False, skip_changes=False):
        """Create a locked source object.

        Return a locked version of the current source if not dirty
        otherwise None.

        :param rev: revision to lock to; identified from the working tree
            when ``None``
        :param allow_dirty: forwarded to `identify()`
        :param skip_changes: forwarded to `identify()`
        """

        if rev is None:
            _, _, rev = self.identify(allow_dirty=allow_dirty,
                                      allow_missing=False,
                                      skip_changes=skip_changes)

        if rev == self.DIRTY:
            return None

        source = self.__class__(self.type, self.repo,
                                self.name, rev,
                                self.link, self.scripts,
                                self.sparse_paths)
        return source

    def _enter_working_tree(self):
        """Change into the working tree and verify it is a repository.

        :raises exceptions.InvalidRepository: if `git.valid()` is false
        """
        # Build the error before changing directory: its message joins the
        # current directory with `self.name`, which only gives the right
        # path while we are still outside the working tree.
        error = self._invalid_repository
        shell.cd(self.name)
        if not git.valid():
            raise error

    @property
    def _invalid_repository(self):
        """Build (not raise) the error for an invalid working tree.

        The reported path is `self.name` joined to the current directory,
        so this must be read before changing into the working tree.
        """
        path = os.path.join(os.getcwd(), self.name)
        msg = "Not a valid repository: {}".format(path)
        return exceptions.InvalidRepository(msg)

    @staticmethod
    def _infer_name(repo):
        """Derive a directory name from the last segment of `repo`.

        Trailing slashes are ignored and everything from the first ``.``
        in the last segment onward is dropped, so ``'http://host/a/b.git'``
        and ``'http://host/a/b/'`` both give ``'b'``.
        """
        filename = repo.rstrip('/').split('/')[-1]
        name = filename.split('.')[0]
        return name
231