Passed
Pull Request — develop (#183)
by
unknown
01:58
created

gitman.models.source.Source.__init__()   A

Complexity

Conditions 4

Size

Total Lines 16
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 14
nop 8
dl 0
loc 16
ccs 13
cts 13
cp 1
crap 4
rs 9.7
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

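There are several approaches to avoid long parameter lists, for example introducing a parameter object, passing a whole object instead of several of its fields, or replacing a parameter with a method call.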

1 1
import logging
2 1
import os
3 1
import warnings
4
5 1
import yorm
6 1
from yorm.types import AttributeDictionary, List, NullableString, String
7
8 1
from .. import common, exceptions, git, shell
9
10
11 1
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(name=String)
@yorm.attr(type=String)
@yorm.attr(repo=String)
@yorm.attr(sparse_paths=List.of_type(String))
@yorm.attr(rev=String)
@yorm.attr(link=NullableString)
@yorm.attr(scripts=List.of_type(String))
class Source(AttributeDictionary):
    """A dictionary of `git` and `ln` arguments.

    Instances compare (``==``, ``!=``, ``<``) by their ``name`` only.
    """

    # Marker returned in place of a revision when the working tree has
    # uncommitted changes.
    DIRTY = '<dirty>'
    # Marker returned in place of a revision when the source is missing.
    UNKNOWN = '<unknown>'

    def __init__(self, type, repo, name=None, rev='master',
                 link=None, scripts=None, sparse_paths=None):
        """Store the source settings and validate the required ones.

        :param type: kind of repository; a falsy value becomes ``'git'``
        :param repo: repository location passed to the `git` helpers
        :param name: directory name; inferred from `repo` when ``None``
        :param rev: revision to check out
        :param link: optional path (relative to a root) to symlink from
        :param scripts: shell commands executed by :meth:`run_scripts`
        :param sparse_paths: paths forwarded to ``git.clone``

        :raises exceptions.InvalidConfig: if `name`, `repo`, or `rev`
            ends up empty

        """
        super().__init__()
        self.type = type or 'git'
        self.repo = repo
        self.name = self._infer_name(repo) if name is None else name
        self.rev = rev
        self.link = link
        self.scripts = scripts or []
        self.sparse_paths = sparse_paths or []

        for key in ['name', 'repo', 'rev']:
            if not self[key]:
                msg = "'{}' required for {}".format(key, repr(self))
                raise exceptions.InvalidConfig(msg)

    def _on_post_load(self):
        """Restore the default type when the loaded value is empty."""
        self.type = self.type or 'git'

    def __repr__(self):
        return "<source {}>".format(self)

    def __str__(self):
        pattern = "['{t}'] '{r}' @ '{v}' in '{d}'"
        if self.link:
            pattern += " <- '{s}'"
        return pattern.format(t=self.type, r=self.repo,
                              v=self.rev, d=self.name, s=self.link)

    def __eq__(self, other):
        # Objects without a `name` are not comparable; let Python fall
        # back to its default handling instead of raising AttributeError.
        try:
            other_name = other.name
        except AttributeError:
            return NotImplemented
        return self.name == other_name

    def __ne__(self, other):
        try:
            other_name = other.name
        except AttributeError:
            return NotImplemented
        return self.name != other_name

    def __lt__(self, other):
        try:
            other_name = other.name
        except AttributeError:
            return NotImplemented
        return self.name < other_name

    def update_files(self, force=False, fetch=False, clean=True,
                     skip_changes=False):
        """Ensure the source matches the specified revision.

        Clones the repository when `self.name` does not exist, changes
        into it, optionally checks for uncommitted changes, fetches if
        needed, and updates the working tree to `self.rev`.

        :param force: skip the check for uncommitted changes
        :param fetch: always fetch, even if ``git.is_fetch_required``
            reports that it is unnecessary
        :param clean: count untracked files as changes and forward the
            flag to ``git.update``
        :param skip_changes: on uncommitted changes, show a message and
            return without updating instead of raising

        :raises exceptions.InvalidRepository: if the directory is not a
            valid repository
        :raises exceptions.UncommittedChanges: if changes are found and
            neither `force` nor `skip_changes` is set

        """
        log.info("Updating source files...")

        # Clone the repository if needed
        if not os.path.exists(self.name):
            git.clone(self.type, self.repo, self.name,
                      sparse_paths=self.sparse_paths, rev=self.rev)

        # Enter the working tree
        shell.cd(self.name)
        if not git.valid():
            # Already inside the source directory: report it as-is
            raise self._invalid_repository_at(os.getcwd())

        # Check for uncommitted changes
        if not force:
            log.debug("Confirming there are no uncommitted changes...")
            if skip_changes:
                if git.changes(self.type, include_untracked=clean,
                               display_status=False):
                    msg = ("Skipped update due to uncommitted changes "
                           "in {}").format(os.getcwd())
                    common.show(msg, color='git_changes')
                    return
            else:
                if git.changes(self.type, include_untracked=clean):
                    msg = "Uncommitted changes in {}".format(os.getcwd())
                    raise exceptions.UncommittedChanges(msg)

        # Fetch the desired revision
        if fetch or git.is_fetch_required(self.type, self.rev):
            git.fetch(self.type, self.repo, self.name, rev=self.rev)

        # Update the working tree to the desired revision
        git.update(self.type, self.repo, self.name,
                   fetch=fetch, clean=clean, rev=self.rev)

    def create_link(self, root, force=False):
        """Create a link from the target name to the current directory.

        Does nothing when no `link` is configured or on Windows (where a
        warning is issued instead).

        :param root: directory that `self.link` is joined onto
        :param force: remove an existing non-link path at the target

        :raises exceptions.UncommittedChanges: if something other than a
            symbolic link already exists at the target and `force` is
            not set

        """
        if not self.link:
            return

        log.info("Creating a symbolic link...")

        if os.name == 'nt':
            warnings.warn("Symbolic links are not supported on Windows")
            return

        target = os.path.join(root, self.link)
        # Link relative to the target's directory so the pair can be moved
        source = os.path.relpath(os.getcwd(), os.path.dirname(target))

        if os.path.islink(target):
            os.remove(target)
        elif os.path.exists(target):
            if force:
                shell.rm(target)
            else:
                msg = "Preexisting link location at {}".format(target)
                raise exceptions.UncommittedChanges(msg)

        shell.ln(source, target)

    def run_scripts(self, force=False):
        """Run each configured script inside the source directory.

        :param force: log failing commands at debug level and continue
            instead of raising

        :raises exceptions.InvalidRepository: if the directory is not a
            valid repository
        :raises exceptions.ScriptFailure: if a script fails and `force`
            is not set

        """
        log.info("Running install scripts...")

        # Enter the working tree
        shell.cd(self.name)
        if not git.valid():
            # Already inside the source directory: report it as-is
            raise self._invalid_repository_at(os.getcwd())

        # Check for scripts
        if not self.scripts:
            common.show("(no scripts to run)", color='shell_info')
            common.newline()
            return

        # Run all scripts
        for script in self.scripts:
            try:
                lines = shell.call(script, _shell=True)
            except exceptions.ShellError as exc:
                common.show(*exc.output, color='shell_error')
                cmd = exc.program
                if force:
                    log.debug("Ignored error from call to '%s'", cmd)
                else:
                    msg = "Command '{}' failed in {}".format(cmd, os.getcwd())
                    raise exceptions.ScriptFailure(msg)
            else:
                common.show(*lines, color='shell_output')
        common.newline()

    def identify(self, allow_dirty=True, allow_missing=True,
                 skip_changes=False):
        """Get the path and current repository URL and hash.

        :param allow_dirty: return `DIRTY` as the revision when there
            are uncommitted changes instead of raising
        :param allow_missing: return placeholder values when the source
            directory does not exist instead of raising
        :param skip_changes: when changes are found and `allow_dirty` is
            not set, show a message and return `DIRTY` instead of raising

        :return: tuple of ``(path, url, revision)``

        :raises exceptions.InvalidRepository: if the directory is not a
            valid repository, or is missing and `allow_missing` is unset
        :raises exceptions.UncommittedChanges: if changes are found and
            neither `allow_dirty` nor `skip_changes` is set

        """
        if os.path.isdir(self.name):

            shell.cd(self.name)
            if not git.valid():
                # Already inside the source directory: report it as-is
                raise self._invalid_repository_at(os.getcwd())

            path = os.getcwd()
            url = git.get_url(self.type)
            if git.changes(self.type,
                           display_status=not allow_dirty and not skip_changes,
                           _show=not skip_changes):

                if allow_dirty:
                    common.show(self.DIRTY, color='git_dirty', log=False)
                    common.newline()
                    return path, url, self.DIRTY

                if skip_changes:
                    msg = ("Skipped lock due to uncommitted changes "
                           "in {}").format(os.getcwd())
                    common.show(msg, color='git_changes')
                    common.newline()
                    return path, url, self.DIRTY

                msg = "Uncommitted changes in {}".format(os.getcwd())
                raise exceptions.UncommittedChanges(msg)

            rev = git.get_hash(self.type, _show=True)
            common.show(rev, color='git_rev', log=False)
            common.newline()
            return path, url, rev

        if allow_missing:
            return os.getcwd(), '<missing>', self.UNKNOWN

        # Not inside the source directory here, so the property's
        # "<cwd>/<name>" path is the right one to report.
        raise self._invalid_repository

    def lock(self, rev=None, allow_dirty=False, skip_changes=False):
        """Create a locked source object.

        Return a locked version of the current source if not dirty
        otherwise None.

        :param rev: revision to lock to; determined with
            :meth:`identify` when ``None``
        :param allow_dirty: forwarded to :meth:`identify`
        :param skip_changes: forwarded to :meth:`identify`

        """
        if rev is None:
            _, _, rev = self.identify(allow_dirty=allow_dirty,
                                      allow_missing=False,
                                      skip_changes=skip_changes)

        if rev == self.DIRTY:
            return None

        source = self.__class__(self.type, self.repo,
                                self.name, rev,
                                self.link, self.scripts,
                                self.sparse_paths)
        return source

    @property
    def _invalid_repository(self):
        """Build the error for a source located below the current directory."""
        path = os.path.join(os.getcwd(), self.name)
        return self._invalid_repository_at(path)

    @staticmethod
    def _invalid_repository_at(path):
        """Build the error reporting `path` as an invalid repository."""
        msg = "Not a valid repository: {}".format(path)
        return exceptions.InvalidRepository(msg)

    @staticmethod
    def _infer_name(repo):
        """Derive a directory name from the last segment of `repo`.

        The name is the text before the first dot of the final
        slash-separated segment. Trailing slashes are ignored, and a
        falsy `repo` yields an empty name so that the caller's own
        validation reports the problem.
        """
        if not repo:
            return ''
        filename = repo.rstrip('/').split('/')[-1]
        name = filename.split('.')[0]
        return name
231