Passed
Push — develop ( 4d6495...2dddb7 )
by Jace
03:19
created

gitman.models.source   A

Complexity

Total Complexity 40

Size/Duplication

Total Lines 193
Duplicated Lines 0 %

Test Coverage

Coverage 92.86%

Importance

Changes 0
Metric Value
eloc 138
dl 0
loc 193
ccs 117
cts 126
cp 0.9286
rs 9.2
c 0
b 0
f 0
wmc 40

13 Methods

Rating   Name   Duplication   Size   Complexity  
A Source.lock() 0 7 2
A Source._invalid_repository() 0 5 1
A Source._infer_name() 0 5 1
A Source.__str__() 0 6 2
A Source.__ne__() 0 2 1
B Source.identify() 0 28 6
A Source.__eq__() 0 2 1
A Source.__repr__() 0 2 1
B Source.update_files() 0 29 7
B Source.create_link() 0 24 6
A Source.__init__() 0 14 4
A Source.__lt__() 0 2 1
B Source.run_scripts() 0 29 7

How to fix   Complexity   

Complexity

Complex classes like gitman.models.source often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
import os
3 1
import warnings
4
5 1
import yorm
6 1
from yorm.types import AttributeDictionary, List, NullableString, String
7
8 1
from .. import common, exceptions, git, shell
9
10
11 1
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(name=String)
@yorm.attr(repo=String)
@yorm.attr(sparse_paths=List.of_type(String))
@yorm.attr(rev=String)
@yorm.attr(link=NullableString)
@yorm.attr(scripts=List.of_type(String))
class Source(AttributeDictionary):
    """A dictionary of `git` and `ln` arguments.

    Attributes:
        repo: repository location handed to `git.clone` / `git.fetch`.
        name: directory (relative to the current directory) of the
            working tree; inferred from `repo` when not given.
        rev: revision (branch, hash or tag) the working tree should match.
        link: optional path, relative to a root, of a symbolic link that
            should point at the working tree.
        scripts: shell commands run inside the working tree.
        sparse_paths: paths forwarded to `git.clone` as `sparse_paths`.
    """

    # Revision placeholder returned by `identify()` when the working tree
    # has uncommitted changes.
    DIRTY = '<dirty>'
    # Revision placeholder returned by `identify()` when the working tree
    # directory does not exist.
    UNKNOWN = '<unknown>'

    def __init__(self, repo, name=None, rev='master',
                 link=None, scripts=None, sparse_paths=None):
        """Store the arguments and validate the required ones.

        Raises:
            exceptions.InvalidConfig: if `name`, `repo` or `rev` is empty.
        """
        super().__init__()
        self.repo = repo
        self.name = self._infer_name(repo) if name is None else name
        self.rev = rev
        self.link = link
        # Fall back to fresh lists so instances never share a default.
        self.scripts = scripts or []
        self.sparse_paths = sparse_paths or []

        for key in ['name', 'repo', 'rev']:
            if not self[key]:
                msg = "'{}' required for {}".format(key, repr(self))
                raise exceptions.InvalidConfig(msg)

    def __repr__(self):
        return "<source {}>".format(self)

    def __str__(self):
        pattern = "'{r}' @ '{v}' in '{d}'"
        if self.link:
            pattern += " <- '{s}'"
        return pattern.format(r=self.repo, v=self.rev,
                              d=self.name, s=self.link)

    # Sources are compared and ordered by `name` only.

    def __eq__(self, other):
        return self.name == other.name

    def __ne__(self, other):
        return self.name != other.name

    def __lt__(self, other):
        return self.name < other.name

    def update_files(self, force=False, fetch=False, clean=True):
        """Ensure the source matches the specified revision.

        Args:
            force: skip the check for uncommitted changes.
            fetch: always fetch, even if the revision is already checked
                out; also forwarded to `git.update`.
            clean: count untracked files as changes, and forwarded to
                `git.update`.

        Raises:
            exceptions.InvalidRepository: if the directory is not a valid
                repository.
            exceptions.UncommittedChanges: if changes are found and
                `force` is false.
        """
        log.info("Updating source files...")

        # Clone the repository if needed
        if not os.path.exists(self.name):
            git.clone(self.repo, self.name,
                      sparse_paths=self.sparse_paths, rev=self.rev)

        # Enter the working tree
        self._enter_working_tree()

        # Check for uncommitted changes
        if not force:
            log.debug("Confirming there are no uncommitted changes...")
            if git.changes(include_untracked=clean):
                msg = "Uncommitted changes in {}".format(os.getcwd())
                raise exceptions.UncommittedChanges(msg)

        # Fetch the desired revision
        if fetch or self.rev not in (git.get_branch(),
                                     git.get_hash(),
                                     git.get_tag()):
            git.fetch(self.repo, self.rev)

        # Update the working tree to the desired revision
        git.update(self.rev, fetch=fetch, clean=clean)

    def create_link(self, root, force=False):
        """Create a link from the target name to the current directory.

        Does nothing when no `link` is configured, and only warns on
        Windows (`os.name == 'nt'`).

        Args:
            root: directory the configured `link` path is joined to.
            force: remove a preexisting non-link target instead of failing.

        Raises:
            exceptions.UncommittedChanges: if the target exists, is not a
                symbolic link, and `force` is false.
        """
        if not self.link:
            return

        log.info("Creating a symbolic link...")

        if os.name == 'nt':
            warnings.warn("Symbolic links are not supported on Windows")
            return

        target = os.path.join(root, self.link)
        # Link relative to the link's own directory.
        source = os.path.relpath(os.getcwd(), os.path.dirname(target))

        if os.path.islink(target):
            os.remove(target)
        elif os.path.exists(target):
            if force:
                shell.rm(target)
            else:
                msg = "Preexisting link location at {}".format(target)
                raise exceptions.UncommittedChanges(msg)

        shell.ln(source, target)

    def run_scripts(self, force=False):
        """Run each configured script inside the working tree.

        Args:
            force: log and ignore script failures instead of raising.

        Raises:
            exceptions.InvalidRepository: if the directory is not a valid
                repository.
            exceptions.ScriptFailure: if a script fails and `force` is
                false.
        """
        log.info("Running install scripts...")

        # Enter the working tree
        self._enter_working_tree()

        # Check for scripts
        if not self.scripts:
            common.show("(no scripts to run)", color='shell_info')
            common.newline()
            return

        # Run all scripts
        for script in self.scripts:
            try:
                lines = shell.call(script, _shell=True)
            except exceptions.ShellError as exc:
                common.show(*exc.output, color='shell_error')
                cmd = exc.program
                if force:
                    log.debug("Ignored error from call to '%s'", cmd)
                else:
                    msg = "Command '{}' failed in {}".format(cmd, os.getcwd())
                    raise exceptions.ScriptFailure(msg)
            else:
                common.show(*lines, color='shell_output')
        common.newline()

    def identify(self, allow_dirty=True, allow_missing=True):
        """Get the path and current repository URL and hash.

        Returns:
            A `(path, url, rev)` tuple. `rev` is `DIRTY` when the working
            tree has changes, and `UNKNOWN` (with url `'<missing>'`) when
            the directory does not exist and `allow_missing` is true.

        Raises:
            exceptions.InvalidRepository: if the directory is not a valid
                repository, or is missing and `allow_missing` is false.
            exceptions.UncommittedChanges: if changes are found and
                `allow_dirty` is false.
        """
        if os.path.isdir(self.name):

            self._enter_working_tree()

            path = os.getcwd()
            url = git.get_url()
            if git.changes(display_status=not allow_dirty, _show=True):
                if not allow_dirty:
                    msg = "Uncommitted changes in {}".format(os.getcwd())
                    raise exceptions.UncommittedChanges(msg)

                common.show(self.DIRTY, color='git_dirty', log=False)
                common.newline()
                return path, url, self.DIRTY

            rev = git.get_hash(_show=True)
            common.show(rev, color='git_rev', log=False)
            common.newline()
            return path, url, rev

        if allow_missing:
            return os.getcwd(), '<missing>', self.UNKNOWN

        raise self._invalid_repository

    def lock(self, rev=None):
        """Return a locked version of the current source.

        Args:
            rev: revision to lock to; when omitted, the revision reported
                by `identify()` for a clean, existing working tree is used.
        """
        if rev is None:
            _, _, rev = self.identify(allow_dirty=False, allow_missing=False)
        # Carry `sparse_paths` over as well so the locked source keeps the
        # same checkout configuration as the original.
        source = self.__class__(self.repo, self.name, rev,
                                self.link, self.scripts,
                                sparse_paths=self.sparse_paths)
        return source

    def _enter_working_tree(self):
        """Change into the working tree and confirm it is a repository.

        Raises:
            exceptions.InvalidRepository: if `git.valid()` is false after
                changing into `self.name`.
        """
        # Build the error before changing directory: its path is derived
        # from the current directory joined with `self.name`, which would
        # repeat the name once we are inside the working tree.
        invalid = self._invalid_repository
        shell.cd(self.name)
        if not git.valid():
            raise invalid

    @property
    def _invalid_repository(self):
        """Exception describing `self.name` under the current directory."""
        path = os.path.join(os.getcwd(), self.name)
        msg = "Not a valid repository: {}".format(path)
        return exceptions.InvalidRepository(msg)

    @staticmethod
    def _infer_name(repo):
        """Return the last `/`-separated part of `repo`, up to its first dot."""
        filename = repo.split('/')[-1]
        name = filename.split('.')[0]
        return name
193