Completed
Push — develop ( f5b93c...24eb3d )
by Jace
11s
created

gitman.models.config.load_config()   B

Complexity

Conditions 8

Size

Total Lines 32
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 8

Importance

Changes 0
Metric Value
cc 8
eloc 22
nop 3
dl 0
loc 32
ccs 3
cts 3
cp 1
crap 8
rs 7.3333
c 0
b 0
f 0
1 1
import logging
2 1
import os
3
4 1
import yorm
5 1
from yorm.types import SortedList, String
6
7 1
from . import Source
8 1
from .. import common, shell
9 1
10
11 1
# Module-level logger named after this module; the functions and methods
# below refer to it as the global ``log`` (distinct from ``Config.log``).
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(location=String)
@yorm.attr(sources=SortedList.of_type(Source))
@yorm.attr(sources_locked=SortedList.of_type(Source))
@yorm.sync("{self.root}/{self.filename}", auto_save=False)
class Config(yorm.ModelMixin):
    """Specifies all dependencies for a project.

    The ``location``, ``sources`` and ``sources_locked`` attributes are
    declared to yorm via the decorators above and synced with the file at
    ``{root}/{filename}``.  ``auto_save`` is disabled, so changes are only
    written when ``save()`` is called explicitly (see
    ``lock_dependencies``).
    """

    # File name of the log kept inside the dependency storage location.
    LOG = "gitman.log"

    def __init__(self, root=None,
                 filename="gitman.yml", location="gitman_sources"):
        """Create a config rooted at `root`.

        :param root: project directory; defaults to the current working
            directory when falsy
        :param filename: name of the config file inside `root`
        :param location: directory (relative to `root`) where dependencies
            are stored
        """
        super().__init__()
        self.root = root or os.getcwd()
        self.filename = filename
        self.location = location
        self.sources = []
        self.sources_locked = []

    def _on_post_load(self):
        """Forward the post-load hook to every source in both lists.

        Called explicitly by ``load_config``; presumably also invoked by
        yorm after loading the file (confirm against yorm's hooks).
        """
        for source in self.sources:
            source._on_post_load()  # pylint: disable=protected-access
        for source in self.sources_locked:
            source._on_post_load()  # pylint: disable=protected-access

    @property
    def config_path(self):
        """Get the full path to the config file."""
        return os.path.normpath(os.path.join(self.root, self.filename))
    # Alias: ``config.path`` is the same property as ``config.config_path``.
    path = config_path

    @property
    def log_path(self):
        """Get the full path to the log file."""
        return os.path.normpath(os.path.join(self.location_path, self.LOG))

    @property
    def location_path(self):
        """Get the full path to the dependency storage location."""
        return os.path.normpath(os.path.join(self.root, self.location))

    def get_path(self, name=None):
        """Get the full path to a dependency or internal file.

        :param name: ``'__config__'`` for the config file, ``'__log__'``
            for the log file, any other non-empty string for the entry of
            that name inside the storage location, or a falsy value for
            the storage location itself
        :return: the normalized path
        """
        base = self.location_path
        if name == '__config__':
            return self.path
        if name == '__log__':
            return self.log_path
        if name:
            return os.path.normpath(os.path.join(base, name))
        return base

    def install_dependencies(self, *names, depth=None,
                             update=True, recurse=False,
                             force=False, fetch=False, clean=True):
        """Download or update the specified dependencies.

        :param names: dependency names to process; all sources when empty
        :param depth: remaining nesting levels to process; ``None`` means
            unlimited and ``0`` skips this directory entirely
        :param update: when true, ``sources`` is used as the base list;
            otherwise locked sources are preferred if any exist
        :param recurse: passed on to nested configs; nested configs are
            only processed with ``update`` enabled when both `update` and
            `recurse` are true
        :param force: forwarded to ``source.update_files`` and
            ``source.create_link``
        :param fetch: forwarded to ``source.update_files``
        :param clean: forwarded to ``source.update_files``
        :return: number of dependencies processed, including those of
            nested configs; ``0`` if `depth` is ``0`` or if any requested
            name did not match a source
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        sources_filter = list(names) if names else [s.name for s in sources]

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            # Matched names are removed from the filter, so whatever is
            # left after the loop names dependencies that do not exist.
            if source.name in sources_filter:
                sources_filter.remove(source.name)
            else:
                log.info("Skipped dependency: %s", source.name)
                continue

            source.update_files(force=force, fetch=fetch, clean=clean)
            source.create_link(self.root, force=force)
            common.newline()
            count += 1

            # Look for a nested config in the current working directory
            # only (no upward search).  NOTE(review): this presumably
            # relies on update_files() leaving the working directory
            # inside the dependency -- confirm against Source.
            config = load_config(search=False)
            if config:
                common.indent()
                count += config.install_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    fetch=fetch,
                    clean=clean,
                )
                common.dedent()

            # Return to the storage location before the next source.
            shell.cd(self.location_path, _show=False)

        common.dedent()
        if sources_filter:
            log.error("No such dependency: %s", ' '.join(sources_filter))
            return 0

        return count

    def run_scripts(self, *names, depth=None, force=False):
        """Run scripts for the specified dependencies.

        :param names: dependency names to process; all sources when empty
        :param depth: remaining nesting levels to process; ``None`` means
            unlimited and ``0`` skips this directory entirely
        :param force: forwarded to ``source.run_scripts``
        :return: number of dependencies whose scripts were run, including
            those of nested configs
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        sources_filter = list(names) if names else [s.name for s in sources]

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                source.run_scripts(force=force)
                count += 1

                # Nested config lookup is limited to the current working
                # directory (no upward search).
                config = load_config(search=False)
                if config:
                    common.indent()
                    count += config.run_scripts(
                        depth=None if depth is None else max(0, depth - 1),
                        force=force,
                    )
                    common.dedent()

                shell.cd(self.location_path, _show=False)

        common.dedent()

        return count

    def lock_dependencies(self, *names, obey_existing=True):
        """Lock down the immediate dependency versions.

        :param names: dependency names to lock; all sources when empty
        :param obey_existing: passed to ``_get_sources`` as ``use_locked``
        :return: number of dependencies locked; the config is saved only
            when this is non-zero
        """
        # Copy the list: _get_sources() can return ``self.sources_locked``
        # itself, which is modified inside the loop below.
        sources = self._get_sources(use_locked=obey_existing).copy()
        sources_filter = list(names) if names else [s.name for s in sources]

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue

            # Replace an existing locked entry in place, or add a new one.
            try:
                index = self.sources_locked.index(source)
            except ValueError:
                self.sources_locked.append(source.lock())
            else:
                self.sources_locked[index] = source.lock()
            count += 1

            shell.cd(self.location_path, _show=False)

        # Balance the indent() above, as the other dependency methods do.
        common.dedent()

        if count:
            self.save()

        return count

    def uninstall_dependencies(self):
        """Delete the dependency storage location."""
        shell.cd(self.root)
        shell.rm(self.location_path)
        common.newline()

    def clean_dependencies(self):
        """Delete the top-level dependency directories and the log file.

        The storage location itself is never removed here; a dependency
        path equal to it is skipped.
        """
        for path in self.get_top_level_dependencies():

            if path == self.location_path:
                log.info("Skipped dependency: %s", path)
            else:
                shell.rm(path)

            common.newline()

        shell.rm(self.log_path)

    def get_top_level_dependencies(self):
        """Yield the path of each top-level dependency.

        Yields nothing when the storage location does not exist.  Paths
        are built from ``self.sources`` only; locked sources are not
        consulted.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            yield os.path.join(self.location_path, source.name)

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the path, repository, and hash of each dependency.

        :param depth: remaining nesting levels to report; ``None`` means
            unlimited, ``0`` skips every source at this level
        :param allow_dirty: forwarded to ``source.identify``
        :return: generator of whatever ``source.identify`` returns,
            followed by the results of any nested config
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            # Nested config lookup is limited to the current working
            # directory (no upward search).
            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def log(self, message="", *args):
        """Append a message to the log file.

        :param message: ``str.format`` template
        :param args: positional values substituted into `message`
        """
        with open(self.log_path, 'a') as outfile:
            outfile.write(message.format(*args) + '\n')

    def _get_sources(self, *, use_locked=None):
        """Merge source lists using the requested section as the base.

        :param use_locked: ``True`` returns only the locked sources (an
            empty list when there are none); ``False`` uses ``sources`` as
            the base; ``None`` uses the locked sources when any exist and
            ``sources`` otherwise
        :return: the base list, followed (except for ``True``) by any
            source present in either list but missing from the base
        """
        if use_locked is True:
            if self.sources_locked:
                return self.sources_locked
            log.info("No locked sources, defaulting to none...")
            return []

        sources = []
        if use_locked is False:
            sources = self.sources
        else:
            if self.sources_locked:
                log.info("Defaulting to locked sources...")
                sources = self.sources_locked
            else:
                log.info("No locked sources, using latest...")
                sources = self.sources

        # Append sources that only appear in the section not selected.
        extras = []
        for source in self.sources + self.sources_locked:
            if source not in sources:
                log.info("Source %r missing from selected section",
                         source.name)
                extras.append(source)

        return sources + extras
279 1
280 1
281 1
def load_config(start=None, *, search=True):
    """Load the config for the current project.

    :param start: directory to start from; defaults to the current
        working directory when falsy
    :param search: when true, walk up through the parent directories
        until a config is found; when false, only `start` is inspected
    :return: the loaded ``Config``, or ``None`` when no valid config file
        name was found

    Directory entries are checked in sorted name order so that the result
    is deterministic when several valid config file names coexist
    (``os.listdir`` returns entries in arbitrary order).  Note that the
    loop condition stops before the filesystem root, so the root
    directory itself is never inspected.
    """
    start = os.path.abspath(start) if start else os.getcwd()

    if search:
        log.debug("Searching for config...")

    path = start
    while path != os.path.dirname(path):
        log.debug("Looking for config in: %s", path)

        for filename in sorted(os.listdir(path)):
            if _valid_filename(filename):
                config = Config(path, filename)
                config._on_post_load()  # pylint: disable=protected-access
                log.debug("Found config: %s", config.path)
                return config

        if not search:
            break
        path = os.path.dirname(path)

    if search:
        log.debug("No config found starting from: %s", start)
    else:
        log.debug("No config found in: %s", start)

    return None
313
314
315
def _valid_filename(filename):
316
    name, ext = os.path.splitext(filename.lower())
317
    if name.startswith('.'):
318
        name = name[1:]
319
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
320