Completed
Push — develop ( f5b93c...24eb3d )
by Jace
11s
created

gitman.models.source.Source.__init__()   A

Complexity

Conditions 4

Size

Total Lines 16
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 14
nop 8
dl 0
loc 16
ccs 13
cts 13
cp 1
crap 4
rs 9.7
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameters also tend to become inconsistent when you need more or different data.

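There are several approaches to avoid long parameter lists, such as introducing a parameter object, preserving the whole object, or replacing a parameter with a method call.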

1 1
import logging
2 1
import os
3 1
import warnings
4
5 1
import yorm
6 1
from yorm.types import AttributeDictionary, List, NullableString, String
7
8 1
from .. import common, exceptions, git, shell
9
10
11 1
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(name=String)
@yorm.attr(type=String)
@yorm.attr(repo=String)
@yorm.attr(sparse_paths=List.of_type(String))
@yorm.attr(rev=String)
@yorm.attr(link=NullableString)
@yorm.attr(scripts=List.of_type(String))
class Source(AttributeDictionary):
    """A dictionary of `git` and `ln` arguments.

    Each instance describes one dependency: where it comes from (`type`,
    `repo`, `rev`, `sparse_paths`), the directory it is checked out into
    (`name`), an optional symbolic link to create (`link`) and optional
    shell commands to run after an update (`scripts`).

    Sources are compared and ordered by `name` only.
    """

    DIRTY = '<dirty>'
    UNKNOWN = '<unknown>'

    def __init__(self, type, repo, name=None, rev='master',
                 link=None, scripts=None, sparse_paths=None):
        """Store the source settings and validate the required ones.

        `type` shadows the builtin, but it is part of the public signature
        and therefore kept as-is.

        :param type: repository type; falls back to ``'git'`` when falsy
        :param repo: repository location
        :param name: checkout directory; inferred from `repo` when ``None``
        :param rev: revision to check out
        :param link: optional link location, relative to a root directory
        :param scripts: optional list of shell commands
        :param sparse_paths: optional list of paths forwarded to `git.clone`
        :raises exceptions.InvalidConfig: if `name`, `repo` or `rev` is empty
        """
        super().__init__()
        self.type = type or 'git'
        self.repo = repo
        self.name = self._infer_name(repo) if name is None else name
        self.rev = rev
        self.link = link
        self.scripts = scripts or []
        self.sparse_paths = sparse_paths or []

        # Validate only after every attribute is set: the error message
        # embeds `repr(self)`, which reads type, repo, rev, name and link.
        for key in ['name', 'repo', 'rev']:
            if not self[key]:
                msg = "'{}' required for {}".format(key, repr(self))
                raise exceptions.InvalidConfig(msg)

    def _on_post_load(self):
        """Restore the default type if the loaded value is empty."""
        self.type = self.type or 'git'

    def __repr__(self):
        return "<source {}>".format(self)

    def __str__(self):
        pattern = "['{t}'] '{r}' @ '{v}' in '{d}'"
        if self.link:
            pattern += " <- '{s}'"
        # `s` is always supplied; it is simply unused when there is no link.
        return pattern.format(t=self.type, r=self.repo, v=self.rev,
                              d=self.name, s=self.link)

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # it is not source-like (e.g. `source == None`).
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name == other.name

    def __ne__(self, other):
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name != other.name

    def __lt__(self, other):
        if not hasattr(other, 'name'):
            return NotImplemented
        return self.name < other.name

    def update_files(self, force=False, fetch=False, clean=True):
        """Ensure the source matches the specified revision.

        :param force: skip the check for uncommitted changes
        :param fetch: fetch even if `git.is_fetch_required` says otherwise
        :param clean: count untracked files as changes; also forwarded to
            `git.update`
        :raises exceptions.InvalidRepository: if `git.valid()` is false
            after entering the working tree
        :raises exceptions.UncommittedChanges: if changes are found and
            `force` is not set
        """
        log.info("Updating source files...")

        # Clone the repository if needed
        if not os.path.exists(self.name):
            git.clone(self.type, self.repo, self.name,
                      sparse_paths=self.sparse_paths, rev=self.rev)

        # Enter the working tree
        shell.cd(self.name)
        if not git.valid():
            raise self._invalid_repository

        # Check for uncommitted changes
        if not force:
            log.debug("Confirming there are no uncommitted changes...")
            if git.changes(self.type, include_untracked=clean):
                msg = "Uncommitted changes in {}".format(os.getcwd())
                raise exceptions.UncommittedChanges(msg)

        # Fetch the desired revision
        if fetch or git.is_fetch_required(self.type, self.rev):
            git.fetch(self.type, self.repo, self.name, rev=self.rev)

        # Update the working tree to the desired revision
        git.update(self.type, self.repo, self.name,
                   fetch=fetch, clean=clean, rev=self.rev)

    def create_link(self, root, force=False):
        """Create a link from the target name to the current directory.

        Does nothing when no `link` is configured. On Windows
        (``os.name == 'nt'``) a warning is issued and no link is created.

        :param root: directory that `link` is joined onto
        :param force: remove a preexisting non-link target instead of
            raising
        :raises exceptions.UncommittedChanges: if the target already exists,
            is not a link, and `force` is not set
        """
        if not self.link:
            return

        log.info("Creating a symbolic link...")

        if os.name == 'nt':
            warnings.warn("Symbolic links are not supported on Windows")
            return

        target = os.path.join(root, self.link)
        # Link relative to the target's directory rather than absolutely.
        source = os.path.relpath(os.getcwd(), os.path.dirname(target))

        if os.path.islink(target):
            os.remove(target)
        elif os.path.exists(target):
            if force:
                shell.rm(target)
            else:
                msg = "Preexisting link location at {}".format(target)
                raise exceptions.UncommittedChanges(msg)

        shell.ln(source, target)

    def run_scripts(self, force=False):
        """Run the configured scripts from inside the working tree.

        :param force: log and ignore script failures instead of raising
        :raises exceptions.InvalidRepository: if `git.valid()` is false
            after entering the working tree
        :raises exceptions.ScriptFailure: if a script fails and `force` is
            not set
        """
        log.info("Running install scripts...")

        # Enter the working tree
        shell.cd(self.name)
        if not git.valid():
            raise self._invalid_repository

        # Check for scripts
        if not self.scripts:
            common.show("(no scripts to run)", color='shell_info')
            common.newline()
            return

        # Run all scripts
        for script in self.scripts:
            try:
                lines = shell.call(script, _shell=True)
            except exceptions.ShellError as exc:
                common.show(*exc.output, color='shell_error')
                cmd = exc.program
                if force:
                    log.debug("Ignored error from call to '%s'", cmd)
                else:
                    msg = "Command '{}' failed in {}".format(cmd, os.getcwd())
                    raise exceptions.ScriptFailure(msg)
            else:
                common.show(*lines, color='shell_output')
        common.newline()

    def identify(self, allow_dirty=True, allow_missing=True):
        """Get the path and current repository URL and hash.

        :param allow_dirty: report `DIRTY` as the revision when there are
            changes instead of raising
        :param allow_missing: return placeholder values when the checkout
            directory does not exist instead of raising
        :return: a ``(path, url, rev)`` tuple
        :raises exceptions.InvalidRepository: if the directory exists but
            `git.valid()` is false, or it is missing and `allow_missing`
            is not set
        :raises exceptions.UncommittedChanges: if changes are found and
            `allow_dirty` is not set
        """
        if os.path.isdir(self.name):

            shell.cd(self.name)
            if not git.valid():
                raise self._invalid_repository

            path = os.getcwd()
            url = git.get_url(self.type)
            if git.changes(self.type,
                           display_status=not allow_dirty, _show=True):
                if not allow_dirty:
                    msg = "Uncommitted changes in {}".format(os.getcwd())
                    raise exceptions.UncommittedChanges(msg)

                common.show(self.DIRTY, color='git_dirty', log=False)
                common.newline()
                return path, url, self.DIRTY

            rev = git.get_hash(self.type, _show=True)
            common.show(rev, color='git_rev', log=False)
            common.newline()
            return path, url, rev

        if allow_missing:
            return os.getcwd(), '<missing>', self.UNKNOWN

        raise self._invalid_repository

    def lock(self, rev=None):
        """Return a locked version of the current source.

        :param rev: revision to lock to; when ``None`` it is taken from
            `identify`, which must find a clean, existing checkout
        :return: a new instance of the same class with `rev` replaced
        """
        if rev is None:
            _, _, rev = self.identify(allow_dirty=False, allow_missing=False)
        source = self.__class__(self.type, self.repo,
                                self.name, rev,
                                self.link, self.scripts,
                                self.sparse_paths)
        return source

    @property
    def _invalid_repository(self):
        """Build (but do not raise) an `InvalidRepository` exception."""
        # NOTE(review): several callers read this after `shell.cd(self.name)`;
        # if that changes the process working directory, the reported path
        # contains `name` twice -- confirm against `shell.cd`.
        path = os.path.join(os.getcwd(), self.name)
        msg = "Not a valid repository: {}".format(path)
        return exceptions.InvalidRepository(msg)

    @staticmethod
    def _infer_name(repo):
        """Derive a directory name from the last segment of a repository URL.

        The name is the text before the first ``.`` in the final
        ``/``-separated segment. Trailing slashes are ignored, and a missing
        repository yields an empty name so that `__init__` reports it as an
        invalid configuration rather than failing with an `AttributeError`.
        """
        if not repo:
            return ''
        filename = repo.rstrip('/').split('/')[-1]
        name = filename.split('.')[0]
        return name
201