Completed
Push — develop ( bc7642...53919e )
by Jace
12s
created

gitman.models.config.Config._on_post_load()   B

Complexity

Conditions 6

Size

Total Lines 16
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
eloc 12
nop 1
dl 0
loc 16
ccs 8
cts 8
cp 1
crap 6
rs 8.6666
c 0
b 0
f 0
1 1
import logging
2 1
import os
3
from typing import List
4 1
5 1
import yorm
0 ignored issues
show
introduced by
Unable to import 'yorm'
Loading history...
6
from yorm.types import SortedList, String
0 ignored issues
show
introduced by
Unable to import 'yorm.types'
Loading history...
7 1
8 1
from .. import common, exceptions, shell
9 1
from .group import Group
10
from .source import Source
11 1
12
13
# Module-level logger for diagnostics. This is distinct from the
# ``Config.log`` method, which appends formatted messages to a log file.
log = logging.getLogger(__name__)
14 1
15 1
16 1
@yorm.attr(location=String)
@yorm.attr(sources=SortedList.of_type(Source))
@yorm.attr(sources_locked=SortedList.of_type(Source))
@yorm.attr(groups=SortedList.of_type(Group))
@yorm.sync("{self.root}/{self.filename}", auto_save=False)
class Config(yorm.ModelMixin):
    """Specifies all dependencies for a project.

    The ``location``, ``sources``, ``sources_locked`` and ``groups``
    attributes are declared through ``yorm.attr`` and the instance is
    synchronized with the file ``{root}/{filename}`` (auto-save disabled,
    so changes are written only by an explicit ``save()``).
    """

    # File name of the log kept inside the dependency storage location.
    LOG = "gitman.log"

    def __init__(self, root=None, filename="gitman.yml", location="gitman_sources"):
        """Create a config rooted at ``root`` (defaults to the current directory).

        ``filename`` is the config file name inside ``root`` and
        ``location`` is the dependency storage directory relative to ``root``.
        """
        super().__init__()
        self.root = root or os.getcwd()
        self.filename = filename
        self.location = location
        self.sources: List[Source] = []
        self.sources_locked: List[Source] = []
        self.groups: List[Group] = []

    def _on_post_load(self):
        """Run post-load hooks on all sources and validate names.

        Raises:
            exceptions.InvalidConfig: if a source shares its name with a group.
        """
        for source in self.sources:
            source._on_post_load()  # pylint: disable=protected-access

        for source in self.sources_locked:
            source._on_post_load()  # pylint: disable=protected-access

        # Check for conflicts between source names and group names. A set
        # gives one lookup per source instead of a scan over every group;
        # the first conflicting source (in source order) is still the one
        # reported.
        group_names = {group.name for group in self.groups}
        for source in self.sources:
            if source.name in group_names:
                msg = (
                    "Name conflict detected between source name and "
                    f"group name \"{source.name}\""
                )
                raise exceptions.InvalidConfig(msg)

    @property
    def config_path(self):
        """Get the full path to the config file."""
        return os.path.normpath(os.path.join(self.root, self.filename))

    # Alias kept so callers can use either name.
    path = config_path

    @property
    def log_path(self):
        """Get the full path to the log file."""
        return os.path.normpath(os.path.join(self.location_path, self.LOG))

    @property
    def location_path(self):
        """Get the full path to the dependency storage location."""
        return os.path.normpath(os.path.join(self.root, self.location))

    def get_path(self, name=None):
        """Get the full path to a dependency or internal file.

        ``'__config__'`` and ``'__log__'`` select the config file and the
        log file respectively; any other non-empty ``name`` is resolved
        inside the storage location; no name returns the storage location.
        """
        base = self.location_path
        if name == '__config__':
            return self.path
        if name == '__log__':
            return self.log_path
        if name:
            return os.path.normpath(os.path.join(base, name))
        return base

    def install_dependencies(
        self,
        *names,
        depth=None,
        update=True,
        recurse=False,
        force=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):
        """Download or update the specified dependencies.

        Args:
            *names: source or group names to install; all sources if empty.
            depth: remaining nesting levels to process (``None`` = unlimited,
                ``0`` = skip this level entirely).
            update: when true, the unlocked ``sources`` section is the base;
                otherwise locked sources are preferred when present.
            recurse: whether nested configs are also updated (nested calls
                receive ``update and recurse``).
            force, fetch, clean, skip_changes: forwarded to
                ``Source.update_files`` (``force`` also to ``create_link``).

        Returns:
            The number of dependencies installed, including nested ones.
            ``0`` is returned when ``depth`` is 0 and also when any
            requested name matched no source, even if others were installed.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        sources_filter = self._get_sources_filter(*names, sources=sources)

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            # Names are removed as they are handled so that whatever is
            # left afterwards identifies requests that matched nothing.
            if source.name in sources_filter:
                sources_filter.remove(source.name)
            else:
                log.info("Skipped dependency: %s", source.name)
                continue

            source.update_files(
                force=force, fetch=fetch, clean=clean, skip_changes=skip_changes
            )
            source.create_link(self.root, force=force)
            common.newline()
            count += 1

            # Looks for a nested config in the current working directory only.
            # NOTE(review): presumably update_files() leaves the working
            # directory inside the dependency checkout - confirm.
            config = load_config(search=False)
            if config:
                common.indent()
                count += config.install_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    fetch=fetch,
                    clean=clean,
                    skip_changes=skip_changes,
                )
                common.dedent()

            # Return to the storage location before the next source.
            shell.cd(self.location_path, _show=False)

        common.dedent()
        if sources_filter:
            log.error("No such dependency: %s", ' '.join(sources_filter))
            return 0

        return count

    def run_scripts(self, *names, depth=None, force=False):
        """Run scripts for the specified dependencies.

        Returns the number of dependencies whose scripts were run,
        including nested ones, or ``0`` when ``depth`` is 0.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                source.run_scripts(force=force)
                count += 1

                # Nested config lookup is limited to the working directory.
                config = load_config(search=False)
                if config:
                    common.indent()
                    count += config.run_scripts(
                        depth=None if depth is None else max(0, depth - 1), force=force
                    )
                    common.dedent()

                shell.cd(self.location_path, _show=False)

        common.dedent()

        return count

    def lock_dependencies(self, *names, obey_existing=True, skip_changes=False):
        """Lock down the immediate dependency versions.

        Returns the number of sources locked; the config is saved only
        when at least one source was locked.
        """
        # Copy: with ``use_locked=True`` the helper can hand back
        # ``self.sources_locked`` itself, which is modified in the loop below.
        sources = self._get_sources(use_locked=obey_existing).copy()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue

            source_locked = source.lock(skip_changes=skip_changes)

            if source_locked is not None:
                # Replace an existing locked entry equal to this source,
                # otherwise add a new one.
                # NOTE(review): relies on Source equality matching the
                # previously locked entry - confirm against Source.__eq__.
                try:
                    index = self.sources_locked.index(source)
                except ValueError:
                    self.sources_locked.append(source_locked)
                else:
                    self.sources_locked[index] = source_locked
                count += 1

            shell.cd(self.location_path, _show=False)

        if count:
            self.save()

        common.dedent()

        return count

    def uninstall_dependencies(self):
        """Delete the dependency storage location."""
        shell.cd(self.root)
        shell.rm(self.location_path)
        common.newline()

    def clean_dependencies(self):
        """Delete each top-level dependency directory and the log file.

        A dependency whose path equals the storage location itself is
        skipped rather than removed.
        """
        for path in self.get_top_level_dependencies():

            if path == self.location_path:
                log.info("Skipped dependency: %s", path)
            else:
                shell.rm(path)

            common.newline()

        shell.rm(self.log_path)

    def get_top_level_dependencies(self):
        """Yield the path of each top-level dependency.

        Yields nothing when the storage location does not exist.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            yield os.path.join(self.location_path, source.name)

            # The consumer may have changed directory; go back.
            shell.cd(self.location_path, _show=False)

        common.dedent()

    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the identity of each dependency, descending into nested configs.

        Each item is whatever ``Source.identify`` returns (per the original
        docstring: path, repository, and hash). Yields nothing when the
        storage location does not exist; ``depth`` of 0 skips every source.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            # Nested config lookup is limited to the working directory.
            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def log(self, message="", *args):
        """Append a message to the log file.

        ``message`` is formatted with ``str.format(*args)`` and written
        followed by a newline.
        """
        with open(self.log_path, 'a') as outfile:
            outfile.write(message.format(*args) + '\n')

    def _get_sources(self, *, use_locked=None):
        """Merge source lists using the requested section as the base.

        Args:
            use_locked: ``True`` returns only the locked sources (or an
                empty list when there are none); ``False`` uses ``sources``
                as the base; ``None`` prefers locked sources when present.

        Returns:
            For ``use_locked=True`` the ``sources_locked`` list itself (not
            a copy) or ``[]``. Otherwise the base section followed by any
            source from either section that is not in the base.
        """
        if use_locked is True:
            if self.sources_locked:
                return self.sources_locked
            log.info("No locked sources, defaulting to none...")
            return []

        if use_locked is False:
            sources = self.sources
        elif self.sources_locked:
            log.info("Defaulting to locked sources...")
            sources = self.sources_locked
        else:
            log.info("No locked sources, using latest...")
            sources = self.sources

        # Kept as an explicit loop because every addition is logged.
        extras = []
        for source in self.sources + self.sources_locked:
            if source not in sources:
                log.info("Source %r missing from selected section", source.name)
                extras.append(source)

        return sources + extras

    def _get_sources_filter(self, *names, sources):
        """Get the list of source names selected by ``names``.

        If any name matches a group, the result is the members of all
        matching groups (names that match no group are then ignored).
        Otherwise it is ``names`` itself, or every source name when no
        names were given.
        """
        groups_filter = [group for group in self.groups if group.name in names]

        if groups_filter:
            return [member for group in groups_filter for member in group.members]

        return list(names) if names else [s.name for s in sources]
323
324
325
def load_config(start=None, *, search=True):
    """Load the config for the current project.

    Args:
        start: directory to begin in; defaults to the current working
            directory.
        search: when true, walk up through parent directories until a
            config is found; when false, inspect only ``start``.

    Returns:
        A ``Config`` for the first valid config filename found (after its
        post-load hook has run), or ``None`` when there is none.
    """
    start = os.path.abspath(start) if start else os.getcwd()

    if search:
        log.debug("Searching for config...")

    path = start
    # NOTE: the loop stops once ``path`` is its own parent, so the
    # filesystem root itself is never inspected.
    while path != os.path.dirname(path):
        log.debug("Looking for config in: %s", path)

        # os.listdir() returns entries in arbitrary order; sorting makes
        # the choice deterministic when several valid names coexist.
        for filename in sorted(os.listdir(path)):
            if _valid_filename(filename):
                config = Config(path, filename)
                config._on_post_load()  # pylint: disable=protected-access
                log.debug("Found config: %s", config.path)
                return config

        if not search:
            break
        path = os.path.dirname(path)

    if search:
        log.debug("No config found starting from: %s", start)
    else:
        log.debug("No config found in: %s", start)

    return None
357
358
359
def _valid_filename(filename):
360
    name, ext = os.path.splitext(filename.lower())
361
    if name.startswith('.'):
362
        name = name[1:]
363
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
364