Completed
Pull Request — develop (#168)
by
unknown
03:06
created

Source._on_post_load()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
ccs 1
cts 1
cp 1
crap 1
rs 10
1 1
import os
2 1
import logging
3 1
import warnings
4
from enum import Enum
0 ignored issues
show
Unused Code introduced by
Unused Enum imported from enum
Loading history...
5 1
import yorm
6 1
from yorm.types import String, NullableString, List, AttributeDictionary
7
8 1
from .. import common, exceptions, shell, git
9
10
11 1
log = logging.getLogger(__name__)
12
13
14 1
15 1
@yorm.attr(name=String)
16 1
@yorm.attr(type=String)
17 1
@yorm.attr(repo=String)
18 1
@yorm.attr(sparse_paths=List.of_type(String))
19 1
@yorm.attr(rev=String)
20
@yorm.attr(link=NullableString)
21
@yorm.attr(scripts=List.of_type(String))
22 1
class Source(AttributeDictionary):
23 1
    """A dictionary of `git` and `ln` arguments."""
24
25 1
    DIRTY = '<dirty>'
26 1
    UNKNOWN = '<unknown>'
27 1
28 1
    def __init__(self, type, repo, name=None, rev='master', link=None,
                 scripts=None, sparse_paths=None):
        """Build a source entry and validate its required fields.

        :param type: repository type; falls back to 'git' when falsy (the
            name shadows the built-in but is part of the public signature)
        :param repo: repository location handed to the `git` helpers
        :param name: directory name; inferred from `repo` when None
        :param rev: revision to check out
        :param link: optional link location, joined onto a root directory
            by `create_link`
        :param scripts: commands executed by `run_scripts`
        :param sparse_paths: forwarded to `git.clone` as `sparse_paths`
        :raises exceptions.InvalidConfig: if name, repo, or rev is empty
        """
        super().__init__()
        self.type = type or 'git'
        self.repo = repo
        if name is None:
            # Only infer from a usable repo so that a missing repo is reported
            # as InvalidConfig below instead of crashing in `_infer_name`.
            name = self._infer_name(repo) if repo else ''
        self.name = name
        self.rev = rev
        self.link = link
        self.scripts = scripts or []
        self.sparse_paths = sparse_paths or []

        for key in ['name', 'repo', 'rev']:
            if not self[key]:
                msg = "'{}' required for {}".format(key, repr(self))
                raise exceptions.InvalidConfig(msg)

        # Uncomment the next line to print the source configuration:
        # print(str(self))
45 1
46
    def _on_post_load(self):
47 1
        self.type = self.type or 'git'
48 1
49
    def __repr__(self):
50 1
        return "<source {}>".format(self)
51 1
52
    def __str__(self):
53 1
        pattern = "['{t}'] '{r}' @ '{v}' in '{d}'"
54 1
        if self.link:
55
            pattern += " <- '{s}'"
56 1
        return pattern.format(t=self.type, r=self.repo, v=self.rev, d=self.name, s=self.link)
57
58 1
    def __eq__(self, other):
59
        return self.name == other.name
60
61 1
    def __ne__(self, other):
62 1
        return self.name != other.name
63
64
    def __lt__(self, other):
65 1
        return self.name < other.name
66 1
67 1
    def update_files(self, force=False, fetch=False, clean=True):
        """Make the working tree match the configured revision.

        Clones into `self.name` if that path does not exist, changes into
        it, and then fetches (if needed) and updates to `self.rev`.

        :param force: when true, skip the check for uncommitted changes
        :param fetch: when true, fetch even if `git.is_fetch_required`
            reports that it is unnecessary
        :param clean: forwarded to `git.update`; also passed to
            `git.changes` as `include_untracked`
        :raises exceptions.InvalidRepository: if `git.valid()` is false
            after entering the directory
        :raises exceptions.UncommittedChanges: if changes are found and
            `force` is false
        """
        log.info("Updating source files...")

        # A missing path means the repository still has to be cloned
        already_present = os.path.exists(self.name)
        if not already_present:
            git.clone(self.type, self.repo, self.name,
                      sparse_paths=self.sparse_paths, rev=self.rev)

        # Everything below runs from inside the working tree
        shell.cd(self.name)
        if not git.valid():
            raise self._invalid_repository

        # Stop on local modifications unless forced
        if not force:
            log.debug("Confirming there are no uncommitted changes...")
            dirty = git.changes(self.type, include_untracked=clean)
            if dirty:
                raise exceptions.UncommittedChanges(
                    "Uncommitted changes in {}".format(os.getcwd()))

        # Fetch when asked to, or when `git.is_fetch_required` says so
        if fetch or git.is_fetch_required(self.type, self.rev):
            git.fetch(self.type, self.repo, self.name, rev=self.rev)

        # Finally move the working tree to the desired revision
        git.update(self.type, self.repo, self.name,
                   fetch=fetch, clean=clean, rev=self.rev)
93
94
    def create_link(self, root, force=False):
95
        """Create a link from the target name to the current directory."""
96 1
        if not self.link:
97 1
            return
98
99 1
        log.info("Creating a symbolic link...")
100 1
101 1
        if os.name == 'nt':
102 1
            warnings.warn("Symbolic links are not supported on Windows")
103 1
            return
104
105 1
        target = os.path.join(root, self.link)
106 1
        source = os.path.relpath(os.getcwd(), os.path.dirname(target))
107
108 1
        if os.path.islink(target):
109
            os.remove(target)
110 1
        elif os.path.exists(target):
111 1
            if force:
112
                shell.rm(target)
113
            else:
114 1
                msg = "Preexisting link location at {}".format(target)
115 1
                raise exceptions.UncommittedChanges(msg)
116
117
        shell.ln(source, target)
118
119 1
    def run_scripts(self, force=False):
        """Run each configured script from inside the working tree.

        :param force: when true, a failing script is logged and skipped
            instead of aborting
        :raises exceptions.InvalidRepository: if `git.valid()` is false
            after entering the directory
        :raises exceptions.ScriptFailure: if a script fails and `force` is
            false
        """
        log.info("Running install scripts...")

        # Scripts are executed from inside the working tree
        shell.cd(self.name)
        if not git.valid():
            raise self._invalid_repository

        # Nothing configured: say so and stop
        if not self.scripts:
            common.show("(no scripts to run)", color='shell_info')
            common.newline()
            return

        # Execute the scripts in their configured order
        for script in self.scripts:
            try:
                lines = shell.call(script, _shell=True)
            except exceptions.ShellError as exc:
                common.show(*exc.output, color='shell_error')
                cmd = exc.program
                if not force:
                    msg = "Command '{}' failed in {}".format(cmd, os.getcwd())
                    raise exceptions.ScriptFailure(msg)
                log.debug("Ignored error from call to '%s'", cmd)
                continue
            common.show(*lines, color='shell_output')
        common.newline()
148 1
149 1
    def identify(self, allow_dirty=True, allow_missing=True):
150 1
        """Get the path and current repository URL and hash."""
151 1
        if os.path.isdir(self.name):
152 1
153 1
            shell.cd(self.name)
154
            if not git.valid():
155
                raise self._invalid_repository
156
157
            path = os.getcwd()
158
            url = git.get_url(self.type)
159 1
            if git.changes(self.type, display_status=not allow_dirty, _show=True):
160 1
                if not allow_dirty:
161 1
                    msg = "Uncommitted changes in {}".format(os.getcwd())
162 1
                    raise exceptions.UncommittedChanges(msg)
163
164 1
                common.show(self.DIRTY, color='git_dirty', log=False)
165
                common.newline()
166 1
                return path, url, self.DIRTY
167
            else:
168
                rev = git.get_hash(self.type, _show=True)
169
                common.show(rev, color='git_rev', log=False)
170 1
                common.newline()
171
                return path, url, rev
172 1
173
        elif allow_missing:
174 1
175 1
            return os.getcwd(), '<missing>', self.UNKNOWN
176 1
177
        else:
178 1
179
            raise self._invalid_repository
180 1
181
    def lock(self, rev=None):
182 1
        """Return a locked version of the current source."""
183 1
        if rev is None:
184 1
            _, _, rev = self.identify(allow_dirty=False, allow_missing=False)
185
        source = self.__class__(self.type, self.repo, 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
186 1
                                self.name, rev, 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
187
                                self.link, self.scripts,
188 1
                                self.sparse_paths)
189 1
        return source
190 1
191
    @property
    def _invalid_repository(self):
        """Build (without raising) an `InvalidRepository` error for this source.

        The reported path is `self.name` joined onto the current directory.
        """
        # NOTE(review): some callers raise this after `shell.cd(self.name)`;
        # if that changes the process directory the name appears twice in the
        # reported path - confirm against `shell.cd`.
        location = os.path.join(os.getcwd(), self.name)
        return exceptions.InvalidRepository("Not a valid repository: {}".format(location))
196
197
    @staticmethod
198
    def _infer_name(repo):
199
        filename = repo.split('/')[-1]
200
        name = filename.split('.')[0]
201
        return name
202