Completed
Push — develop ( 9ba97b...51e53a )
by Jace
14s queued 11s
created

gitman.models.config.Config.install_dependencies()   C

Complexity

Conditions 9

Size

Total Lines 67
Code Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 42
CRAP Score 9

Importance

Changes 0
Metric Value
cc 9
eloc 54
nop 10
dl 0
loc 67
ccs 42
cts 42
cp 1
crap 9
rs 6.1721
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method (move a cohesive fragment of the body into its own well-named method).

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

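There are several approaches to avoid long parameter lists: Introduce Parameter Object (group parameters that travel together into one object), Preserve Whole Object (pass the object instead of several values read from it), and Replace Method with Method Object.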

1 1
import os
2 1
from typing import List, Optional
3
4 1
import log
5 1
from datafiles import datafile, field
6
7 1
from .. import common, exceptions, shell
8 1
from .group import Group
9 1
from .source import Source
10
11 1
12
@datafile("{self.root}/{self.filename}", defaults=True, manual=True)
class Config:
    """Specifies all dependencies for a project.

    The config is persisted at ``{root}/{filename}`` and dependencies are
    stored below ``{root}/{location}``.
    """

    # Directory containing the config file; defaults to the current
    # working directory (see ``__post_init__``).
    root: Optional[str] = None
    # Name of the config file inside ``root``.
    filename: str = "gitman.yml"

    # Directory (relative to ``root``) where dependencies are stored.
    location: str = "gitman_sources"
    # Requested dependencies.
    sources: List[Source] = field(default_factory=list)
    # Locked counterparts of the dependencies (see ``lock_dependencies``).
    sources_locked: List[Source] = field(default_factory=list)
    # Named groups whose ``members`` are source names.
    groups: List[Group] = field(default_factory=list)

    def __post_init__(self):
        """Default ``root`` to the current working directory."""
        if self.root is None:
            self.root = os.getcwd()

    def _on_post_load(self):
        """Run post-load hooks on all sources and validate names.

        Raises:
            exceptions.InvalidConfig: if a source and a group share a name.
        """
        for source in self.sources:
            source._on_post_load()  # pylint: disable=protected-access

        for source in self.sources_locked:
            source._on_post_load()  # pylint: disable=protected-access

        # check for conflicts between source names and group names
        for source in self.sources:
            for group in self.groups:
                if source.name == group.name:
                    msg = (
                        "Name conflict detected between source name and "
                        "group name \"{}\""
                    ).format(source.name)
                    raise exceptions.InvalidConfig(msg)

    @property
    def config_path(self):
        """Get the full path to the config file."""
        assert self.root
        return os.path.normpath(os.path.join(self.root, self.filename))

    # Alias kept so callers can use either ``path`` or ``config_path``.
    path = config_path

    @property
    def log_path(self):
        """Get the full path to the log file."""
        return os.path.normpath(os.path.join(self.location_path, "gitman.log"))

    @property
    def location_path(self):
        """Get the full path to the dependency storage location."""
        assert self.root
        return os.path.normpath(os.path.join(self.root, self.location))

    def get_path(self, name=None):
        """Get the full path to a dependency or internal file.

        ``'__config__'`` and ``'__log__'`` map to the config and log files;
        any other non-empty name is resolved below the storage location;
        no name returns the storage location itself.
        """
        base = self.location_path
        if name == '__config__':
            return self.path
        if name == '__log__':
            return self.log_path
        if name:
            return os.path.normpath(os.path.join(base, name))
        return base

    def install_dependencies(
        self,
        *names,
        depth=None,
        update=True,
        recurse=False,
        force=False,
        force_interactive=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):  # pylint: disable=too-many-locals
        """Download or update the specified dependencies.

        Args:
            *names: source or group names to install; all sources if empty.
            depth: remaining nesting levels to process (``None`` = no
                limit, ``0`` = skip this config entirely).
            update: when true, use the unlocked sources as the base list;
                otherwise let ``_get_sources`` pick its default.
            recurse: nested configs are only updated when both ``update``
                and ``recurse`` are true.
            force, force_interactive, fetch, clean, skip_changes: passed
                through to ``Source.update_files``.

        Returns:
            The number of sources processed (including nested ones), or
            ``0`` when skipped or when a requested name was not found.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        sources_filter = self._get_sources_filter(*names, sources=sources)

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            # Names are removed as they are matched, so whatever is left
            # in the filter afterwards was requested but does not exist.
            if source.name in sources_filter:
                sources_filter.remove(source.name)
            else:
                log.info("Skipped dependency: %s", source.name)
                continue

            source.update_files(
                force=force,
                force_interactive=force_interactive,
                fetch=fetch,
                clean=clean,
                skip_changes=skip_changes,
            )
            source.create_link(self.root, force=force)
            common.newline()
            count += 1

            # Look for a nested config in the current directory only.
            config = load_config(search=False)
            if config:
                common.indent()
                # NOTE(review): force_interactive is not forwarded to nested
                # configs -- confirm this is intentional.
                count += config.install_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    fetch=fetch,
                    clean=clean,
                    skip_changes=skip_changes,
                )
                common.dedent()

            # Return to the storage location before the next source.
            shell.cd(self.location_path, _show=False)

        common.dedent()
        if sources_filter:
            log.error("No such dependency: %s", ' '.join(sources_filter))
            return 0

        return count

    def run_scripts(self, *names, depth=None, force=False):
        """Run scripts for the specified dependencies.

        Args:
            *names: source or group names; all sources if empty.
            depth: remaining nesting levels (``None`` = no limit,
                ``0`` = skip this config entirely).
            force: passed through to ``Source.run_scripts``.

        Returns:
            The number of sources whose scripts were run, nested included.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                source.run_scripts(force=force)
                count += 1

                # Look for a nested config in the current directory only.
                config = load_config(search=False)
                if config:
                    common.indent()
                    count += config.run_scripts(
                        depth=None if depth is None else max(0, depth - 1), force=force
                    )
                    common.dedent()

                shell.cd(self.location_path, _show=False)

        common.dedent()

        return count

    def lock_dependencies(self, *names, obey_existing=True, skip_changes=False):
        """Lock down the immediate dependency versions.

        Args:
            *names: source or group names to lock; all sources if empty.
            obey_existing: forwarded to ``_get_sources`` as ``use_locked``.
            skip_changes: passed through to ``Source.lock``.

        Returns:
            The number of sources locked; the datafile is saved only when
            this is non-zero.
        """
        # Iterate over a copy because ``self.sources_locked`` is modified
        # inside the loop and may be the very list ``_get_sources`` returned.
        sources = self._get_sources(use_locked=obey_existing).copy()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue

            source_locked = source.lock(skip_changes=skip_changes)

            if source_locked is not None:
                # Replace an existing locked entry in place, else append.
                try:
                    index = self.sources_locked.index(source)
                except ValueError:
                    self.sources_locked.append(source_locked)
                else:
                    self.sources_locked[index] = source_locked
                count += 1

            shell.cd(self.location_path, _show=False)

        if count:
            self.datafile.save()

        common.dedent()

        return count

    def uninstall_dependencies(self):
        """Delete the dependency storage location."""
        shell.cd(self.root)
        shell.rm(self.location_path)
        common.newline()

    def clean_dependencies(self):
        """Delete each top-level dependency and the log file.

        The storage location itself is never removed here.
        """
        for path in self.get_top_level_dependencies():

            if path == self.location_path:
                log.info("Skipped dependency: %s", path)
            else:
                shell.rm(path)

            common.newline()

        shell.rm(self.log_path)

    def get_top_level_dependencies(self):
        """Yield the path of each top-level dependency.

        Yields nothing when the storage location does not exist.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            assert source.name
            yield os.path.join(self.location_path, source.name)

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the identity of each dependency, including nested ones.

        Each item is whatever ``Source.identify`` returns (presumably the
        path, repository, and hash -- verify against ``Source``).

        Args:
            depth: remaining nesting levels (``None`` = no limit,
                ``0`` = skip every source of this config).
            allow_dirty: passed through to ``Source.identify``.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            # Look for a nested config in the current directory only.
            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def log(self, message="", *args):
        """Append a message to the log file.

        ``message`` is formatted with ``str.format(*args)`` and written
        followed by a newline.
        """
        with open(self.log_path, 'a') as outfile:
            outfile.write(message.format(*args) + '\n')

    def _get_sources(self, *, use_locked=None):
        """Merge source lists using the requested section as the base.

        Args:
            use_locked: ``True`` returns only the locked sources (or an
                empty list when there are none); ``False`` uses the
                unlocked sources as the base; ``None`` prefers the locked
                sources and falls back to the unlocked ones.

        Returns:
            For ``True``, the locked list itself (not a copy). Otherwise the
            base list followed by any source from either section that is
            missing from it.
        """
        if use_locked is True:
            if self.sources_locked:
                return self.sources_locked
            log.info("No locked sources, defaulting to none...")
            return []

        sources: List[Source] = []
        if use_locked is False:
            sources = self.sources
        else:
            if self.sources_locked:
                log.info("Defaulting to locked sources...")
                sources = self.sources_locked
            else:
                log.info("No locked sources, using latest...")
                sources = self.sources

        extras = []
        for source in self.sources + self.sources_locked:
            if source not in sources:
                log.info("Source %r missing from selected section", source.name)
                extras.append(source)

        return sources + extras

    def _get_sources_filter(self, *names, sources):
        """Get the list of source names selected by ``names``.

        If any of ``names`` matches a group, the result is the members of
        all matching groups (non-group names are then ignored). Otherwise
        it is ``names`` itself, or every source name when ``names`` is
        empty. A new list is always returned so callers may mutate it.
        """
        groups_filter = [group for group in self.groups if group.name in names]

        if groups_filter:
            return [members for group in groups_filter for members in group.members]

        return list(names) if names else [s.name for s in sources]
324
325
326
def load_config(start=None, *, search=True):
    """Load the config for the current project.

    Args:
        start: directory to look in first; defaults to the current
            working directory.
        search: when true, walk up through parent directories until a
            config is found; when false, only ``start`` is examined.

    Returns:
        A ``Config`` for the first valid config filename found, or
        ``None`` when there is none.
    """
    if start:
        start = os.path.abspath(start)
    else:
        start = os.getcwd()

    if search:
        log.debug("Searching for config...")

    path = start
    # The loop ends once ``path`` is its own parent, so the filesystem
    # root itself is never examined.
    while path != os.path.dirname(path):
        log.debug("Looking for config in: %s", path)

        for filename in os.listdir(path):
            if _valid_filename(filename):
                config = Config(path, filename)
                config._on_post_load()  # pylint: disable=protected-access
                log.debug("Found config: %s", config.path)
                return config

        if search:
            path = os.path.dirname(path)
        else:
            break

    if search:
        log.debug("No config found starting from: %s", start)
    else:
        log.debug("No config found in: %s", start)

    return None
358
359
360
def _valid_filename(filename):
361
    name, ext = os.path.splitext(filename.lower())
362
    if name.startswith('.'):
363
        name = name[1:]
364
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
365