Passed
Pull Request — develop (#193)
by
unknown
01:35
created

gitman.models.config.Config.install_dependencies()   C

Complexity

Conditions 9

Size

Total Lines 62
Code Lines 49

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 38
CRAP Score 9

Importance

Changes 0
Metric Value
cc 9
eloc 49
nop 9
dl 0
loc 62
ccs 38
cts 38
cp 1
crap 9
rs 6.3357
c 0
b 0
f 0

How to fix: Long Method, Many Parameters

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

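Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.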

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists, such as Replace Parameter with Method Call, Preserve Whole Object, and Introduce Parameter Object.

1 1
import logging
2 1
import os
3
from typing import List
4 1
5 1
import yorm
0 ignored issues
show
introduced by
Unable to import 'yorm'
Loading history...
6
from yorm.types import SortedList, String
0 ignored issues
show
introduced by
Unable to import 'yorm.types'
Loading history...
7 1
8 1
from .. import common, exceptions, shell
9 1
from .group import Group
10
from .source import Source
11 1
12
13
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
14 1
15 1
16 1
@yorm.attr(location=String)
@yorm.attr(sources=SortedList.of_type(Source))
@yorm.attr(sources_locked=SortedList.of_type(Source))
@yorm.attr(groups=SortedList.of_type(Group))
@yorm.sync("{self.root}/{self.filename}", auto_save=False)
class Config(yorm.ModelMixin):
    """Specifies all dependencies for a project.

    The ``location``, ``sources``, ``sources_locked`` and ``groups``
    attributes are declared through ``yorm.attr`` and the instance is synced
    with the file at ``{root}/{filename}``. ``auto_save=False`` is passed to
    ``yorm.sync``; ``lock_dependencies`` calls ``save()`` explicitly.
    """

    # File name of the log kept inside the dependency storage location.
    LOG = "gitman.log"

    def __init__(self, root=None, filename="gitman.yml", location="gitman_sources"):
        """Create a config for the project at ``root``.

        Args:
            root: Project directory; the current working directory is used
                when this is falsy.
            filename: Name of the config file inside ``root``.
            location: Directory (relative to ``root``) holding dependencies.
        """
        super().__init__()
        self.root = root or os.getcwd()
        self.filename = filename
        self.location = location
        self.sources: List[Source] = []
        self.sources_locked: List[Source] = []
        self.groups: List[Group] = []

    def _on_post_load(self):
        """Run each source's post-load hook and validate the names in use.

        Raises:
            exceptions.InvalidConfig: If a source and a group share a name.
        """
        for source in self.sources:
            source._on_post_load()  # pylint: disable=protected-access

        for source in self.sources_locked:
            source._on_post_load()  # pylint: disable=protected-access

        # check for conflicts between source names and group names
        for source in self.sources:
            for group in self.groups:
                if source.name == group.name:
                    msg = (
                        "Name conflict detected between source name and "
                        "group name \"{}\""
                    ).format(source.name)
                    raise exceptions.InvalidConfig(msg)

    @property
    def config_path(self):
        """Get the full path to the config file."""
        return os.path.normpath(os.path.join(self.root, self.filename))

    # Shorter alias for ``config_path``.
    path = config_path

    @property
    def log_path(self):
        """Get the full path to the log file."""
        return os.path.normpath(os.path.join(self.location_path, self.LOG))

    @property
    def location_path(self):
        """Get the full path to the dependency storage location."""
        return os.path.normpath(os.path.join(self.root, self.location))

    def get_path(self, name=None):
        """Get the full path to a dependency or internal file.

        Args:
            name: ``'__config__'`` for the config file, ``'__log__'`` for the
                log file, any other non-empty value for an entry inside the
                storage location, or a falsy value for the storage location
                itself.
        """
        base = self.location_path
        if name == '__config__':
            return self.path
        if name == '__log__':
            return self.log_path
        if name:
            return os.path.normpath(os.path.join(base, name))
        return base

    @staticmethod
    def _nested_depth(depth):
        """Get the depth limit to pass on to a nested config.

        ``None`` (no limit) stays ``None``; otherwise the limit is reduced by
        one and never drops below zero.
        """
        return None if depth is None else max(0, depth - 1)

    def install_dependencies(
        self,
        *names,
        depth=None,
        update=True,
        recurse=False,
        force=False,
        fetch=False,
        clean=True,
        skip_changes=False,
    ):
        """Download or update the specified dependencies.

        Args:
            *names: Source or group names to install; every source is
                installed when no names are given.
            depth: Remaining nesting levels to process. ``0`` skips this
                directory, ``None`` means no limit.
            update: When true, the unlocked ``sources`` section is used as
                the base list; otherwise locked sources are preferred.
            recurse: Nested configs are updated only when both ``update``
                and ``recurse`` are true.
            force: Forwarded to ``Source.update_files`` and
                ``Source.create_link``.
            fetch: Forwarded to ``Source.update_files``.
            clean: Forwarded to ``Source.update_files``.
            skip_changes: Forwarded to ``Source.update_files``.

        Returns:
            The number of dependencies installed, nested ones included, or
            ``0`` if any requested name did not match a dependency.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        sources_filter = self._get_sources_filter(*names, sources=sources)

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                # Names still in the filter after the loop matched nothing.
                sources_filter.remove(source.name)
            else:
                log.info("Skipped dependency: %s", source.name)
                continue

            source.update_files(
                force=force, fetch=fetch, clean=clean, skip_changes=skip_changes
            )
            source.create_link(self.root, force=force)
            common.newline()
            count += 1

            # Look for a nested config in the current working directory only
            # (presumably the dependency just updated -- confirm in Source).
            config = load_config(search=False)
            if config:
                common.indent()
                count += config.install_dependencies(
                    depth=self._nested_depth(depth),
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    fetch=fetch,
                    clean=clean,
                    skip_changes=skip_changes,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()
        if sources_filter:
            log.error("No such dependency: %s", ' '.join(sources_filter))
            return 0

        return count

    def run_scripts(self, *names, depth=None, force=False):
        """Run scripts for the specified dependencies.

        Args:
            *names: Source or group names to process; every source is
                processed when no names are given.
            depth: Remaining nesting levels to process. ``0`` skips this
                directory, ``None`` means no limit.
            force: Forwarded to ``Source.run_scripts``.

        Returns:
            The number of dependencies whose scripts were run, nested ones
            included.
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                source.run_scripts(force=force)
                count += 1

                config = load_config(search=False)
                if config:
                    common.indent()
                    count += config.run_scripts(
                        depth=self._nested_depth(depth), force=force
                    )
                    common.dedent()

                shell.cd(self.location_path, _show=False)

        common.dedent()

        return count

    def lock_dependencies(self, *names, obey_existing=True, skip_changes=False):
        """Lock down the immediate dependency versions.

        Args:
            *names: Source or group names to lock; every source is locked
                when no names are given.
            obey_existing: Passed to ``_get_sources`` as ``use_locked``.
            skip_changes: Forwarded to ``Source.lock``.

        Returns:
            The number of dependencies locked. The config is saved only when
            this is non-zero.
        """
        sources = self._get_sources(use_locked=obey_existing).copy()
        sources_filter = self._get_sources_filter(*names, sources=sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue

            source_locked = source.lock(skip_changes=skip_changes)

            if source_locked is not None:
                # Replace an existing locked entry that compares equal to
                # this source, otherwise add a new one.
                # NOTE(review): relies on Source equality -- confirm.
                try:
                    index = self.sources_locked.index(source)
                except ValueError:
                    self.sources_locked.append(source_locked)
                else:
                    self.sources_locked[index] = source_locked
                count += 1

            shell.cd(self.location_path, _show=False)

        # Balance the indent() above, as the other dependency methods do.
        common.dedent()

        if count:
            self.save()

        return count

    def uninstall_dependencies(self):
        """Delete the dependency storage location."""
        shell.cd(self.root)
        shell.rm(self.location_path)
        common.newline()

    def clean_dependencies(self):
        """Delete each top-level dependency and the log file.

        A dependency whose path equals the storage location itself is
        skipped rather than removed.
        """
        for path in self.get_top_level_dependencies():

            if path == self.location_path:
                log.info("Skipped dependency: %s", path)
            else:
                shell.rm(path)

            common.newline()

        shell.rm(self.log_path)

    def get_top_level_dependencies(self):
        """Yield the path of each top-level dependency.

        Nothing is yielded when the storage location does not exist.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            yield os.path.join(self.location_path, source.name)

            # The consumer may have changed directory between items.
            shell.cd(self.location_path, _show=False)

        common.dedent()

    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the path, repository, and hash of each dependency.

        Args:
            depth: Remaining nesting levels to process. ``0`` skips every
                source, ``None`` means no limit.
            allow_dirty: Forwarded to ``Source.identify``.

        Nothing is yielded when the storage location does not exist.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=self._nested_depth(depth),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def log(self, message="", *args):
        """Append a message to the log file.

        ``message`` is expanded with ``str.format(*args)`` and written with a
        trailing newline.
        """
        with open(self.log_path, 'a') as outfile:
            outfile.write(message.format(*args) + '\n')

    def _get_sources(self, *, use_locked=None):
        """Merge source lists using the requested section as the base.

        Args:
            use_locked: ``True`` returns only the locked sources (an empty
                list if there are none). ``False`` uses ``sources`` as the
                base. ``None`` uses the locked sources when present and
                ``sources`` otherwise. In the last two cases, sources missing
                from the base section are appended to the result.
        """
        if use_locked is True:
            if self.sources_locked:
                return self.sources_locked
            log.info("No locked sources, defaulting to none...")
            return []

        if use_locked is False:
            sources = self.sources
        else:
            if self.sources_locked:
                log.info("Defaulting to locked sources...")
                sources = self.sources_locked
            else:
                log.info("No locked sources, using latest...")
                sources = self.sources

        extras = []
        for source in self.sources + self.sources_locked:
            if source not in sources:
                log.info("Source %r missing from selected section", source.name)
                extras.append(source)

        return sources + extras

    def _get_sources_filter(self, *names, sources):
        """Get filtered sublist of sources.

        Returns a new list of source names: the members of every group named
        in ``names`` if any group matches, otherwise ``names`` themselves, or
        the name of every entry in ``sources`` when no names were given.
        """
        groups_filter = [group for group in self.groups if group.name in names]

        if groups_filter:
            return [member for group in groups_filter for member in group.members]

        return list(names) if names else [s.name for s in sources]
321
322
323
def load_config(start=None, *, search=True):
    """Load the config for the current project.

    The lookup begins in ``start`` (made absolute), or in the current working
    directory when ``start`` is falsy. The first directory entry accepted by
    ``_valid_filename`` is wrapped in a ``Config``. With ``search`` enabled
    the parent directories are tried in turn; otherwise only the starting
    directory is examined.

    Returns:
        The ``Config`` that was found, or ``None``.
    """
    origin = os.path.abspath(start) if start else os.getcwd()

    if search:
        log.debug("Searching for config...")

    current = origin
    # A directory that is its own parent ends the walk without being scanned.
    while current != os.path.dirname(current):
        log.debug("Looking for config in: %s", current)

        matches = (entry for entry in os.listdir(current) if _valid_filename(entry))
        found = next(matches, None)
        if found is not None:
            config = Config(current, found)
            config._on_post_load()  # pylint: disable=protected-access
            log.debug("Found config: %s", config.path)
            return config

        if not search:
            break
        current = os.path.dirname(current)

    if search:
        not_found = "No config found starting from: %s"
    else:
        not_found = "No config found in: %s"
    log.debug(not_found, origin)

    return None
355
356
357
def _valid_filename(filename):
358
    name, ext = os.path.splitext(filename.lower())
359
    if name.startswith('.'):
360
        name = name[1:]
361
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
362