Completed
Push — develop ( d0d35e...48c1c9 )
by Jace
15s queued 10s
created

gitman.models.source.Source.__init__()   A

Complexity

Conditions 4

Size

Total Lines 24
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 21
nop 8
dl 0
loc 24
ccs 19
cts 19
cp 1
crap 4
rs 9.376
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent when you need more, or different, data.

There are several approaches to avoid long parameter lists:

1 1
import logging
2 1
import os
3 1
import warnings
4
5 1
import yorm
0 ignored issues
show
introduced by
Unable to import 'yorm'
Loading history...
6 1
from yorm.types import AttributeDictionary, List, NullableString, String
0 ignored issues
show
introduced by
Unable to import 'yorm.types'
Loading history...
7
8 1
from .. import common, exceptions, git, shell
9
10
11 1
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(name=String)
@yorm.attr(type=String)
@yorm.attr(repo=String)
@yorm.attr(sparse_paths=List.of_type(String))
@yorm.attr(rev=String)
@yorm.attr(link=NullableString)
@yorm.attr(scripts=List.of_type(String))
class Source(AttributeDictionary):
    """A dictionary of `git` and `ln` arguments.

    Attributes:
        type: Passed as the first argument to the `git` helpers; defaults
            to ``'git'`` when empty.
        repo: Location of the repository to clone/fetch from.
        name: Directory the repository is cloned into; inferred from
            `repo` when not given.
        rev: Revision the working tree should be updated to.
        link: Optional path (relative to a root) where a symbolic link to
            the working tree is created.
        scripts: Commands run by `run_scripts` inside the working tree.
        sparse_paths: Paths forwarded to `git.clone` as `sparse_paths`.

    Sources compare (``==``, ``!=``, ``<``) by `name` only.
    """

    # Placeholder revisions returned by `identify`.
    DIRTY = '<dirty>'
    UNKNOWN = '<unknown>'

    def __init__(
        self,
        type,  # pylint: disable=redefined-builtin; name is part of the public keyword interface
        repo,
        name=None,
        rev='master',
        link=None,
        scripts=None,
        sparse_paths=None,
    ):
        """Store the source settings and validate the required ones.

        Raises:
            exceptions.InvalidConfig: if `name`, `repo`, or `rev` ends up
                empty.
        """
        super().__init__()
        self.type = type or 'git'
        self.repo = repo
        self.name = self._infer_name(repo) if name is None else name
        self.rev = rev
        self.link = link
        # Fresh lists per instance; never share a mutable default.
        self.scripts = scripts or []
        self.sparse_paths = sparse_paths or []

        for key in ['name', 'repo', 'rev']:
            if not self[key]:
                raise exceptions.InvalidConfig(f"'{key}' required for {self!r}")

    def _on_post_load(self):
        """Restore the default `type` when the loaded value is empty."""
        self.type = self.type or 'git'

    def __repr__(self):
        return f"<source {self}>"

    def __str__(self):
        pattern = "['{t}'] '{r}' @ '{v}' in '{d}'"
        if self.link:
            # Only mention the link when one is configured.
            pattern += " <- '{s}'"
        return pattern.format(
            t=self.type, r=self.repo, v=self.rev, d=self.name, s=self.link
        )

    # Comparisons are by `name` only. Objects without a `name` attribute are
    # not comparable, so defer to Python's fallback via `NotImplemented`
    # instead of failing with an `AttributeError`.

    def __eq__(self, other):
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name == other.name

    def __ne__(self, other):
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name != other.name

    def __lt__(self, other):
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name < other.name

    def update_files(
        self,
        force=False,
        force_interactive=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):
        """Ensure the source matches the specified revision.

        Args:
            force: Skip the uncommitted-changes check entirely.
            force_interactive: When changes are found, ask on stdin whether
                to overwrite them instead of raising.
            fetch: Always fetch, even if `git.is_fetch_required` says no.
            clean: Forwarded to `git.update`; also controls whether
                untracked files count as changes.
            skip_changes: When changes are found, report them and return
                without updating. Takes precedence over `force_interactive`.

        Raises:
            exceptions.InvalidRepository: if the directory is not a valid
                repository after entering it.
            exceptions.UncommittedChanges: if changes are found and none of
                `force`, `skip_changes`, `force_interactive` is set.
        """
        log.info("Updating source files...")

        # Clone the repository if needed
        if not os.path.exists(self.name):
            git.clone(
                self.type,
                self.repo,
                self.name,
                sparse_paths=self.sparse_paths,
                rev=self.rev,
            )

        # Enter the working tree
        shell.cd(self.name)
        if not git.valid():
            raise self._invalid_repository

        # Check for uncommitted changes
        if not force:
            log.debug("Confirming there are no uncommitted changes...")
            if not self._may_proceed_with_changes(
                clean, skip_changes, force_interactive
            ):
                return

        # Fetch the desired revision
        if fetch or git.is_fetch_required(self.type, self.rev):
            git.fetch(self.type, self.repo, self.name, rev=self.rev)

        # Update the working tree to the desired revision
        git.update(
            self.type, self.repo, self.name, fetch=fetch, clean=clean, rev=self.rev
        )

    def _may_proceed_with_changes(self, clean, skip_changes, force_interactive):
        """Decide whether an update may continue given local changes.

        Returns True to continue and False to skip the update; raises
        `exceptions.UncommittedChanges` in the strict (default) mode.
        """
        if not (skip_changes or force_interactive):
            # Strict mode: any uncommitted change aborts the update.
            if git.changes(self.type, include_untracked=clean):
                raise exceptions.UncommittedChanges(
                    f'Uncommitted changes in {os.getcwd()}'
                )
            return True

        if not git.changes(
            self.type, include_untracked=clean, display_status=False
        ):
            return True

        if skip_changes:
            common.show(
                f'Skipped update due to uncommitted changes in {os.getcwd()}',
                color='git_changes',
            )
            return False

        common.show(
            f'Uncommitted changes found in {os.getcwd()}',
            color='git_changes',
        )
        return self._confirm_overwrite()

    @staticmethod
    def _confirm_overwrite():
        """Prompt until the user answers yes (default) or no to overwriting."""
        while True:
            answer = str(input("Do you want to overwrite? (Y/N)[Y]: "))
            answer = answer.rstrip('\r\n').lower()

            # An empty answer selects the default: yes.
            if answer in ('', 'y'):
                return True

            if answer == 'n':
                common.show(
                    f'Skipped update in {os.getcwd()}', color='git_changes'
                )
                return False

    def create_link(self, root, force=False):
        """Create a link from the target name to the current directory.

        Does nothing when no `link` is configured, and only warns on
        Windows. An existing symbolic link at the target is replaced; any
        other existing path is removed only when `force` is set.

        Raises:
            exceptions.UncommittedChanges: if something other than a
                symbolic link already exists at the target and `force` is
                not set.
        """
        if not self.link:
            return

        log.info("Creating a symbolic link...")

        if os.name == 'nt':
            warnings.warn("Symbolic links are not supported on Windows")
            return

        target = os.path.join(root, self.link)
        # Link relative to the directory that will contain the link.
        source = os.path.relpath(os.getcwd(), os.path.dirname(target))

        if os.path.islink(target):
            os.remove(target)
        elif os.path.exists(target):
            if not force:
                raise exceptions.UncommittedChanges(
                    f"Preexisting link location at {target}"
                )
            shell.rm(target)

        shell.ln(source, target)

    def run_scripts(self, force=False):
        """Run each configured script inside the working tree.

        Args:
            force: Log and ignore failing scripts instead of raising.

        Raises:
            exceptions.InvalidRepository: if the directory is not a valid
                repository after entering it.
            exceptions.ScriptFailure: if a script fails and `force` is not
                set.
        """
        log.info("Running install scripts...")

        # Enter the working tree
        shell.cd(self.name)
        if not git.valid():
            raise self._invalid_repository

        # Check for scripts
        if not self.scripts:
            common.show("(no scripts to run)", color='shell_info')
            common.newline()
            return

        # Run all scripts
        for script in self.scripts:
            try:
                # NOTE(review): `_shell=True` presumably runs the configured
                # string through a shell -- confirm against `shell.call`.
                lines = shell.call(script, _shell=True)
            except exceptions.ShellError as exc:
                common.show(*exc.output, color='shell_error')
                cmd = exc.program
                if not force:
                    raise exceptions.ScriptFailure(
                        f"Command '{cmd}' failed in {os.getcwd()}"
                    )
                log.debug("Ignored error from call to '%s'", cmd)
            else:
                common.show(*lines, color='shell_output')
        common.newline()

    def identify(self, allow_dirty=True, allow_missing=True, skip_changes=False):
        """Get the path and current repository URL and hash.

        Returns:
            A ``(path, url, rev)`` tuple. `rev` is `DIRTY` when there are
            uncommitted changes that are tolerated (`allow_dirty` or
            `skip_changes`), and `UNKNOWN` when the directory is missing
            and `allow_missing` is set.

        Raises:
            exceptions.InvalidRepository: if the directory exists but is
                not a valid repository, or is missing and `allow_missing`
                is not set.
            exceptions.UncommittedChanges: if there are changes and neither
                `allow_dirty` nor `skip_changes` is set.
        """
        if os.path.isdir(self.name):

            shell.cd(self.name)
            if not git.valid():
                raise self._invalid_repository

            path = os.getcwd()
            url = git.get_url(self.type)
            if git.changes(
                self.type,
                display_status=not allow_dirty and not skip_changes,
                _show=not skip_changes,
            ):

                if allow_dirty:
                    common.show(self.DIRTY, color='git_dirty', log=False)
                    common.newline()
                    return path, url, self.DIRTY

                if skip_changes:
                    msg = f"Skipped lock due to uncommitted changes in {os.getcwd()}"
                    common.show(msg, color='git_changes')
                    common.newline()
                    return path, url, self.DIRTY

                raise exceptions.UncommittedChanges(
                    f"Uncommitted changes in {os.getcwd()}"
                )

            rev = git.get_hash(self.type, _show=True)
            common.show(rev, color='git_rev', log=False)
            common.newline()
            return path, url, rev

        if allow_missing:
            return os.getcwd(), '<missing>', self.UNKNOWN

        raise self._invalid_repository

    def lock(self, rev=None, allow_dirty=False, skip_changes=False):
        """Create a locked source object.

        Return a locked version of the current source if not dirty
        otherwise None.

        When `rev` is not given it is taken from `identify`, which may
        raise for a missing/invalid repository or uncommitted changes.
        """
        if rev is None:
            _, _, rev = self.identify(
                allow_dirty=allow_dirty, allow_missing=False, skip_changes=skip_changes
            )

        if rev == self.DIRTY:
            return None

        # Same settings as this source, pinned to the resolved revision.
        return self.__class__(
            self.type,
            self.repo,
            self.name,
            rev,
            self.link,
            self.scripts,
            self.sparse_paths,
        )

    @property
    def _invalid_repository(self):
        """Build (not raise) an `InvalidRepository` error for this source."""
        path = os.path.join(os.getcwd(), self.name)
        return exceptions.InvalidRepository(f"Not a valid repository: {path}")

    @staticmethod
    def _infer_name(repo):
        """Derive a directory name from the last path segment of `repo`.

        Everything from the first ``.`` of that segment on is dropped, so
        ``https://host/user/project.git`` yields ``project``. Trailing
        slashes are ignored, and an empty or missing `repo` yields ``''``
        so that the caller's validation reports the problem.
        """
        filename = (repo or '').rstrip('/').split('/')[-1]
        return filename.split('.')[0]
285