Passed
Pull Request — develop (#203)
by Jace
02:44
created

gitman.models.source.Source.__init__()   A

Complexity

Conditions 4

Size

Total Lines 24
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 21
nop 8
dl 0
loc 24
ccs 19
cts 19
cp 1
crap 4
rs 9.376
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1 1
import os
2 1
from dataclasses import dataclass, field
3 1
from typing import List, Optional
4
5 1
import log
6 1
7
from .. import common, exceptions, git, shell
8 1
9
10
@dataclass
11 1
class Source:
12
    """A dictionary of `git` and `ln` arguments."""
13
14 1
    name: Optional[str]
0 ignored issues
show
introduced by
Variable annotation syntax is only supported in Python 3.6 and greater (<unknown>, line 14)
Loading history...
15 1
    type: str
16 1
    repo: str
17 1
    sparse_paths: List[Optional[str]] = field(default_factory=list)
18 1
    rev: str = 'master'
19 1
    link: Optional[str] = None
20
    scripts: List[Optional[str]] = field(default_factory=list)
21
22 1
    DIRTY = '<dirty>'
23 1
    UNKNOWN = '<unknown>'
24
25 1
    def __post_init__(self):
26 1
        if self.name is None:
27 1
            self.name = self._infer_name(self.repo)
28 1
29 1
        # TODO: Remove this?
30 1
        for name in ['name', 'repo', 'rev']:
31 1
            if not getattr(self, name):
32
                msg = "'{}' required for {}".format(name, repr(self))
33 1
                raise exceptions.InvalidConfig(msg)
34 1
35 1
    def _on_post_load(self):
36 1
        # TODO: Remove this?
37
        self.type = self.type or 'git'
38 1
39 1
    def __repr__(self):
40
        return "<source {}>".format(self)
41 1
42 1
    def __str__(self):
43 1
        pattern = "['{t}'] '{r}' @ '{v}' in '{d}'"
44 1
        if self.link:
45 1
            pattern += " <- '{s}'"
46
        return pattern.format(
47 1
            t=self.type, r=self.repo, v=self.rev, d=self.name, s=self.link
48 1
        )
49
50 1
    def __eq__(self, other):
51 1
        return self.name == other.name
52
53 1
    def __ne__(self, other):
54 1
        return self.name != other.name
55
56 1
    def __lt__(self, other):
57
        return self.name < other.name
58 1
59
    def update_files(
60
        self,
61 1
        force=False,
62 1
        force_interactive=False,
63
        fetch=False,
64
        clean=True,
65 1
        skip_changes=False,
66 1
    ):
67 1
        """Ensure the source matches the specified revision."""
68
        log.info("Updating source files...")
69
70 1
        # Clone the repository if needed
71 1
        if not os.path.exists(self.name):
72 1
            git.clone(
73
                self.type,
74
                self.repo,
75
                self.name,
76
                sparse_paths=self.sparse_paths,
77 1
                rev=self.rev,
78
            )
79
80 1
        # Enter the working tree
81
        shell.cd(self.name)
82
        if not git.valid():
83 1
            if force:
84
                git.rebuild(self.type, self.repo)
85 1
                fetch = True
86
            else:
87 1
                raise self._invalid_repository
88 1
89
        # Check for uncommitted changes
90 1
        if not force:
91
            log.debug("Confirming there are no uncommitted changes...")
92 1
            if skip_changes:
93
                if git.changes(
94
                    self.type, include_untracked=clean, display_status=False
95
                ):
96 1
                    common.show(
97 1
                        f'Skipped update due to uncommitted changes in {os.getcwd()}',
98
                        color='git_changes',
99 1
                    )
100 1
                    return
101 1
            elif force_interactive:
102 1
                if git.changes(
103 1
                    self.type, include_untracked=clean, display_status=False
104
                ):
105 1
                    common.show(
106 1
                        f'Uncommitted changes found in {os.getcwd()}',
107
                        color='git_changes',
108 1
                    )
109
110 1
                    while True:
111 1
                        yn_input = str(
112
                            input("Do you want to overwrite? (Y/N)[Y]: ")
113
                        ).rstrip('\r\n')
114 1
115 1
                        if yn_input.lower() == "y" or not yn_input:
116
                            break
117
118
                        if yn_input.lower() == "n":
119 1
                            common.show(
120 1
                                f'Skipped update in {os.getcwd()}', color='git_changes'
121 1
                            )
122 1
                            return
123
124
            else:
125 1
                if git.changes(self.type, include_untracked=clean):
126 1
                    raise exceptions.UncommittedChanges(
127 1
                        f'Uncommitted changes in {os.getcwd()}'
128 1
                    )
129 1
130 1
        # Fetch the desired revision
131 1
        if fetch or git.is_fetch_required(self.type, self.rev):
132 1
            git.fetch(self.type, self.repo, self.name, rev=self.rev)
133
134 1
        # Update the working tree to the desired revision
135 1
        git.update(
136
            self.type, self.repo, self.name, fetch=fetch, clean=clean, rev=self.rev
137
        )
138 1
139
    def create_link(self, root, force=False):
140 1
        """Create a link from the target name to the current directory."""
141
        if not self.link:
142 1
            return
143
144 1
        log.info("Creating a symbolic link...")
145 1
146 1
        target = os.path.join(root, self.link)
147
        source = os.path.relpath(os.getcwd(), os.path.dirname(target))
148 1
149 1
        if os.path.islink(target):
150 1
            os.remove(target)
151 1
        elif os.path.exists(target):
152 1
            if force:
153 1
                shell.rm(target)
154
            else:
155
                msg = "Preexisting link location at {}".format(target)
156
                raise exceptions.UncommittedChanges(msg)
157
158
        shell.ln(source, target)
159 1
160 1
    def run_scripts(self, force=False):
161 1
        log.info("Running install scripts...")
162 1
163
        # Enter the working tree
164 1
        shell.cd(self.name)
165
        if not git.valid():
166 1
            raise self._invalid_repository
167
168
        # Check for scripts
169
        if not self.scripts:
170 1
            common.show("(no scripts to run)", color='shell_info')
171
            common.newline()
172 1
            return
173
174 1
        # Run all scripts
175 1
        for script in self.scripts:
176 1
            try:
177
                lines = shell.call(script, _shell=True)
178 1
            except exceptions.ShellError as exc:
179
                common.show(*exc.output, color='shell_error')
180 1
                cmd = exc.program
181
                if force:
182 1
                    log.debug("Ignored error from call to '%s'", cmd)
183 1
                else:
184 1
                    msg = "Command '{}' failed in {}".format(cmd, os.getcwd())
185
                    raise exceptions.ScriptFailure(msg)
186 1
            else:
187
                common.show(*lines, color='shell_output')
188 1
        common.newline()
189 1
190 1
    def identify(self, allow_dirty=True, allow_missing=True, skip_changes=False):
191
        """Get the path and current repository URL and hash."""
192
        if os.path.isdir(self.name):
193
194
            shell.cd(self.name)
195
            if not git.valid():
196
                raise self._invalid_repository
197
198
            path = os.getcwd()
199
            url = git.get_url(self.type)
200
            if git.changes(
201
                self.type,
202
                display_status=not allow_dirty and not skip_changes,
203
                _show=not skip_changes,
204
            ):
205
206
                if allow_dirty:
207
                    common.show(self.DIRTY, color='git_dirty', log=False)
208
                    common.newline()
209
                    return path, url, self.DIRTY
210
211
                if skip_changes:
212
                    msg = ("Skipped lock due to uncommitted changes " "in {}").format(
213
                        os.getcwd()
214
                    )
215
                    common.show(msg, color='git_changes')
216
                    common.newline()
217
                    return path, url, self.DIRTY
218
219
                msg = "Uncommitted changes in {}".format(os.getcwd())
220
                raise exceptions.UncommittedChanges(msg)
221
222
            rev = git.get_hash(self.type, _show=True)
223
            common.show(rev, color='git_rev', log=False)
224
            common.newline()
225
            return path, url, rev
226
227
        if allow_missing:
228
            return os.getcwd(), '<missing>', self.UNKNOWN
229
230
        raise self._invalid_repository
231
232
    def lock(self, rev=None, allow_dirty=False, skip_changes=False):
233
        """Create a locked source object.
234
235
        Return a locked version of the current source if not dirty
236
        otherwise None.
237
        """
238
239
        if rev is None:
240
            _, _, rev = self.identify(
241
                allow_dirty=allow_dirty, allow_missing=False, skip_changes=skip_changes
242
            )
243
244
        if rev == self.DIRTY:
245
            return None
246
247
        source = self.__class__(
248
            type=self.type,
249
            repo=self.repo,
250
            name=self.name,
251
            rev=rev,
252
            link=self.link,
253
            scripts=self.scripts,
254
            sparse_paths=self.sparse_paths,
255
        )
256
        return source
257
258
    @property
259
    def _invalid_repository(self):
260
        path = os.path.join(os.getcwd(), self.name)
261
        msg = """
262
263
            Not a valid repository: {}
264
            During install you can rebuild a repo with a missing .git directory using the --force option
265
            """.format(
266
            path
267
        )
268
        return exceptions.InvalidRepository(msg)
269
270
    @staticmethod
271
    def _infer_name(repo):
272
        filename = repo.split('/')[-1]
273
        name = filename.split('.')[0]
274
        return name
275