Passed
Pull Request — develop (#203)
by Jace
02:44
created

gitman.models.config.Config.__init__()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 4
dl 0
loc 8
ccs 7
cts 7
cp 1
crap 1
rs 10
c 0
b 0
f 0
1 1
import os
2 1
from typing import List, Optional
3
4 1
import log
5 1
6
from datafiles import datafile, field
7 1
8 1
from .. import common, exceptions, shell
9 1
from .group import Group
10
from .source import Source
11 1
12
13
@datafile("{self.root}/{self.filename}", manual=True)
14 1
class Config:
15 1
    """Specifies all dependencies for a project."""
16 1
17 1
    root: Optional[str] = None
0 ignored issues
show
introduced by
Variable annotation syntax is only supported in Python 3.6 and greater (<unknown>, line 17)
Loading history...
18 1
    filename: str = "gitman.yml"
19
20
    location: str = "gitman_sources"
21 1
    sources: List[Source] = field(default_factory=list)
22
    sources_locked: List[Source] = field(default_factory=list)
23 1
    groups: List[Group] = field(default_factory=list)
24
25 1
    LOG = "gitman.log"
26 1
27 1
    def __post_init__(self):
28 1
        if self.root is None:
29 1
            self.root = os.getcwd()
30 1
31
    def _on_post_load(self):
32 1
        for source in self.sources:
33
            source._on_post_load()  # pylint: disable=protected-access
34
35 1
        for source in self.sources_locked:
36 1
            source._on_post_load()  # pylint: disable=protected-access
37
38 1
        # check for conflicts between source names and group names
39
        for source in self.sources:
40
            for group in self.groups:
41 1
                if source.name == group.name:
42
                    msg = (
43 1
                        "Name conflict detected between source name and "
44
                        "group name \"{}\""
45
                    ).format(source.name)
46 1
                    raise exceptions.InvalidConfig(msg)
47
48 1
    @property
49
    def config_path(self):
50 1
        """Get the full path to the config file."""
51 1
        return os.path.normpath(os.path.join(self.root, self.filename))
52 1
53 1
    path = config_path
54 1
55 1
    @property
56 1
    def log_path(self):
57
        """Get the full path to the log file."""
58 1
        return os.path.normpath(os.path.join(self.location_path, self.LOG))
59
60 1
    @property
61
    def location_path(self):
62
        """Get the full path to the dependency storage location."""
63
        return os.path.normpath(os.path.join(self.root, self.location))
64 1
65 1
    def get_path(self, name=None):
66 1
        """Get the full path to a dependency or internal file."""
67
        base = self.location_path
68 1
        if name == '__config__':
69 1
            return self.path
70
        if name == '__log__':
71 1
            return self.log_path
72 1
        if name:
73 1
            return os.path.normpath(os.path.join(base, name))
74 1
        return base
75 1
76
    def install_dependencies(
77 1
        self,
78 1
        *names,
79 1
        depth=None,
80 1
        update=True,
81
        recurse=False,
82 1
        force=False,
83 1
        force_interactive=False,
84
        fetch=False,
85 1
        clean=True,
86 1
        skip_changes=False,
87 1
    ):  # pylint: disable=too-many-locals
88 1
        """Download or update the specified dependencies."""
89
        if depth == 0:
90 1
            log.info("Skipped directory: %s", self.location_path)
91 1
            return 0
92 1
93 1
        sources = self._get_sources(use_locked=False if update else None)
94
        sources_filter = self._get_sources_filter(*names, sources=sources)
95
96
        if not os.path.isdir(self.location_path):
97
            shell.mkdir(self.location_path)
98
        shell.cd(self.location_path)
99
        common.newline()
100
        common.indent()
101 1
102
        count = 0
103 1
        for source in sources:
104
            if source.name in sources_filter:
105 1
                sources_filter.remove(source.name)
106 1
            else:
107 1
                log.info("Skipped dependency: %s", source.name)
108 1
                continue
109
110 1
            source.update_files(
111
                force=force,
112 1
                force_interactive=force_interactive,
113
                fetch=fetch,
114 1
                clean=clean,
115 1
                skip_changes=skip_changes,
116 1
            )
117
            source.create_link(self.root, force=force)
118 1
            common.newline()
119 1
            count += 1
120
121 1
            config = load_config(search=False)
122 1
            if config:
123 1
                common.indent()
124
                count += config.install_dependencies(
125 1
                    depth=None if depth is None else max(0, depth - 1),
126 1
                    update=update and recurse,
127 1
                    recurse=recurse,
128 1
                    force=force,
129 1
                    fetch=fetch,
130
                    clean=clean,
131 1
                    skip_changes=skip_changes,
132 1
                )
133 1
                common.dedent()
134 1
135
            shell.cd(self.location_path, _show=False)
136
137
        common.dedent()
138 1
        if sources_filter:
139
            log.error("No such dependency: %s", ' '.join(sources_filter))
140 1
            return 0
141
142 1
        return count
143
144 1
    def run_scripts(self, *names, depth=None, force=False):
145
        """Run scripts for the specified dependencies."""
146 1
        if depth == 0:
147
            log.info("Skipped directory: %s", self.location_path)
148 1
            return 0
149 1
150
        sources = self._get_sources()
151 1
        sources_filter = self._get_sources_filter(*names, sources=sources)
152 1
153 1
        shell.cd(self.location_path)
154
        common.newline()
155 1
        common.indent()
156 1
157 1
        count = 0
158 1
        for source in sources:
159 1
            if source.name in sources_filter:
160
                source.run_scripts(force=force)
161 1
                count += 1
162 1
163 1
                config = load_config(search=False)
164 1
                if config:
165
                    common.indent()
166 1
                    count += config.run_scripts(
167 1
                        depth=None if depth is None else max(0, depth - 1), force=force
168
                    )
169 1
                    common.dedent()
170
171 1
                shell.cd(self.location_path, _show=False)
172 1
173
        common.dedent()
174 1
175
        return count
176 1
177
    def lock_dependencies(self, *names, obey_existing=True, skip_changes=False):
178 1
        """Lock down the immediate dependency versions."""
179 1
        sources = self._get_sources(use_locked=obey_existing).copy()
180 1
        sources_filter = self._get_sources_filter(*names, sources=sources)
181
182 1
        shell.cd(self.location_path)
183
        common.newline()
184 1
        common.indent()
185 1
186
        count = 0
187 1
        for source in sources:
188 1
            if source.name not in sources_filter:
189 1
                log.info("Skipped dependency: %s", source.name)
190
                continue
191 1
192
            source_locked = source.lock(skip_changes=skip_changes)
193 1
194 1
            if source_locked is not None:
195 1
                try:
196
                    index = self.sources_locked.index(source)
197 1
                except ValueError:
198
                    self.sources_locked.append(source_locked)
199 1
                else:
200 1
                    self.sources_locked[index] = source_locked
201 1
                count += 1
202 1
203
            shell.cd(self.location_path, _show=False)
204
205
        if count:
206 1
            self.datafile.save()
207
208 1
        common.dedent()
209
210 1
        return count
211
212 1
    def uninstall_dependencies(self):
213
        """Delete the dependency storage location."""
214 1
        shell.cd(self.root)
215 1
        shell.rm(self.location_path)
216
        common.newline()
217 1
218
    def clean_dependencies(self):
219 1
        """Delete the dependency storage location."""
220 1
        for path in self.get_top_level_dependencies():
221 1
222
            if path == self.location_path:
223 1
                log.info("Skipped dependency: %s", path)
224 1
            else:
225
                shell.rm(path)
226 1
227 1
            common.newline()
228 1
229
        shell.rm(self.log_path)
230 1
231 1
    def get_top_level_dependencies(self):
232 1
        """Yield the path, repository, and hash of top-level dependencies."""
233
        if not os.path.exists(self.location_path):
234 1
            return
235 1
236
        shell.cd(self.location_path)
237 1
        common.newline()
238 1
        common.indent()
239 1
240 1
        for source in self.sources:
241 1
242
            yield os.path.join(self.location_path, source.name)
243 1
244
            shell.cd(self.location_path, _show=False)
245
246 1
        common.dedent()
247
248 1
    def get_dependencies(self, depth=None, allow_dirty=True):
249 1
        """Yield the path, repository, and hash of each dependency."""
250
        if not os.path.exists(self.location_path):
251 1
            return
252
253 1
        shell.cd(self.location_path)
254 1
        common.newline()
255
        common.indent()
256 1
257 1
        for source in self.sources:
258 1
259
            if depth == 0:
260 1
                log.info("Skipped dependency: %s", source.name)
261 1
                continue
262 1
263 1
            yield source.identify(allow_dirty=allow_dirty)
264 1
265
            config = load_config(search=False)
266 1
            if config:
267 1
                common.indent()
268
                yield from config.get_dependencies(
269 1
                    depth=None if depth is None else max(0, depth - 1),
270
                    allow_dirty=allow_dirty,
271 1
                )
272 1
                common.dedent()
273
274 1
            shell.cd(self.location_path, _show=False)
275
276 1
        common.dedent()
277
278
    def log(self, message="", *args):
279 1
        """Append a message to the log file."""
280 1
        with open(self.log_path, 'a') as outfile:
281 1
            outfile.write(message.format(*args) + '\n')
282 1
283 1
    def _get_sources(self, *, use_locked=None):
284
        """Merge source lists using the requested section as the base."""
285
        if use_locked is True:
286
            if self.sources_locked:
287
                return self.sources_locked
288
            log.info("No locked sources, defaulting to none...")
289
            return []
290
291
        sources: List[Source] = []
292
        if use_locked is False:
293
            sources = self.sources
294
        else:
295
            if self.sources_locked:
296
                log.info("Defaulting to locked sources...")
297
                sources = self.sources_locked
298
            else:
299
                log.info("No locked sources, using latest...")
300
                sources = self.sources
301
302
        extras = []
303
        for source in self.sources + self.sources_locked:
304
            if source not in sources:
305
                log.info("Source %r missing from selected section", source.name)
306
                extras.append(source)
307
308
        return sources + extras
309
310
    def _get_sources_filter(self, *names, sources):
311
        """Get filtered sublist of sources."""
312
        sources_filter = None
313
314
        groups_filter = [group for group in self.groups if group.name in list(names)]
315
316
        if groups_filter:
317
            sources_filter = [
318
                members for group in groups_filter for members in group.members
319
            ]
320
        else:
321
            sources_filter = list(names) if names else [s.name for s in sources]
322
323
        return sources_filter
324
325
326
def load_config(start=None, *, search=True):
327
    """Load the config for the current project."""
328
    if start:
329
        start = os.path.abspath(start)
330
    else:
331
        start = os.getcwd()
332
333
    if search:
334
        log.debug("Searching for config...")
335
336
    path = start
337
    while path != os.path.dirname(path):
338
        log.debug("Looking for config in: %s", path)
339
340
        for filename in os.listdir(path):
341
            if _valid_filename(filename):
342
                config = Config(path, filename)
343
                config._on_post_load()  # pylint: disable=protected-access
344
                log.debug("Found config: %s", config.path)
345
                return config
346
347
        if search:
348
            path = os.path.dirname(path)
349
        else:
350
            break
351
352
    if search:
353
        log.debug("No config found starting from: %s", start)
354
    else:
355
        log.debug("No config found in: %s", start)
356
357
    return None
358
359
360
def _valid_filename(filename):
361
    name, ext = os.path.splitext(filename.lower())
362
    if name.startswith('.'):
363
        name = name[1:]
364
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
365