Completed
Push — develop ( d0d35e...48c1c9 )
by Jace
15s queued 10s
created

gitman.models.source.Source.update_files()   F

Complexity

Conditions 15

Size

Total Lines 74
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 43
CRAP Score 15.1386

Importance

Changes 0
Metric Value
cc 15
eloc 50
nop 6
dl 0
loc 74
ccs 43
cts 47
cp 0.9149
crap 15.1386
rs 2.9998
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like gitman.models.source.Source.update_files() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
import os
3 1
import warnings
4
5 1
import yorm
0 ignored issues
show
introduced by
Unable to import 'yorm'
Loading history...
6 1
from yorm.types import AttributeDictionary, List, NullableString, String
0 ignored issues
show
introduced by
Unable to import 'yorm.types'
Loading history...
7
8 1
from .. import common, exceptions, git, shell
9
10
11 1
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(name=String)
15 1
@yorm.attr(type=String)
16 1
@yorm.attr(repo=String)
17 1
@yorm.attr(sparse_paths=List.of_type(String))
18 1
@yorm.attr(rev=String)
19 1
@yorm.attr(link=NullableString)
20
@yorm.attr(scripts=List.of_type(String))
21
class Source(AttributeDictionary):
22 1
    """A dictionary of `git` and `ln` arguments."""
23 1
24
    DIRTY = '<dirty>'
25 1
    UNKNOWN = '<unknown>'
26 1
27 1
    def __init__(
        self,
        type,
        repo,
        name=None,
        rev='master',
        link=None,
        scripts=None,
        sparse_paths=None,
    ):
        """Populate the source's attributes and validate the required ones.

        A falsy ``type`` becomes 'git', a ``name`` of None is inferred from
        ``repo``, and falsy ``scripts``/``sparse_paths`` become new empty
        lists.

        Raises:
            exceptions.InvalidConfig: if 'name', 'repo', or 'rev' is falsy.
        """
        super().__init__()

        self.type = type if type else 'git'
        self.repo = repo
        self.name = name if name is not None else self._infer_name(repo)
        self.rev = rev
        self.link = link
        self.scripts = scripts if scripts else []
        self.sparse_paths = sparse_paths if sparse_paths else []

        # Checked in a fixed order so the first missing setting is the one
        # that gets reported.
        for required in ('name', 'repo', 'rev'):
            if self[required]:
                continue
            raise exceptions.InvalidConfig(
                "'{}' required for {}".format(required, repr(self))
            )
51 1
52
    def _on_post_load(self):
53 1
        self.type = self.type or 'git'
54 1
55
    def __repr__(self):
56 1
        return "<source {}>".format(self)
57
58 1
    def __str__(self):
59
        pattern = "['{t}'] '{r}' @ '{v}' in '{d}'"
60
        if self.link:
61 1
            pattern += " <- '{s}'"
62 1
        return pattern.format(
63
            t=self.type, r=self.repo, v=self.rev, d=self.name, s=self.link
64
        )
65 1
66 1
    def __eq__(self, other):
67 1
        return self.name == other.name
68
69
    def __ne__(self, other):
70 1
        return self.name != other.name
71 1
72 1
    def __lt__(self, other):
73
        return self.name < other.name
74
75
    def update_files(
        self,
        force=False,
        force_interactive=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):
        """Ensure the source matches the specified revision.

        Clones the repository when ``self.name`` does not exist, enters the
        working tree, checks for uncommitted changes (unless ``force``), then
        fetches when requested or required and updates to ``self.rev``.

        Args:
            force: skip the uncommitted-changes check entirely.
            force_interactive: when changes are found, ask whether to
                overwrite them instead of raising.
            fetch: always fetch before updating; also forwarded to
                ``git.update``.
            clean: forwarded to ``git.update`` and used as
                ``include_untracked`` for the changes check.
            skip_changes: when changes are found, report them and return
                without updating. Takes precedence over
                ``force_interactive``.

        Raises:
            exceptions.InvalidRepository: if ``git.valid()`` is false after
                entering the working tree.
            exceptions.UncommittedChanges: if changes are found and none of
                ``force``, ``skip_changes`` or ``force_interactive`` is set.
        """
        log.info("Updating source files...")

        # Clone the repository if needed
        if not os.path.exists(self.name):
            git.clone(
                self.type,
                self.repo,
                self.name,
                sparse_paths=self.sparse_paths,
                rev=self.rev,
            )

        # Enter the working tree
        shell.cd(self.name)
        if not git.valid():
            raise self._invalid_repository

        # Check for uncommitted changes
        if not force and not self._changes_allow_update(
            clean, skip_changes, force_interactive
        ):
            return

        # Fetch the desired revision
        if fetch or git.is_fetch_required(self.type, self.rev):
            git.fetch(self.type, self.repo, self.name, rev=self.rev)

        # Update the working tree to the desired revision
        git.update(
            self.type, self.repo, self.name, fetch=fetch, clean=clean, rev=self.rev
        )

    def _changes_allow_update(self, clean, skip_changes, force_interactive):
        """Return True when the update may proceed despite the changes check.

        Raises ``exceptions.UncommittedChanges`` when changes are found and
        neither ``skip_changes`` nor ``force_interactive`` is set.
        """
        log.debug("Confirming there are no uncommitted changes...")

        if not (skip_changes or force_interactive):
            # Strict mode: the status display is left at git.changes' default.
            if git.changes(self.type, include_untracked=clean):
                raise exceptions.UncommittedChanges(
                    f'Uncommitted changes in {os.getcwd()}'
                )
            return True

        if not git.changes(
            self.type, include_untracked=clean, display_status=False
        ):
            return True

        if skip_changes:
            common.show(
                f'Skipped update due to uncommitted changes in {os.getcwd()}',
                color='git_changes',
            )
            return False

        common.show(
            f'Uncommitted changes found in {os.getcwd()}',
            color='git_changes',
        )
        return self._confirm_overwrite()

    @staticmethod
    def _confirm_overwrite():
        """Prompt until the user answers yes (or empty) -> True, or no -> False."""
        while True:
            answer = input("Do you want to overwrite? (Y/N)[Y]: ")
            answer = answer.rstrip('\r\n').lower()

            # An empty answer selects the default, which is "yes".
            if answer in ('', 'y'):
                return True

            if answer == 'n':
                common.show(
                    f'Skipped update in {os.getcwd()}', color='git_changes'
                )
                return False
