gitman.models.config.Config.install_dependencies()   C
last analyzed

Complexity

Conditions 9

Size

Total Lines 67
Code Lines 54

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 45
CRAP Score 9

Importance

Changes 0
Metric Value
cc 9
eloc 54
nop 10
dl 0
loc 67
ccs 45
cts 45
cp 1
crap 9
rs 6.1721
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive fragment of the body into its own well-named method.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists: Replace Parameter with Method Call, Preserve Whole Object, and Introduce Parameter Object.

1 1
import os
2 1
from typing import List, Optional
3
4 1
import log
5 1
from datafiles import datafile, field
6
7 1
from .. import common, exceptions, shell
8 1
from .group import Group
9 1
from .source import Source
10
11 1
12
@datafile("{self.root}/{self.filename}", defaults=True, manual=True)
class Config:
    """Specifies all dependencies for a project.

    NOTE(review): the ``datafile`` decorator presumably binds each instance
    to the file ``{root}/{filename}``; the only persistence performed by
    this class itself is the explicit ``self.datafile.save()`` call in
    ``lock_dependencies``.
    """

    # Directory holding the config file; ``__post_init__`` replaces ``None``
    # with the current working directory.
    root: Optional[str] = None
    # Name of the config file inside ``root``.
    filename: str = "gitman.yml"

    # Directory, relative to ``root``, where dependencies are stored.
    location: str = "gitman_sources"
    # Dependencies as declared.
    sources: List[Source] = field(default_factory=list)
    # Dependencies pinned by ``lock_dependencies``.
    sources_locked: List[Source] = field(default_factory=list)
    # Named groups whose ``members`` are source names.
    groups: List[Group] = field(default_factory=list)

    def __post_init__(self):
        """Default ``root`` to the current working directory."""
        if self.root is None:
            self.root = os.getcwd()

    @property
    def config_path(self):
        """Get the full path to the config file."""
        assert self.root
        return os.path.normpath(os.path.join(self.root, self.filename))

    # Shorter alias for ``config_path``.
    path = config_path

    @property
    def log_path(self):
        """Get the full path to the log file."""
        return os.path.normpath(os.path.join(self.location_path, "gitman.log"))

    @property
    def location_path(self):
        """Get the full path to the dependency storage location."""
        assert self.root
        return os.path.normpath(os.path.join(self.root, self.location))

    def validate(self):
        """Check for conflicts between source names and group names.

        Raises:
            exceptions.InvalidConfig: If a source shares its name with a
                group. The first conflicting source (in ``sources`` order)
                is the one reported.
        """
        group_names = {group.name for group in self.groups}
        for source in self.sources:
            if source.name in group_names:
                msg = (
                    "Name conflict detected between source name and "
                    "group name \"{}\""
                ).format(source.name)
                raise exceptions.InvalidConfig(msg)

    def get_path(self, name=None):
        """Get the full path to a dependency or internal file.

        Args:
            name: ``'__config__'`` for the config file, ``'__log__'`` for
                the log file, any other non-empty value for a path below
                the storage location, or a falsy value for the storage
                location itself.
        """
        if name == '__config__':
            return self.path
        if name == '__log__':
            return self.log_path

        base = self.location_path
        if name:
            return os.path.normpath(os.path.join(base, name))
        return base

    def install_dependencies(
        self,
        *names,
        depth=None,
        update=True,
        recurse=False,
        force=False,
        force_interactive=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):  # pylint: disable=too-many-locals
        """Download or update the specified dependencies.

        Args:
            *names: Source or group names to install; every source is
                installed when omitted.
            depth: Remaining levels of nested configs to process. ``0``
                skips this directory entirely, ``None`` means no limit.
            update: When true, ``sources`` is used as the base list;
                otherwise locked sources are preferred when present.
            recurse: Nested configs are updated only when both ``update``
                and ``recurse`` are true.
            force, force_interactive, fetch, clean, skip_changes:
                Forwarded to ``Source.update_files`` (``force`` is also
                passed to ``Source.create_link``).

        Returns:
            The number of dependencies installed, nested ones included, or
            ``0`` if any requested name matched no source.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        sources_filter = self._get_sources_filter(*names, sources=sources)

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue
            # Names still left in the filter after the loop matched nothing.
            sources_filter.remove(source.name)

            source.update_files(
                force=force,
                force_interactive=force_interactive,
                fetch=fetch,
                clean=clean,
                skip_changes=skip_changes,
            )
            source.create_link(self.root, force=force)
            common.newline()
            count += 1

            # Looks only in the current working directory for a nested
            # config (presumably the dependency's checkout at this point).
            config = load_config(search=False)
            if config:
                common.indent()
                # NOTE(review): force_interactive is not forwarded to nested
                # installs -- confirm this is intentional.
                count += config.install_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    fetch=fetch,
                    clean=clean,
                    skip_changes=skip_changes,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()
        if sources_filter:
            log.error("No such dependency: %s", ' '.join(sources_filter))
            return 0

        return count

    def run_scripts(self, *names, depth=None, force=False, show_shell_stdout=False):
        """Run scripts for the specified dependencies.

        Args:
            *names: Source or group names to run scripts for; every source
                is used when omitted.
            depth: Remaining levels of nested configs to process. ``0``
                skips this directory entirely, ``None`` means no limit.
            force, show_shell_stdout: Forwarded to ``Source.run_scripts``
                and to nested configs.

        Returns:
            The number of dependencies processed, nested ones included.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                continue

            source.run_scripts(force=force, show_shell_stdout=show_shell_stdout)
            count += 1

            config = load_config(search=False)
            if config:
                common.indent()
                # Forward show_shell_stdout as well so nested dependencies
                # honor the caller's output setting.
                count += config.run_scripts(
                    depth=None if depth is None else max(0, depth - 1),
                    force=force,
                    show_shell_stdout=show_shell_stdout,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

        return count

    def lock_dependencies(self, *names, obey_existing=True, skip_changes=False):
        """Lock down the immediate dependency versions.

        Args:
            *names: Source or group names to lock; every selected source is
                locked when omitted.
            obey_existing: Passed to ``_get_sources`` as ``use_locked``.
            skip_changes: Forwarded to ``Source.lock``.

        Returns:
            The number of sources locked. The config is saved only when at
            least one source was locked.
        """
        # Copy: ``_get_sources`` may hand back ``self.sources_locked``
        # itself, which is modified inside the loop below.
        sources = self._get_sources(use_locked=obey_existing).copy()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue

            source_locked = source.lock(skip_changes=skip_changes)

            if source_locked is not None:
                # Replace an existing locked entry in place, else append.
                try:
                    index = self.sources_locked.index(source)
                except ValueError:
                    self.sources_locked.append(source_locked)
                else:
                    self.sources_locked[index] = source_locked
                count += 1

            shell.cd(self.location_path, _show=False)

        if count:
            self.datafile.save()

        common.dedent()

        return count

    def uninstall_dependencies(self):
        """Delete the dependency storage location."""
        shell.cd(self.root)
        shell.rm(self.location_path)
        common.newline()

    def clean_dependencies(self):
        """Delete each top-level dependency and the log file.

        A dependency whose path equals the storage location itself is
        skipped rather than removed.
        """
        for path in self.get_top_level_dependencies():
            if path == self.location_path:
                log.info("Skipped dependency: %s", path)
            else:
                shell.rm(path)

            common.newline()

        shell.rm(self.log_path)

    def get_top_level_dependencies(self):
        """Yield the full path of each top-level dependency.

        Nothing is yielded when the storage location does not exist.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:
            assert source.name
            yield os.path.join(self.location_path, source.name)

            # Return to the storage location after the consumer has run.
            shell.cd(self.location_path, _show=False)

        common.dedent()

    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the identity of each dependency, nested ones included.

        Each item is whatever ``Source.identify`` returns (presumably the
        path, repository, and hash -- confirm against ``Source``).

        Args:
            depth: Remaining levels of nested configs to process. ``0``
                skips every source at this level, ``None`` means no limit.
            allow_dirty: Forwarded to ``Source.identify``.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:
            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def log(self, message="", *args):
        """Append a message to the log file.

        ``message`` is formatted with ``str.format(*args)`` and written
        followed by a newline.
        """
        with open(self.log_path, 'a') as outfile:
            outfile.write(message.format(*args) + '\n')

    def _get_sources(self, *, use_locked=None):
        """Merge source lists using the requested section as the base.

        Args:
            use_locked: ``True`` returns only the locked sources (an empty
                list when there are none); ``False`` uses ``sources`` as
                the base; anything else prefers the locked sources and
                falls back to ``sources``.

        Returns:
            For ``use_locked=True``, ``self.sources_locked`` itself or an
            empty list. Otherwise a new list: the base section followed by
            any source from either section that the base does not contain.
        """
        if use_locked is True:
            if self.sources_locked:
                return self.sources_locked
            log.info("No locked sources, defaulting to none...")
            return []

        sources: List[Source]
        if use_locked is False:
            sources = self.sources
        elif self.sources_locked:
            log.info("Defaulting to locked sources...")
            sources = self.sources_locked
        else:
            log.info("No locked sources, using latest...")
            sources = self.sources

        extras = []
        for source in self.sources + self.sources_locked:
            if source not in sources:
                log.info("Source %r missing from selected section", source.name)
                extras.append(source)

        return sources + extras

    def _get_sources_filter(self, *names, sources):
        """Get filtered sublist of sources.

        Returns:
            A new list of source names: the members of every group named in
            ``names`` if any group matches; otherwise ``names`` itself, or
            the name of every source in ``sources`` when no names are given.
        """
        groups_filter = [group for group in self.groups if group.name in names]

        if groups_filter:
            return [member for group in groups_filter for member in group.members]

        return list(names) if names else [s.name for s in sources]
318
319
320
def load_config(start=None, *, search=True):
    """Load the config for the current project.

    Args:
        start: Directory to begin looking in; made absolute when given,
            otherwise the current working directory is used.
        search: When true, walk up through the parent directories until a
            config is found; when false, only ``start`` is inspected.

    Returns:
        A validated ``Config`` for the first directory entry accepted by
        ``_valid_filename``, or ``None`` when no config file is found.
    """
    start = os.path.abspath(start) if start else os.getcwd()

    if search:
        log.debug("Searching for config...")

    path = start
    while True:
        log.debug("Looking for config in: %s", path)

        for filename in os.listdir(path):
            if _valid_filename(filename):
                config = Config(path, filename)
                config.validate()
                log.debug("Found config: %s", config.path)
                return config

        # Stop after a single directory when not searching, or once the
        # filesystem root (the directory that is its own parent) has been
        # inspected, so a config located at the root is not missed.
        parent = os.path.dirname(path)
        if not search or parent == path:
            break
        path = parent

    if search:
        log.debug("No config found starting from: %s", start)
    else:
        log.debug("No config found in: %s", start)

    return None
352
353
354
def _valid_filename(filename):
355
    name, ext = os.path.splitext(filename.lower())
356
    if name.startswith('.'):
357
        name = name[1:]
358
    return name in {'gitman', 'gdm'} and ext in {'.yml', '.yaml'}
359