Completed
Push — develop ( f5b93c...24eb3d )
by Jace
11s
created

gitman.models.config.Config.install_dependencies()   C

Complexity

Conditions 10

Size

Total Lines 51
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 10

Importance

Changes 0
Metric Value
cc 10
eloc 40
nop 8
dl 0
loc 51
ccs 33
cts 33
cp 1
crap 10
rs 5.9999
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

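Commonly applied refactorings include Extract Method. If many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object are also candidates.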

Complexity

Complex units like the method gitman.models.config.Config.install_dependencies() often do a lot of different things. To break such a unit (or its enclosing class) down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists, such as Introduce Parameter Object, Preserve Whole Object, and Replace Parameter with Method Call.

1 1
import logging
2 1
import os
3
4 1
import yorm
5 1
from yorm.types import SortedList, String
6
7 1
from . import Source
8 1
from .. import common, shell
9 1
10
11 1
# Module-level logger used by Config's methods and by load_config().
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(location=String)
@yorm.attr(sources=SortedList.of_type(Source))
@yorm.attr(sources_locked=SortedList.of_type(Source))
@yorm.sync("{self.root}/{self.filename}", auto_save=False)
class Config(yorm.ModelMixin):
    """Specifies all dependencies for a project."""

    # File name of the log kept inside the dependency storage location.
    LOG = "gitman.log"

    def __init__(self, root=None,
                 filename="gitman.yml", location="gitman_sources"):
        """Create a config rooted at `root` (default: the working directory).

        `filename` is the config file's name inside `root`; `location` is
        the dependency storage directory, relative to `root`.
        """
        super().__init__()
        self.root = root or os.getcwd()
        self.filename = filename
        self.location = location
        self.sources = []
        self.sources_locked = []

    def _on_post_load(self):
        """Forward the post-load hook to every source in both lists."""
        for source in self.sources:
            source._on_post_load()  # pylint: disable=protected-access
        for source in self.sources_locked:
            source._on_post_load()  # pylint: disable=protected-access

    @property
    def config_path(self):
        """Get the full path to the config file."""
        return os.path.normpath(os.path.join(self.root, self.filename))
    # Short alias; read by get_path() and by load_config().
    path = config_path

    @property
    def log_path(self):
        """Get the full path to the log file."""
        return os.path.normpath(os.path.join(self.location_path, self.LOG))

    @property
    def location_path(self):
        """Get the full path to the dependency storage location."""
        return os.path.normpath(os.path.join(self.root, self.location))

    def get_path(self, name=None):
        """Get the full path to a dependency or internal file.

        `'__config__'` and `'__log__'` select the config and log files;
        any other non-empty name is joined onto the storage location; no
        name returns the storage location itself.
        """
        base = self.location_path
        if name == '__config__':
            return self.path
        if name == '__log__':
            return self.log_path
        if name:
            return os.path.normpath(os.path.join(base, name))
        return base

    def install_dependencies(self, *names, depth=None,
                             update=True, recurse=False,
                             force=False, fetch=False, clean=True):
        """Download or update the specified dependencies.

        :param names: dependency names to process; all sources when empty
        :param depth: remaining nesting levels; `None` means unlimited and
            `0` skips this directory entirely
        :param update: when true, the `sources` section is the base list;
            otherwise locked sources are preferred when present
        :param recurse: forwarded to nested configs; nested configs only
            update when both `update` and `recurse` are true
        :param force: forwarded to each source's update and link steps
        :param fetch: forwarded to each source's update step
        :param clean: forwarded to each source's update step

        :return: number of dependencies processed (nested ones included),
            or `0` if any requested name matched no source
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        sources_filter = self._select_names(names, sources)

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            # Names are removed as they are matched so that whatever is
            # left over afterwards is known to be unrecognized.
            if source.name in sources_filter:
                sources_filter.remove(source.name)
            else:
                log.info("Skipped dependency: %s", source.name)
                continue

            source.update_files(force=force, fetch=fetch, clean=clean)
            source.create_link(self.root, force=force)
            common.newline()
            count += 1

            # Looks only in the current working directory -- presumably the
            # dependency's checkout after update_files(); confirm in Source.
            config = load_config(search=False)
            if config:
                common.indent()
                count += config.install_dependencies(
                    depth=self._child_depth(depth),
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    fetch=fetch,
                    clean=clean,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()
        if sources_filter:
            log.error("No such dependency: %s", ' '.join(sources_filter))
            return 0

        return count

    def run_scripts(self, *names, depth=None, force=False):
        """Run scripts for the specified dependencies.

        :param names: dependency names to process; all sources when empty
        :param depth: remaining nesting levels; `None` means unlimited and
            `0` skips this directory entirely
        :param force: forwarded to each source's `run_scripts()`

        :return: number of dependencies processed (nested ones included)
        """
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        sources_filter = self._select_names(names, sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                source.run_scripts(force=force)
                count += 1

                config = load_config(search=False)
                if config:
                    common.indent()
                    count += config.run_scripts(
                        depth=self._child_depth(depth),
                        force=force,
                    )
                    common.dedent()

                shell.cd(self.location_path, _show=False)

        common.dedent()

        return count

    def lock_dependencies(self, *names, obey_existing=True):
        """Lock down the immediate dependency versions.

        :param names: dependency names to lock; all sources when empty
        :param obey_existing: passed to `_get_sources()` as `use_locked`

        :return: number of dependencies locked; the config is saved only
            when this is non-zero
        """
        # Iterate over a copy because `sources_locked` may be the very list
        # returned here and it is modified inside the loop.
        sources = self._get_sources(use_locked=obey_existing).copy()
        sources_filter = self._select_names(names, sources)

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue

            try:
                index = self.sources_locked.index(source)
            except ValueError:
                self.sources_locked.append(source.lock())
            else:
                self.sources_locked[index] = source.lock()
            count += 1

            shell.cd(self.location_path, _show=False)

        # Balance the indent() above, as the sibling methods do; without
        # this the output indentation level was left raised.
        common.dedent()

        if count:
            self.save()

        return count

    def uninstall_dependencies(self):
        """Delete the dependency storage location."""
        shell.cd(self.root)
        shell.rm(self.location_path)
        common.newline()

    def clean_dependencies(self):
        """Delete each top-level dependency's directory and the log file.

        A dependency whose path equals the storage location itself is
        skipped rather than removed.
        """
        for path in self.get_top_level_dependencies():

            if path == self.location_path:
                log.info("Skipped dependency: %s", path)
            else:
                shell.rm(path)

            common.newline()

        shell.rm(self.log_path)

    def get_top_level_dependencies(self):
        """Yield the path of each top-level dependency.

        Yields nothing when the storage location does not exist.
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            yield os.path.join(self.location_path, source.name)

            # The consumer may have changed directory while handling the
            # yielded path, so return to the storage location.
            shell.cd(self.location_path, _show=False)

        common.dedent()

    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the path, repository, and hash of each dependency.

        Each item is whatever `Source.identify()` returns; nested configs
        are traversed recursively.

        :param depth: remaining nesting levels; `None` means unlimited and
            `0` skips every source at this level
        :param allow_dirty: forwarded to `Source.identify()`
        """
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=self._child_depth(depth),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def log(self, message="", *args):
        """Append a message to the log file.

        `message` is a `str.format` template filled in with `args`; a
        newline is appended.
        """
        with open(self.log_path, 'a') as outfile:
            outfile.write(message.format(*args) + '\n')

    def _get_sources(self, *, use_locked=None):
        """Merge source lists using the requested section as the base.

        :param use_locked: `True` returns only the locked sources (or an
            empty list when there are none); `False` uses `sources` as the
            base; `None` prefers locked sources and falls back to `sources`

        :return: the base list followed by any source that appears in
            either section but not in the base
        """
        if use_locked is True:
            if self.sources_locked:
                return self.sources_locked
            log.info("No locked sources, defaulting to none...")
            return []

        sources = []
        if use_locked is False:
            sources = self.sources
        else:
            if self.sources_locked:
                log.info("Defaulting to locked sources...")
                sources = self.sources_locked
            else:
                log.info("No locked sources, using latest...")
                sources = self.sources

        extras = []
        for source in self.sources + self.sources_locked:
            if source not in sources:
                log.info("Source %r missing from selected section",
                         source.name)
                extras.append(source)

        return sources + extras

    @staticmethod
    def _select_names(names, sources):
        """Return a new list of names to act on (all sources if none given)."""
        return list(names) if names else [s.name for s in sources]

    @staticmethod
    def _child_depth(depth):
        """Return the depth limit for a nested config (`None` = unlimited)."""
        return None if depth is None else max(0, depth - 1)
279 1
280 1
281 1
def load_config(start=None, *, search=True):
    """Load the config for the current project.

    :param start: directory to begin in; defaults to the working directory
    :param search: when true, walk up through parent directories until a
        config file is found; when false, inspect only `start`

    :return: a `Config` for the first valid config file found, or `None`
    """
    start = os.path.abspath(start) if start else os.getcwd()

    if search:
        log.debug("Searching for config...")

    path = start
    # The loop ends once `path` is its own parent, so the filesystem root
    # itself is never inspected.
    while path != os.path.dirname(path):
        log.debug("Looking for config in: %s", path)

        # os.listdir() returns entries in arbitrary order; sorting makes the
        # choice deterministic when several valid config names coexist.
        for filename in sorted(os.listdir(path)):
            if _valid_filename(filename):
                config = Config(path, filename)
                config._on_post_load()  # pylint: disable=protected-access
                log.debug("Found config: %s", config.path)
                return config

        if search:
            path = os.path.dirname(path)
        else:
            break

    if search:
        log.debug("No config found starting from: %s", start)
    else:
        log.debug("No config found in: %s", start)

    return None
313
314
315
def _valid_filename(filename):
    """Return whether `filename` is a recognized config file name.

    The check is case-insensitive and ignores one leading dot, so hidden
    variants such as ``.gitman.yml`` are accepted as well.
    """
    name, ext = os.path.splitext(filename.lower())
    if name.startswith('.'):
        name = name[1:]
    return name in ('gitman', 'gdm') and ext in ('.yml', '.yaml')
320