Passed
Pull Request — develop (#193)
by Jace
01:40
created

gitman.models.config.Config.install_dependencies()   C

Complexity

Conditions 9

Size

Total Lines 62
Code Lines 49

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 38
CRAP Score 9

Importance

Changes 0
Metric Value
cc 9
eloc 49
nop 9
dl 0
loc 62
ccs 38
cts 38
cp 1
crap 9
rs 6.3357
c 0
b 0
f 0

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists: Replace Parameter with Method Call, Preserve Whole Object, and Introduce Parameter Object.

1 1
import logging
2 1
import os
3
from typing import List
4 1
5 1
import yorm
0 ignored issues
show
introduced by
Unable to import 'yorm'
Loading history...
6
from yorm.types import SortedList, String
0 ignored issues
show
introduced by
Unable to import 'yorm.types'
Loading history...
7 1
8 1
from .. import common, exceptions, shell
9 1
from .group import Group
10
from .source import Source
11 1
12
13
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
14 1
15 1
16 1
@yorm.attr(location=String)
17 1
@yorm.attr(sources=SortedList.of_type(Source))
18 1
@yorm.attr(sources_locked=SortedList.of_type(Source))
19
@yorm.attr(groups=SortedList.of_type(Group))
20
@yorm.sync("{self.root}/{self.filename}", auto_save=False)
21 1
class Config(yorm.ModelMixin):
22
    """Specifies all dependencies for a project."""
23 1
24
    # File name of the log file; ``log_path`` joins it onto the dependency
    # storage location.
    LOG = "gitman.log"
25 1
26 1
    def __init__(self, root=None, filename="gitman.yml", location="gitman_sources"):
        """Create an empty configuration for a project directory.

        ``root`` falls back to the current working directory when it is
        falsy; ``filename`` and ``location`` are stored as given. The source
        and group lists start out empty.
        """
        super().__init__()
        self.root = root if root else os.getcwd()
        self.filename = filename
        self.location = location
        self.sources: List[Source] = []
        self.sources_locked: List[Source] = []
        self.groups: List[Group] = []
34
35 1
    def _on_post_load(self):
36 1
        for source in self.sources:
37
            source._on_post_load()  # pylint: disable=protected-access
38 1
39
        for source in self.sources_locked:
40
            source._on_post_load()  # pylint: disable=protected-access
41 1
42
        # check for conflicts between source names and group names
43 1
        for source in self.sources:
44
            for group in self.groups:
45
                if source.name == group.name:
46 1
                    msg = (
47
                        "Name conflict detected between source name and "
48 1
                        "group name \"{}\""
49
                    ).format(source.name)
50 1
                    raise exceptions.InvalidConfig(msg)
51 1
52 1
    @property
53 1
    def config_path(self):
54 1
        """Get the full path to the config file."""
55 1
        return os.path.normpath(os.path.join(self.root, self.filename))
56 1
57
    # Alias: the config file location is also read as ``path`` (by
    # ``get_path`` and by ``load_config``).
    path = config_path
58 1
59
    @property
60 1
    def log_path(self):
61
        """Get the full path to the log file."""
62
        return os.path.normpath(os.path.join(self.location_path, self.LOG))
63
64 1
    @property
65 1
    def location_path(self):
66 1
        """Get the full path to the dependency storage location."""
67
        return os.path.normpath(os.path.join(self.root, self.location))
68 1
69 1
    def get_path(self, name=None):
70
        """Get the full path to a dependency or internal file."""
71 1
        base = self.location_path
72 1
        if name == '__config__':
73 1
            return self.path
74 1
        if name == '__log__':
75 1
            return self.log_path
76
        if name:
77 1
            return os.path.normpath(os.path.join(base, name))
78 1
        return base
79 1
80 1
    def install_dependencies(
        self,
        *names,
        depth=None,
        update=True,
        recurse=False,
        force=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):
        """Download or update the requested dependencies, then recurse.

        After each source is updated and linked, a config found in the
        current working directory (``load_config(search=False)``) is
        installed as well, with ``depth`` reduced by one.

        Returns the number of dependencies handled, nested ones included, or
        0 when ``depth`` is 0 or when a requested name matched no source.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        use_locked = False if update else None
        sources = self._get_sources(use_locked=use_locked)
        remaining = self._get_sources_filter(*names, sources=sources)

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        # These arguments for nested configs are the same for every source.
        nested_depth = None if depth is None else max(0, depth - 1)
        nested_update = update and recurse

        installed = 0
        for source in sources:
            if source.name not in remaining:
                log.info("Skipped dependency: %s", source.name)
                continue
            # Names still left at the end are the ones that matched nothing.
            remaining.remove(source.name)

            source.update_files(
                force=force, fetch=fetch, clean=clean, skip_changes=skip_changes
            )
            source.create_link(self.root, force=force)
            common.newline()
            installed += 1

            nested = load_config(search=False)
            if nested:
                common.indent()
                installed += nested.install_dependencies(
                    depth=nested_depth,
                    update=nested_update,
                    recurse=recurse,
                    force=force,
                    fetch=fetch,
                    clean=clean,
                    skip_changes=skip_changes,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()
        if remaining:
            log.error("No such dependency: %s", ' '.join(remaining))
            return 0

        return installed
142 1
143
    def run_scripts(self, *names, depth=None, force=False):
        """Run the scripts of the selected dependencies, then recurse.

        Returns the number of dependencies whose scripts were run, nested
        ones included, or 0 when ``depth`` is 0.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        selected = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        # The depth handed to nested configs is the same for every source.
        nested_depth = None if depth is None else max(0, depth - 1)

        ran = 0
        for source in sources:
            if source.name not in selected:
                continue

            source.run_scripts(force=force)
            ran += 1

            nested = load_config(search=False)
            if nested:
                common.indent()
                ran += nested.run_scripts(depth=nested_depth, force=force)
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

        return ran
175
176 1
    def lock_dependencies(self, *names, obey_existing=True, skip_changes=False):
        """Record locked versions for the selected immediate dependencies.

        Returns the number of sources locked. The config is saved only when
        at least one source was locked.
        """
        candidates = self._get_sources(use_locked=obey_existing).copy()
        wanted = self._get_sources_filter(*names, sources=candidates)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        locked_count = 0
        for source in candidates:
            if source.name not in wanted:
                log.info("Skipped dependency: %s", source.name)
                continue

            pinned = source.lock(skip_changes=skip_changes)

            if pinned is not None:
                # Replace the existing entry for this source, else add one.
                try:
                    position = self.sources_locked.index(source)
                except ValueError:
                    self.sources_locked.append(pinned)
                else:
                    self.sources_locked[position] = pinned
                locked_count += 1

            shell.cd(self.location_path, _show=False)

        if locked_count:
            self.save()

        common.dedent()

        return locked_count
210 1
211
    def uninstall_dependencies(self):
        """Remove the whole dependency storage location."""
        project_root = self.root
        shell.cd(project_root)
        storage = self.location_path
        shell.rm(storage)
        common.newline()
216
217 1
    def clean_dependencies(self):
        """Delete each top-level dependency path and then the log file.

        A path equal to the storage location itself is logged as skipped
        instead of being removed.
        """
        for path in self.get_top_level_dependencies():
            if path != self.location_path:
                shell.rm(path)
            else:
                log.info("Skipped dependency: %s", path)

            common.newline()

        shell.rm(self.log_path)
229
230 1
    def get_top_level_dependencies(self):
231 1
        """Yield the path, repository, and hash of top-level dependencies."""
232 1
        if not os.path.exists(self.location_path):
233
            return
234 1
235 1
        shell.cd(self.location_path)
236
        common.newline()
237 1
        common.indent()
238 1
239 1
        for source in self.sources:
240 1
241 1
            yield os.path.join(self.location_path, source.name)
242
243 1
            shell.cd(self.location_path, _show=False)
244
245
        common.dedent()
246 1
247
    def get_dependencies(self, depth=None, allow_dirty=True):
248 1
        """Yield the path, repository, and hash of each dependency."""
249 1
        if not os.path.exists(self.location_path):
250
            return
251 1
252
        shell.cd(self.location_path)
253 1
        common.newline()
254 1
        common.indent()
255
256 1
        for source in self.sources:
257 1
258 1
            if depth == 0:
259
                log.info("Skipped dependency: %s", source.name)
260 1
                continue
261 1
262 1
            yield source.identify(allow_dirty=allow_dirty)
263 1
264 1
            config = load_config(search=False)
265
            if config:
266 1
                common.indent()
267 1
                yield from config.get_dependencies(
268
                    depth=None if depth is None else max(0, depth - 1),
269 1
                    allow_dirty=allow_dirty,
270
                )
271 1
                common.dedent()
272 1
273
            shell.cd(self.location_path, _show=False)
274 1
275
        common.dedent()
276 1
277
    def log(self, message="", *args):
278
        """Append a message to the log file."""
279 1
        with open(self.log_path, 'a') as outfile:
280 1
            outfile.write(message.format(*args) + '\n')
281 1
282 1
    def _get_sources(self, *, use_locked=None):
283 1
        """Merge source lists using the requested section as the base."""
284
        if use_locked is True:
285
            if self.sources_locked:
286
                return self.sources_locked
287
            log.info("No locked sources, defaulting to none...")
288
            return []
289
290
        sources: List[Source] = []
291
        if use_locked is False:
292
            sources = self.sources
293
        else:
294
            if self.sources_locked:
295
                log.info("Defaulting to locked sources...")
296
                sources = self.sources_locked
297
            else:
298
                log.info("No locked sources, using latest...")
299
                sources = self.sources
300
301
        extras = []
302
        for source in self.sources + self.sources_locked:
303
            if source not in sources:
304
                log.info("Source %r missing from selected section", source.name)
305
                extras.append(source)
306
307
        return sources + extras
308
309
    def _get_sources_filter(self, *names, sources):
310
        """Get filtered sublist of sources."""
311
        sources_filter = None
312
313
        groups_filter = [group for group in self.groups if group.name in list(names)]
314
315
        if groups_filter:
316
            sources_filter = [
317
                members for group in groups_filter for members in group.members
318
            ]
319
        else:
320
            sources_filter = list(names) if names else [s.name for s in sources]
321
322
        return sources_filter
323
324
325
def load_config(start=None, *, search=True):
    """Find a project config file and return a ``Config`` for it.

    Looks in ``start`` (the current working directory when omitted) and,
    when ``search`` is true, in each parent directory in turn. Returns
    ``None`` when no matching file is found.
    """
    start = os.path.abspath(start) if start else os.getcwd()

    if search:
        log.debug("Searching for config...")

    directory = start
    # The walk ends at a directory that is its own parent.
    while directory != os.path.dirname(directory):
        log.debug("Looking for config in: %s", directory)

        for entry in os.listdir(directory):
            if not _valid_filename(entry):
                continue
            config = Config(directory, entry)
            config._on_post_load()  # pylint: disable=protected-access
            log.debug("Found config: %s", config.path)
            return config

        if not search:
            break
        directory = os.path.dirname(directory)

    if search:
        log.debug("No config found starting from: %s", start)
    else:
        log.debug("No config found in: %s", start)

    return None
357
358
359
def _valid_filename(filename):
    """Tell whether ``filename`` is a recognised config file name.

    Matching is case-insensitive and ignores one leading dot: the stem must
    be ``gitman`` or ``gdm`` and the extension ``.yml`` or ``.yaml``.
    """
    stem, extension = os.path.splitext(filename.lower())
    if stem[:1] == '.':
        stem = stem[1:]
    return stem in ('gitman', 'gdm') and extension in ('.yml', '.yaml')
364