150 1
151 1
    def create_link(self, root, force=False):
152 1
        """Create a link from the target name to the current directory."""
153 1
        if not self.link:
154
            return
155
156
        log.info("Creating a symbolic link...")
157
158
        if os.name == 'nt':
159 1
            warnings.warn("Symbolic links are not supported on Windows")
160 1
            return
161 1
162 1
        target = os.path.join(root, self.link)
163
        source = os.path.relpath(os.getcwd(), os.path.dirname(target))
164 1
165
        if os.path.islink(target):
166 1
            os.remove(target)
167
        elif os.path.exists(target):
168
            if force:
169
                shell.rm(target)
170 1
            else:
171
                msg = "Preexisting link location at {}".format(target)
172 1
                raise exceptions.UncommittedChanges(msg)
173
174 1
        shell.ln(source, target)
175 1
176 1
    def run_scripts(self, force=False):
        """Run every configured script from inside the working tree.

        Output of each script is shown as it completes. When a script fails,
        its output is shown and, unless ``force`` is set, the run stops.

        Raises:
            exceptions.InvalidRepository: if ``git.valid()`` is false after
                entering the working tree.
            exceptions.ScriptFailure: if a script fails and ``force`` is
                false.
        """
        log.info("Running install scripts...")

        # Enter the working tree
        shell.cd(self.name)
        if not git.valid():
            raise self._invalid_repository

        # Check for scripts
        if not self.scripts:
            common.show("(no scripts to run)", color='shell_info')
            common.newline()
            return

        # Run all scripts
        for script in self.scripts:
            try:
                output = shell.call(script, _shell=True)
            except exceptions.ShellError as error:
                common.show(*error.output, color='shell_error')
                program = error.program
                if not force:
                    raise exceptions.ScriptFailure(
                        "Command '{}' failed in {}".format(program, os.getcwd())
                    )
                log.debug("Ignored error from call to '%s'", program)
                continue
            common.show(*output, color='shell_output')

        common.newline()
205
206
    def identify(self, allow_dirty=True, allow_missing=True, skip_changes=False):
207
        """Get the path and current repository URL and hash."""
208
        if os.path.isdir(self.name):
209
210
            shell.cd(self.name)
211
            if not git.valid():
212
                raise self._invalid_repository
213
214
            path = os.getcwd()
215
            url = git.get_url(self.type)
216
            if git.changes(
217
                self.type,
218
                display_status=not allow_dirty and not skip_changes,
219
                _show=not skip_changes,
220
            ):
221
222
                if allow_dirty:
223
                    common.show(self.DIRTY, color='git_dirty', log=False)
224
                    common.newline()
225
                    return path, url, self.DIRTY
226
227
                if skip_changes:
228
                    msg = ("Skipped lock due to uncommitted changes " "in {}").format(
229
                        os.getcwd()
230
                    )
231
                    common.show(msg, color='git_changes')
232
                    common.newline()
233
                    return path, url, self.DIRTY
234
235
                msg = "Uncommitted changes in {}".format(os.getcwd())
236
                raise exceptions.UncommittedChanges(msg)
237
238
            rev = git.get_hash(self.type, _show=True)
239
            common.show(rev, color='git_rev', log=False)
240
            common.newline()
241
            return path, url, rev
242
243
        if allow_missing:
244
            return os.getcwd(), '<missing>', self.UNKNOWN
245
246
        raise self._invalid_repository
247
248
    def lock(self, rev=None, allow_dirty=False, skip_changes=False):
249
        """Create a locked source object.
250
251
        Return a locked version of the current source if not dirty
252
        otherwise None.
253
        """
254
255
        if rev is None:
256
            _, _, rev = self.identify(
257
                allow_dirty=allow_dirty, allow_missing=False, skip_changes=skip_changes
258
            )
259
260
        if rev == self.DIRTY:
261
            return None
262
263
        source = self.__class__(
264
            self.type,
265
            self.repo,
266
            self.name,
267
            rev,
268
            self.link,
269
            self.scripts,
270
            self.sparse_paths,
271
        )
272
        return source
273
274
    @property
    def _invalid_repository(self):
        """Build (without raising) an InvalidRepository error for this source."""
        # NOTE(review): some callers raise this right after
        # `shell.cd(self.name)`; if that call changes the process working
        # directory, the joined path below would repeat the name -- confirm.
        location = os.path.join(os.getcwd(), self.name)
        return exceptions.InvalidRepository(
            "Not a valid repository: {}".format(location)
        )
279
280
    @staticmethod
281
    def _infer_name(repo):
282
        filename = repo.split('/')[-1]
283
        name = filename.split('.')[0]
284
        return name
285