Passed
Pull Request — develop (#203)
by Jace
01:54
created

gitman.models.config.Config.install_dependencies()   C

Complexity

Conditions 9

Size

Total Lines 67
Code Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 42
CRAP Score 9

Importance

Changes 0
Metric Value
cc 9
eloc 54
nop 10
dl 0
loc 67
ccs 42
cts 42
cp 1
crap 9
rs 6.1721
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

- Extract Method

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

- Replace Parameter with Method Call
- Preserve Whole Object
- Introduce Parameter Object

1 1
import os
2 1
from typing import List, Optional
3
4 1
import log
5 1
6
from datafiles import datafile, field
7 1
8 1
from .. import common, exceptions, shell
9 1
from .group import Group
10
from .source import Source
11 1
12
13
@datafile("{self.root}/{self.filename}", manual=True)
14 1
class Config:
15 1
    """Specifies all dependencies for a project."""
16 1
17 1
    root: Optional[str] = None
0 ignored issues
show
introduced by
Variable annotation syntax is only supported in Python 3.6 and greater (<unknown>, line 17)
Loading history...
18 1
    filename: str = "gitman.yml"
19
20
    location: str = "gitman_sources"
21 1
    sources: List[Source] = field(default_factory=list)
22
    sources_locked: List[Source] = field(default_factory=list)
23 1
    groups: List[Group] = field(default_factory=list)
24
25 1
    LOG = "gitman.log"
26 1
27 1
    def __post_init__(self):
28 1
        if self.root is None:
29 1
            self.root = os.getcwd()
30 1
31
    def _on_post_load(self):
32 1
        for source in self.sources:
33
            source._on_post_load()  # pylint: disable=protected-access
34
35 1
        for source in self.sources_locked:
36 1
            source._on_post_load()  # pylint: disable=protected-access
37
38 1
        # check for conflicts between source names and group names
39
        for source in self.sources:
40
            for group in self.groups:
41 1
                if source.name == group.name:
42
                    msg = (
43 1
                        "Name conflict detected between source name and "
44
                        "group name \"{}\""
45
                    ).format(source.name)
46 1
                    raise exceptions.InvalidConfig(msg)
47
48 1
    @property
49
    def config_path(self):
50 1
        """Get the full path to the config file."""
51 1
        return os.path.normpath(os.path.join(self.root, self.filename))
52 1
53 1
    path = config_path
54 1
55 1
    @property
56 1
    def log_path(self):
57
        """Get the full path to the log file."""
58 1
        return os.path.normpath(os.path.join(self.location_path, self.LOG))
59
60 1
    @property
61
    def location_path(self):
62
        """Get the full path to the dependency storage location."""
63
        return os.path.normpath(os.path.join(self.root, self.location))
64 1
65 1
    def get_path(self, name=None):
66 1
        """Get the full path to a dependency or internal file."""
67
        base = self.location_path
68 1
        if name == '__config__':
69 1
            return self.path
70
        if name == '__log__':
71 1
            return self.log_path
72 1
        if name:
73 1
            return os.path.normpath(os.path.join(base, name))
74 1
        return base
75 1
76
    def install_dependencies(
        self,
        *names,
        depth=None,
        update=True,
        recurse=False,
        force=False,
        force_interactive=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):  # pylint: disable=too-many-locals
        """Install or update the selected dependencies and their nested configs.

        Args:
            *names: source or group names to restrict the operation to; they
                are resolved by ``_get_sources_filter``.
            depth: remaining nesting levels; ``0`` skips this directory and
                ``None`` means unlimited. Nested configs receive
                ``max(0, depth - 1)``.
            update: when true the ``sources`` section is used as the base
                (``use_locked=False``); otherwise ``_get_sources`` picks its
                default.
            recurse: nested configs are only updated when both ``update`` and
                ``recurse`` are true.
            force, force_interactive, fetch, clean, skip_changes: forwarded to
                ``source.update_files``; ``force`` is also passed to
                ``source.create_link``. ``force_interactive`` is not forwarded
                to nested configs.

        Returns:
            The number of dependencies installed (nested ones included), or
            ``0`` when ``depth`` is ``0`` or a requested name matched nothing.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        pending = self._get_sources_filter(*names, sources=sources)

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        installed = 0
        for source in sources:
            if source.name not in pending:
                log.info("Skipped dependency: %s", source.name)
                continue
            # Names left in ``pending`` after the loop matched no source.
            pending.remove(source.name)

            source.update_files(
                force=force,
                force_interactive=force_interactive,
                fetch=fetch,
                clean=clean,
                skip_changes=skip_changes,
            )
            source.create_link(self.root, force=force)
            common.newline()
            installed += 1

            # With search=False only the current working directory is checked.
            nested = load_config(search=False)
            if nested:
                nested_depth = None if depth is None else max(0, depth - 1)
                common.indent()
                installed += nested.install_dependencies(
                    depth=nested_depth,
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    fetch=fetch,
                    clean=clean,
                    skip_changes=skip_changes,
                )
                common.dedent()

            # Return to the storage location before handling the next source.
            shell.cd(self.location_path, _show=False)

        common.dedent()
        if pending:
            log.error("No such dependency: %s", ' '.join(pending))
            return 0

        return installed
143
144 1
    def run_scripts(self, *names, depth=None, force=False):
        """Run the scripts of the selected dependencies and their nested configs.

        Args:
            *names: source or group names to restrict the operation to.
            depth: remaining nesting levels; ``0`` skips this directory and
                ``None`` means unlimited.
            force: forwarded to ``source.run_scripts`` and to nested configs.

        Returns:
            The number of dependencies whose scripts were run, nested ones
            included, or ``0`` when ``depth`` is ``0``.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        selected = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        ran = 0
        for source in sources:
            if source.name not in selected:
                continue

            source.run_scripts(force=force)
            ran += 1

            # With search=False only the current working directory is checked.
            nested = load_config(search=False)
            if nested:
                nested_depth = None if depth is None else max(0, depth - 1)
                common.indent()
                ran += nested.run_scripts(depth=nested_depth, force=force)
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

        return ran
176 1
177
    def lock_dependencies(self, *names, obey_existing=True, skip_changes=False):
        """Record the current version of the selected immediate dependencies.

        Args:
            *names: source or group names to restrict the operation to.
            obey_existing: passed to ``_get_sources`` as ``use_locked``.
            skip_changes: forwarded to ``source.lock``.

        Returns:
            The number of sources for which ``source.lock`` returned a locked
            entry. The datafile is saved only when that number is non-zero.
        """
        sources = self._get_sources(use_locked=obey_existing).copy()
        wanted = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        locked_total = 0
        for source in sources:
            if source.name not in wanted:
                log.info("Skipped dependency: %s", source.name)
                continue

            locked = source.lock(skip_changes=skip_changes)

            if locked is not None:
                locked_total += 1
                # Replace the matching locked entry in place, or add a new one.
                try:
                    position = self.sources_locked.index(source)
                except ValueError:
                    self.sources_locked.append(locked)
                else:
                    self.sources_locked[position] = locked

            shell.cd(self.location_path, _show=False)

        if locked_total:
            self.datafile.save()

        common.dedent()

        return locked_total
211
212 1
    def uninstall_dependencies(self):
        """Remove the whole dependency storage location.

        The working directory is moved to the project root first, then the
        storage location is removed.
        """
        storage = self.location_path
        shell.cd(self.root)
        shell.rm(storage)
        common.newline()
217 1
218
    def clean_dependencies(self):
        """Remove every top-level dependency directory and the log file.

        A dependency path equal to the storage location itself is skipped
        rather than removed.
        """
        for path in self.get_top_level_dependencies():
            if path != self.location_path:
                shell.rm(path)
            else:
                log.info("Skipped dependency: %s", path)

            common.newline()

        shell.rm(self.log_path)
230 1
231 1
    def get_top_level_dependencies(self):
        """Yield the full path of each top-level dependency.

        Nothing is yielded when the storage location does not exist. Each
        path is the storage location joined with the name of an entry in
        ``self.sources``.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            yield os.path.join(self.location_path, source.name)

            # Runs when the consumer asks for the next item; presumably resets
            # the working directory in case the consumer changed it.
            shell.cd(self.location_path, _show=False)

        # Only reached when the generator is exhausted.
        common.dedent()
247
248 1
    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the identity of each dependency, including nested ones.

        Each item is the result of ``source.identify`` (per the original
        docstring: path, repository, and hash; confirm against
        ``Source.identify``).

        Args:
            depth: remaining nesting levels; ``0`` skips every source at this
                level and ``None`` means unlimited.
            allow_dirty: forwarded to ``source.identify`` and nested configs.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            # With search=False only the current working directory is checked;
            # presumably identify() leaves us inside the dependency (confirm).
            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            # Return to the storage location before handling the next source.
            shell.cd(self.location_path, _show=False)

        # Only reached when the generator is exhausted.
        common.dedent()
277
278
    def log(self, message="", *args):
279 1
        """Append a message to the log file."""
280 1
        with open(self.log_path, 'a') as outfile:
281 1
            outfile.write(message.format(*args) + '\n')
282 1
283 1
    def _get_sources(self, *, use_locked=None):
284
        """Merge source lists using the requested section as the base."""
285
        if use_locked is True:
286
            if self.sources_locked:
287
                return self.sources_locked
288
            log.info("No locked sources, defaulting to none...")
289
            return []
290
291
        sources: List[Source] = []
292
        if use_locked is False:
293
            sources = self.sources
294
        else:
295
            if self.sources_locked:
296
                log.info("Defaulting to locked sources...")
297
                sources = self.sources_locked
298
            else:
299
                log.info("No locked sources, using latest...")
300
                sources = self.sources
301
302
        extras = []
303
        for source in self.sources + self.sources_locked:
304
            if source not in sources:
305
                log.info("Source %r missing from selected section", source.name)
306
                extras.append(source)
307
308
        return sources + extras
309
310
    def _get_sources_filter(self, *names, sources):
311
        """Get filtered sublist of sources."""
312
        sources_filter = None
313
314
        groups_filter = [group for group in self.groups if group.name in list(names)]
315
316
        if groups_filter:
317
            sources_filter = [
318
                members for group in groups_filter for members in group.members
319
            ]
320
        else:
321
            sources_filter = list(names) if names else [s.name for s in sources]
322
323
        return sources_filter
324
325
326
def load_config(start=None, *, search=True):
    """Find and load a project config file.

    Args:
        start: directory to begin in; defaults to the current working
            directory.
        search: when true, walk up through the parent directories until a
            config is found; when false, inspect only ``start``.

    Returns:
        The loaded ``Config`` for the first valid filename found, or ``None``.
        The filesystem root itself is never inspected.
    """
    start = os.path.abspath(start) if start else os.getcwd()

    if search:
        log.debug("Searching for config...")

    current = start
    while current != os.path.dirname(current):
        log.debug("Looking for config in: %s", current)

        candidates = (
            entry for entry in os.listdir(current) if _valid_filename(entry)
        )
        found = next(candidates, None)
        if found is not None:
            config = Config(current, found)
            config._on_post_load()  # pylint: disable=protected-access
            log.debug("Found config: %s", config.path)
            return config

        if not search:
            break
        current = os.path.dirname(current)

    if search:
        not_found = "No config found starting from: %s"
    else:
        not_found = "No config found in: %s"
    log.debug(not_found, start)

    return None
358
359
360
def _valid_filename(filename):
361
    name, ext = os.path.splitext(filename.lower())
362
    if name.startswith('.'):
363
        name = name[1:]
364
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
365