gitman.models.config.Config.run_scripts()   B
last analyzed

Complexity

Conditions 6

Size

Total Lines 32
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
eloc 23
nop 5
dl 0
loc 32
ccs 21
cts 21
cp 1
crap 6
rs 8.3946
c 0
b 0
f 0
1 1
import os
2 1
from typing import List, Optional
3
4 1
import log
5 1
from datafiles import datafile, field
6
7 1
from .. import common, exceptions, shell
8 1
from .group import Group
9 1
from .source import Source
10
11 1
12
@datafile("{self.root}/{self.filename}", defaults=True, manual=True)
class Config:
    """Specifies all dependencies for a project.

    A config is identified by the directory that contains it (`root`) and
    the name of the config file inside that directory (`filename`).  The
    dependency sources, their locked counterparts, and named groups of
    sources are stored as lists on the instance.
    """

    # Directory containing the config file; falls back to the current
    # working directory in `__post_init__` when left as None.
    root: Optional[str] = None
    # Name of the config file inside `root`.
    filename: str = "gitman.yml"

    # Directory (relative to `root`) where dependencies are stored.
    location: str = "gitman_sources"
    # Dependency definitions from the regular section.
    sources: List[Source] = field(default_factory=list)
    # Dependency definitions from the locked section.
    sources_locked: List[Source] = field(default_factory=list)
    # Named groups whose `members` are source names.
    groups: List[Group] = field(default_factory=list)

    def __post_init__(self):
        """Default `root` to the current working directory."""
        if self.root is None:
            self.root = os.getcwd()

    @property
    def config_path(self) -> str:
        """Get the full path to the config file."""
        assert self.root
        return os.path.normpath(os.path.join(self.root, self.filename))

    # Shorter alias for `config_path`.
    path = config_path

    @property
    def log_path(self) -> str:
        """Get the full path to the log file."""
        return os.path.normpath(os.path.join(self.location_path, "gitman.log"))

    @property
    def location_path(self) -> str:
        """Get the full path to the dependency storage location."""
        assert self.root
        return os.path.normpath(os.path.join(self.root, self.location))

    def validate(self) -> None:
        """Check for conflicts between source names and group names.

        Raises:
            exceptions.InvalidConfig: if any source shares its name with
                a group.
        """
        for source in self.sources:
            for group in self.groups:
                if source.name == group.name:
                    msg = (
                        "Name conflict detected between source name and "
                        "group name \"{}\""
                    ).format(source.name)
                    raise exceptions.InvalidConfig(msg)

    def get_path(self, name=None) -> str:
        """Get the full path to a dependency or internal file.

        Args:
            name: ``'__config__'`` for the config file, ``'__log__'`` for
                the log file, any other non-empty value for an entry of
                that name inside the storage location, or a falsy value
                for the storage location itself.
        """
        base = self.location_path
        if name == '__config__':
            return self.path
        if name == '__log__':
            return self.log_path
        if name:
            return os.path.normpath(os.path.join(base, name))
        return base

    def install_dependencies(
        self,
        *names,
        depth=None,
        update=True,
        recurse=False,
        force=False,
        force_interactive=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):  # pylint: disable=too-many-locals
        """Download or update the specified dependencies.

        Args:
            *names: source or group names to install; all sources when
                empty.
            depth: remaining levels of nested configs to process; ``None``
                means unlimited and ``0`` skips this directory entirely.
            update: when true, the regular `sources` section is used as
                the base list instead of the locked one.
            recurse: forwarded to nested configs; nested configs are only
                updated when both `update` and `recurse` are true.
            force, force_interactive, fetch, clean, skip_changes:
                forwarded to `Source.update_files` and to nested configs.

        Returns:
            The number of dependencies processed (including nested ones),
            or ``0`` when skipped or when a requested name is unknown.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        sources_filter = self._get_sources_filter(*names, sources=sources)

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                # Consume the name so that leftovers identify requested
                # dependencies that do not exist.
                sources_filter.remove(source.name)
            else:
                log.info("Skipped dependency: %s", source.name)
                continue

            source.update_files(
                force=force,
                force_interactive=force_interactive,
                fetch=fetch,
                clean=clean,
                skip_changes=skip_changes,
            )
            source.create_link(self.root, force=force)
            common.newline()
            count += 1

            # Look for a nested config in the current working directory
            # (no upward search) and install its dependencies as well.
            config = load_config(search=False)
            if config:
                common.indent()
                count += config.install_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    # Forwarded like every other flag so that nested
                    # dependencies honor the interactive override too.
                    force_interactive=force_interactive,
                    fetch=fetch,
                    clean=clean,
                    skip_changes=skip_changes,
                )
                common.dedent()

            # Return to the storage location before the next source.
            shell.cd(self.location_path, _show=False)

        common.dedent()
        if sources_filter:
            log.error("No such dependency: %s", ' '.join(sources_filter))
            return 0

        return count

    def run_scripts(self, *names, depth=None, force=False, show_shell_stdout=False):
        """Run scripts for the specified dependencies.

        Args:
            *names: source or group names to run scripts for; all sources
                when empty.
            depth: remaining levels of nested configs to process; ``None``
                means unlimited and ``0`` skips this directory entirely.
            force: forwarded to `Source.run_scripts` and nested configs.
            show_shell_stdout: forwarded to `Source.run_scripts` and
                nested configs.

        Returns:
            The number of dependencies whose scripts were run, including
            nested ones.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                source.run_scripts(force=force, show_shell_stdout=show_shell_stdout)
                count += 1

                # Look for a nested config in the current working
                # directory (no upward search) and run its scripts too.
                config = load_config(search=False)
                if config:
                    common.indent()
                    count += config.run_scripts(
                        depth=None if depth is None else max(0, depth - 1),
                        force=force,
                        # Previously dropped here, which silenced the
                        # output of scripts in nested dependencies.
                        show_shell_stdout=show_shell_stdout,
                    )
                    common.dedent()

                # Return to the storage location before the next source.
                shell.cd(self.location_path, _show=False)

        common.dedent()

        return count

    def lock_dependencies(self, *names, obey_existing=True, skip_changes=False):
        """Lock down the immediate dependency versions.

        Args:
            *names: source or group names to lock; all sources when empty.
            obey_existing: passed to `_get_sources` as `use_locked`.
            skip_changes: forwarded to `Source.lock`.

        Returns:
            The number of sources that were locked.  The config is saved
            only when this is non-zero.
        """
        sources = self._get_sources(use_locked=obey_existing).copy()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue

            source_locked = source.lock(skip_changes=skip_changes)

            if source_locked is not None:
                # Replace the existing locked entry that compares equal to
                # this source, or append a new one when none exists.
                try:
                    index = self.sources_locked.index(source)
                except ValueError:
                    self.sources_locked.append(source_locked)
                else:
                    self.sources_locked[index] = source_locked
                count += 1

            # Return to the storage location before the next source.
            shell.cd(self.location_path, _show=False)

        if count:
            self.datafile.save()

        common.dedent()

        return count

    def uninstall_dependencies(self):
        """Delete the dependency storage location."""
        shell.cd(self.root)
        shell.rm(self.location_path)
        common.newline()

    def clean_dependencies(self):
        """Delete each top-level dependency path and the log file.

        A dependency path equal to the storage location itself is skipped
        rather than removed.
        """
        for path in self.get_top_level_dependencies():

            if path == self.location_path:
                log.info("Skipped dependency: %s", path)
            else:
                shell.rm(path)

            common.newline()

        shell.rm(self.log_path)

    def get_top_level_dependencies(self):
        """Yield the path of each top-level dependency.

        Nothing is yielded when the storage location does not exist.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            assert source.name
            yield os.path.join(self.location_path, source.name)

            # Runs when the consumer resumes the generator: return to the
            # storage location before the next source.
            shell.cd(self.location_path, _show=False)

        common.dedent()

    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the identity of each dependency, including nested ones.

        Each item is the result of `Source.identify` (presumably the path,
        repository, and hash -- confirm against `Source.identify`).

        Args:
            depth: remaining levels of nested configs to process; ``None``
                means unlimited and ``0`` skips every source at this level.
            allow_dirty: forwarded to `Source.identify` and nested configs.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            # Look for a nested config in the current working directory
            # (no upward search) and yield its dependencies as well.
            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            # Return to the storage location before the next source.
            shell.cd(self.location_path, _show=False)

        common.dedent()

    def log(self, message="", *args):
        """Append a message to the log file.

        `message` is formatted with `str.format(*args)` and written
        followed by a newline.
        """
        with open(self.log_path, 'a') as outfile:
            outfile.write(message.format(*args) + '\n')

    def _get_sources(self, *, use_locked=None):
        """Merge source lists using the requested section as the base.

        Args:
            use_locked: ``True`` returns only the locked sources (or an
                empty list when there are none); ``False`` uses the
                regular sources as the base; ``None`` prefers the locked
                sources and falls back to the regular ones.

        Returns:
            For ``True``, `self.sources_locked` itself or ``[]``.
            Otherwise a new list: the base section followed by every
            source from either section that is not in the base.
        """
        if use_locked is True:
            if self.sources_locked:
                return self.sources_locked
            log.info("No locked sources, defaulting to none...")
            return []

        sources: List[Source] = []
        if use_locked is False:
            sources = self.sources
        else:
            if self.sources_locked:
                log.info("Defaulting to locked sources...")
                sources = self.sources_locked
            else:
                log.info("No locked sources, using latest...")
                sources = self.sources

        extras = []
        for source in self.sources + self.sources_locked:
            if source not in sources:
                log.info("Source %r missing from selected section", source.name)
                extras.append(source)

        return sources + extras

    def _get_sources_filter(self, *names, sources):
        """Get filtered sublist of sources.

        Returns a new list of names: the flattened members of every group
        whose name is in `names` when at least one group matches,
        otherwise `names` itself, or the name of every entry in `sources`
        when no names were given.
        """
        groups_filter = [group for group in self.groups if group.name in names]

        if groups_filter:
            return [member for group in groups_filter for member in group.members]

        return list(names) if names else [s.name for s in sources]
318
319
320
def load_config(start=None, *, search=True):
    """Load the config for the current project.

    Looks for a valid config filename in `start` (the current working
    directory when `start` is falsy).  With `search` enabled, each parent
    directory is tried in turn; the filesystem root itself is not
    inspected.  The first match is validated and returned.

    Returns:
        The validated `Config`, or ``None`` when no config file is found.
    """
    start = os.path.abspath(start) if start else os.getcwd()

    if search:
        log.debug("Searching for config...")

    current = start
    while True:
        parent = os.path.dirname(current)
        if current == parent:
            # A directory that is its own parent is the root: stop here.
            break

        log.debug("Looking for config in: %s", current)

        for entry in os.listdir(current):
            if not _valid_filename(entry):
                continue
            found = Config(current, entry)
            found.validate()
            log.debug("Found config: %s", found.path)
            return found

        if not search:
            break
        current = parent

    if search:
        log.debug("No config found starting from: %s", start)
    else:
        log.debug("No config found in: %s", start)

    return None
352
353
354
def _valid_filename(filename):
355
    name, ext = os.path.splitext(filename.lower())
356
    if name.startswith('.'):
357
        name = name[1:]
358
    return name in {'gitman', 'gdm'} and ext in {'.yml', '.yaml'}
359