Completed
Push — develop ( 9ba97b...51e53a )
by Jace
14s queued 11s
created

gitman.models.config.Config.__post_init__()   A

Complexity

Conditions 2

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nop 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 2
rs 10
c 0
b 0
f 0
1 1
import os
2 1
from typing import List, Optional
3
4 1
import log
5 1
from datafiles import datafile, field
6
7 1
from .. import common, exceptions, shell
8 1
from .group import Group
9 1
from .source import Source
10
11 1
12
@datafile("{self.root}/{self.filename}", defaults=True, manual=True)
class Config:
    """Specifies all dependencies for a project.

    The config is backed by the file ``{root}/{filename}`` (see the
    ``datafile`` decorator pattern above) and tracks three lists: the
    requested ``sources``, their pinned counterparts in ``sources_locked``,
    and named ``groups`` of sources.
    """

    # Project root directory; ``__post_init__`` replaces ``None`` with the
    # current working directory.
    root: Optional[str] = None
    # Name of the config file, joined onto ``root`` by ``config_path``.
    filename: str = "gitman.yml"

    # Dependency storage directory, joined onto ``root`` by ``location_path``.
    location: str = "gitman_sources"
    sources: List[Source] = field(default_factory=list)
    sources_locked: List[Source] = field(default_factory=list)
    groups: List[Group] = field(default_factory=list)

    def __post_init__(self):
        """Default ``root`` to the current working directory."""
        if self.root is None:
            self.root = os.getcwd()

    def _on_post_load(self):
        """Run post-load hooks on every source and validate names.

        Raises:
            exceptions.InvalidConfig: if a source shares its name with a group.
        """
        for source in self.sources:
            source._on_post_load()  # pylint: disable=protected-access

        for source in self.sources_locked:
            source._on_post_load()  # pylint: disable=protected-access

        # check for conflicts between source names and group names
        group_names = [group.name for group in self.groups]
        for source in self.sources:
            if source.name in group_names:
                msg = (
                    "Name conflict detected between source name and "
                    "group name \"{}\""
                ).format(source.name)
                raise exceptions.InvalidConfig(msg)

    @property
    def config_path(self):
        """Get the full path to the config file."""
        assert self.root
        return os.path.normpath(os.path.join(self.root, self.filename))

    # Short alias for ``config_path``.
    path = config_path

    @property
    def log_path(self):
        """Get the full path to the log file."""
        return os.path.normpath(os.path.join(self.location_path, "gitman.log"))

    @property
    def location_path(self):
        """Get the full path to the dependency storage location."""
        assert self.root
        return os.path.normpath(os.path.join(self.root, self.location))

    def get_path(self, name=None):
        """Get the full path to a dependency or internal file.

        Args:
            name: ``'__config__'`` for the config file, ``'__log__'`` for the
                log file, any other non-empty value for an entry inside the
                storage location, or a falsy value for the storage location
                itself.

        Returns:
            The resolved path as a string.
        """
        if name == '__config__':
            return self.path
        if name == '__log__':
            return self.log_path

        base = self.location_path
        if name:
            return os.path.normpath(os.path.join(base, name))
        return base

    def install_dependencies(
        self,
        *names,
        depth=None,
        update=True,
        recurse=False,
        force=False,
        force_interactive=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):  # pylint: disable=too-many-locals
        """Download or update the specified dependencies.

        Args:
            *names: source or group names to install; all sources when empty.
            depth: remaining nesting levels to process; ``0`` skips this
                directory entirely and ``None`` means no limit.
            update: when true the ``sources`` list is used as the base,
                otherwise locked sources are preferred if present.
            recurse: forwarded to nested configs; nested configs are only
                updated when both ``update`` and ``recurse`` are true.
            force, force_interactive, fetch, clean, skip_changes: forwarded
                to ``Source.update_files`` for every selected source and to
                nested configs.

        Returns:
            The number of dependencies processed (nested ones included), or
            ``0`` when the directory was skipped or a requested name matched
            no source.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        sources_filter = self._get_sources_filter(*names, sources=sources)

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                # Consume the name so leftovers can be reported as unknown.
                sources_filter.remove(source.name)
            else:
                log.info("Skipped dependency: %s", source.name)
                continue

            source.update_files(
                force=force,
                force_interactive=force_interactive,
                fetch=fetch,
                clean=clean,
                skip_changes=skip_changes,
            )
            source.create_link(self.root, force=force)
            common.newline()
            count += 1

            # Look for a nested config in the current directory only.
            config = load_config(search=False)
            if config:
                common.indent()
                count += config.install_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    # Previously dropped here, so nested dependencies
                    # ignored the caller's interactive-force request.
                    force_interactive=force_interactive,
                    fetch=fetch,
                    clean=clean,
                    skip_changes=skip_changes,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()
        if sources_filter:
            log.error("No such dependency: %s", ' '.join(sources_filter))
            return 0

        return count

    def run_scripts(self, *names, depth=None, force=False):
        """Run scripts for the specified dependencies.

        Args:
            *names: source or group names to process; all sources when empty.
            depth: remaining nesting levels to process; ``0`` skips this
                directory and ``None`` means no limit.
            force: forwarded to ``Source.run_scripts`` and nested configs.

        Returns:
            The number of dependencies whose scripts were run, nested ones
            included.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                source.run_scripts(force=force)
                count += 1

                # Look for a nested config in the current directory only.
                config = load_config(search=False)
                if config:
                    common.indent()
                    count += config.run_scripts(
                        depth=None if depth is None else max(0, depth - 1), force=force
                    )
                    common.dedent()

                shell.cd(self.location_path, _show=False)

        common.dedent()

        return count

    def lock_dependencies(self, *names, obey_existing=True, skip_changes=False):
        """Lock down the immediate dependency versions.

        Args:
            *names: source or group names to lock; all sources when empty.
            obey_existing: passed to ``_get_sources`` as ``use_locked``.
            skip_changes: forwarded to ``Source.lock``.

        Returns:
            The number of sources that were locked. The backing datafile is
            saved only when this is non-zero.
        """
        # Copy: ``_get_sources`` may hand back ``self.sources_locked`` itself,
        # which is modified inside the loop below.
        sources = self._get_sources(use_locked=obey_existing).copy()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue

            source_locked = source.lock(skip_changes=skip_changes)

            if source_locked is not None:
                # Replace an existing locked entry in place, else append.
                try:
                    index = self.sources_locked.index(source)
                except ValueError:
                    self.sources_locked.append(source_locked)
                else:
                    self.sources_locked[index] = source_locked
                count += 1

            shell.cd(self.location_path, _show=False)

        if count:
            self.datafile.save()

        common.dedent()

        return count

    def uninstall_dependencies(self):
        """Delete the dependency storage location."""
        shell.cd(self.root)
        shell.rm(self.location_path)
        common.newline()

    def clean_dependencies(self):
        """Delete each top-level dependency path and the log file.

        A dependency whose path equals the storage location itself is
        skipped rather than removed.
        """
        for path in self.get_top_level_dependencies():

            if path == self.location_path:
                log.info("Skipped dependency: %s", path)
            else:
                shell.rm(path)

            common.newline()

        shell.rm(self.log_path)

    def get_top_level_dependencies(self):
        """Yield the path of each top-level dependency.

        Nothing is yielded when the storage location does not exist. Each
        path is ``location_path`` joined with the source name.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            assert source.name
            yield os.path.join(self.location_path, source.name)

            # Runs when the consumer asks for the next item.
            shell.cd(self.location_path, _show=False)

        common.dedent()

    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the identity of each dependency, nested ones included.

        Each item is whatever ``Source.identify`` returns (described by the
        original docstring as the path, repository, and hash).

        Args:
            depth: remaining nesting levels to report; at ``0`` every source
                is skipped and ``None`` means no limit.
            allow_dirty: forwarded to ``Source.identify`` and nested configs.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            # Look for a nested config in the current directory only.
            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def log(self, message="", *args):
        """Append a message to the log file.

        ``message`` is formatted with ``str.format(*args)`` and written with a
        trailing newline to ``log_path``.
        """
        with open(self.log_path, 'a') as outfile:
            outfile.write(message.format(*args) + '\n')

    def _get_sources(self, *, use_locked=None):
        """Merge source lists using the requested section as the base.

        Args:
            use_locked: ``True`` returns only the locked sources (or an empty
                list when there are none); ``False`` uses ``sources`` as the
                base; ``None`` uses the locked sources when present and
                ``sources`` otherwise.

        Returns:
            For ``True``, ``self.sources_locked`` itself (not a copy) or
            ``[]``. Otherwise a new list: the base followed by any source
            from either section that is missing from the base.
        """
        if use_locked is True:
            if self.sources_locked:
                return self.sources_locked
            log.info("No locked sources, defaulting to none...")
            return []

        sources: List[Source] = []
        if use_locked is False:
            sources = self.sources
        else:
            if self.sources_locked:
                log.info("Defaulting to locked sources...")
                sources = self.sources_locked
            else:
                log.info("No locked sources, using latest...")
                sources = self.sources

        extras = []
        for source in self.sources + self.sources_locked:
            if source not in sources:
                log.info("Source %r missing from selected section", source.name)
                extras.append(source)

        return sources + extras

    def _get_sources_filter(self, *names, sources):
        """Get filtered sublist of sources.

        Args:
            *names: requested source and/or group names.
            sources: candidate sources, used when no names are given.

        Returns:
            A new list of source names. If any requested name matches a
            group, only the members of the matched groups are returned and
            the other names are ignored. Otherwise the requested names are
            returned, or every name in ``sources`` when none were requested.
        """
        groups_filter = [group for group in self.groups if group.name in names]

        if groups_filter:
            return [member for group in groups_filter for member in group.members]

        return list(names) if names else [source.name for source in sources]
324
325
326
def load_config(start=None, *, search=True):
    """Load the config for the current project.

    Args:
        start: directory to look in first; defaults to the current working
            directory.
        search: when true, continue through each parent directory up to and
            including the filesystem root; when false, inspect ``start`` only.

    Returns:
        A ``Config`` for the first valid config filename found (its
        ``_on_post_load`` hook already run), or ``None`` when there is none.
    """
    start = os.path.abspath(start) if start else os.getcwd()

    if search:
        log.debug("Searching for config...")

    path = start
    while True:
        log.debug("Looking for config in: %s", path)

        # Sorted so the choice is deterministic when several valid config
        # filenames exist side by side (``os.listdir`` order is arbitrary).
        for filename in sorted(os.listdir(path)):
            if _valid_filename(filename):
                config = Config(path, filename)
                config._on_post_load()  # pylint: disable=protected-access
                log.debug("Found config: %s", config.path)
                return config

        # Stop after inspecting the root (its own parent) instead of before
        # it, so a config located in the root directory is still found.
        parent = os.path.dirname(path)
        if not search or parent == path:
            break
        path = parent

    if search:
        log.debug("No config found starting from: %s", start)
    else:
        log.debug("No config found in: %s", start)

    return None
358
359
360
def _valid_filename(filename):
361
    name, ext = os.path.splitext(filename.lower())
362
    if name.startswith('.'):
363
        name = name[1:]
364
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
365