Passed
Pull Request — develop (#168)
by
unknown
02:28
created

gitman.models.config.Config.install_dependencies()   C

Complexity

Conditions 10

Size

Total Lines 51
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 33
CRAP Score 10

Importance

Changes 0
Metric Value
cc 10
eloc 40
nop 8
dl 0
loc 51
ccs 33
cts 33
cp 1
crap 10
rs 5.9999
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like gitman.models.config.Config.install_dependencies() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1 1
import logging
2 1
import os
3
4 1
import yorm
5 1
from yorm.types import SortedList, String
6
7 1
from . import Source
8 1
from .. import common, shell
9 1
10
11 1
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(location=String)
15 1
@yorm.attr(sources=SortedList.of_type(Source))
16 1
@yorm.attr(sources_locked=SortedList.of_type(Source))
17 1
@yorm.sync("{self.root}/{self.filename}", auto_save=False)
18 1
class Config(yorm.ModelMixin):
19
    """Specifies all dependencies for a project."""
20
21 1
    LOG = "gitman.log"
22
23 1
    def __init__(self, root=None,
24
                 filename="gitman.yml", location="gitman_sources"):
25 1
        super().__init__()
26 1
        self.root = root or os.getcwd()
27 1
        self.filename = filename
28 1
        self.location = location
29 1
        self.sources = []
30 1
        self.sources_locked = []
31
32 1
33
    def _on_post_load(self):
34
        for source in self.sources:
35 1
            source._on_post_load()
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _on_post_load was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
36 1
        for source in self.sources_locked:
37
            source._on_post_load()
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _on_post_load was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
38 1
39
    @property
40
    def config_path(self):
41 1
        """Get the full path to the config file."""
42
        return os.path.normpath(os.path.join(self.root, self.filename))
43 1
    path = config_path
44
45
    @property
46 1
    def log_path(self):
47
        """Get the full path to the log file."""
48 1
        return os.path.normpath(os.path.join(self.location_path, self.LOG))
49
50 1
    @property
51 1
    def location_path(self):
52 1
        """Get the full path to the dependency storage location."""
53 1
        return os.path.normpath(os.path.join(self.root, self.location))
54 1
55 1
    def get_path(self, name=None):
56 1
        """Get the full path to a dependency or internal file."""
57
        base = self.location_path
58 1
        if name == '__config__':
59
            return self.path
60 1
        if name == '__log__':
61
            return self.log_path
62
        if name:
63
            return os.path.normpath(os.path.join(base, name))
64 1
        return base
65 1
66 1
    def install_dependencies(self, *names, depth=None,
67
                             update=True, recurse=False,
68 1
                             force=False, fetch=False, clean=True):
69 1
        """Download or update the specified dependencies."""
70
        if depth == 0:
71 1
            log.info("Skipped directory: %s", self.location_path)
72 1
            return 0
73 1
74 1
        sources = self._get_sources(use_locked=False if update else None)
75 1
        sources_filter = list(names) if names else [s.name for s in sources]
76
77 1
        if not os.path.isdir(self.location_path):
78 1
            shell.mkdir(self.location_path)
79 1
        shell.cd(self.location_path)
80 1
        common.newline()
81
        common.indent()
82 1
83 1
        count = 0
84
        for source in sources:
85 1
            if source.name in sources_filter:
86 1
                sources_filter.remove(source.name)
87 1
            else:
88 1
                log.info("Skipped dependency: %s", source.name)
89
                continue
90 1
91 1
            source.update_files(force=force, fetch=fetch, clean=clean)
92 1
            source.create_link(self.root, force=force)
93 1
            common.newline()
94
            count += 1
95
96
            config = load_config(search=False)
97
            if config:
98
                common.indent()
99
                count += config.install_dependencies(
100
                    depth=None if depth is None else max(0, depth - 1),
101 1
                    update=update and recurse,
102
                    recurse=recurse,
103 1
                    force=force,
104
                    fetch=fetch,
105 1
                    clean=clean,
106 1
                )
107 1
                common.dedent()
108 1
109
            shell.cd(self.location_path, _show=False)
110 1
111
        common.dedent()
112 1
        if sources_filter:
113
            log.error("No such dependency: %s", ' '.join(sources_filter))
114 1
            return 0
115 1
116 1
        return count
117
118 1
    def run_scripts(self, *names, depth=None, force=False):
119 1
        """Run scripts for the specified dependencies."""
120
        if depth == 0:
121 1
            log.info("Skipped directory: %s", self.location_path)
122 1
            return 0
123 1
124
        sources = self._get_sources()
125 1
        sources_filter = list(names) if names else [s.name for s in sources]
126 1
127 1
        shell.cd(self.location_path)
128 1
        common.newline()
129 1
        common.indent()
130
131 1
        count = 0
132 1
        for source in sources:
133 1
            if source.name in sources_filter:
134 1
                source.run_scripts(force=force)
135
                count += 1
136
137
                config = load_config(search=False)
138 1
                if config:
139
                    common.indent()
140 1
                    count += config.run_scripts(
141
                        depth=None if depth is None else max(0, depth - 1),
142 1
                        force=force,
143
                    )
144 1
                    common.dedent()
145
146 1
                shell.cd(self.location_path, _show=False)
147
148 1
        common.dedent()
149 1
150
        return count
151 1
152 1
    def lock_dependencies(self, *names, obey_existing=True):
153 1
        """Lock down the immediate dependency versions."""
154
        sources = self._get_sources(use_locked=obey_existing).copy()
155 1
        sources_filter = list(names) if names else [s.name for s in sources]
156 1
157 1
        shell.cd(self.location_path)
158 1
        common.newline()
159 1
        common.indent()
160
161 1
        count = 0
162 1
        for source in sources:
163 1
            if source.name not in sources_filter:
164 1
                log.info("Skipped dependency: %s", source.name)
165
                continue
166 1
167 1
            try:
168
                index = self.sources_locked.index(source)
169 1
            except ValueError:
170
                self.sources_locked.append(source.lock())
171 1
            else:
172 1
                self.sources_locked[index] = source.lock()
173
            count += 1
174 1
175
            shell.cd(self.location_path, _show=False)
176 1
177
        if count:
178 1
            self.save()
179 1
180 1
        return count
181
182 1
    def uninstall_dependencies(self):
183
        """Delete the dependency storage location."""
184 1
        shell.cd(self.root)
185 1
        shell.rm(self.location_path)
186
        common.newline()
187 1
188 1
    def clean_dependencies(self):
189 1
        """Delete the dependency storage location."""
190
        for path in self.get_top_level_dependencies():
191 1
192
            if path == self.location_path:
193 1
                log.info("Skipped dependency: %s", path)
194 1
            else:
195 1
                shell.rm(path)
196
197 1
            common.newline()
198
199 1
        shell.rm(self.log_path)
200 1
201 1
    def get_top_level_dependencies(self):
202 1
        """Yield the path, repository, and hash of top-level dependencies."""
203
        if not os.path.exists(self.location_path):
204
            return
205
206 1
        shell.cd(self.location_path)
207
        common.newline()
208 1
        common.indent()
209
210 1
        for source in self.sources:
211
212 1
            yield os.path.join(self.location_path, source.name)
213
214 1
            shell.cd(self.location_path, _show=False)
215 1
216
        common.dedent()
217 1
218
    def get_dependencies(self, depth=None, allow_dirty=True):
219 1
        """Yield the path, repository, and hash of each dependency."""
220 1
        if not os.path.exists(self.location_path):
221 1
            return
222
223 1
        shell.cd(self.location_path)
224 1
        common.newline()
225
        common.indent()
226 1
227 1
        for source in self.sources:
228 1
229
            if depth == 0:
230 1
                log.info("Skipped dependency: %s", source.name)
231 1
                continue
232 1
233
            yield source.identify(allow_dirty=allow_dirty)
234 1
235 1
            config = load_config(search=False)
236
            if config:
237 1
                common.indent()
238 1
                yield from config.get_dependencies(
239 1
                    depth=None if depth is None else max(0, depth - 1),
240 1
                    allow_dirty=allow_dirty,
241 1
                )
242
                common.dedent()
243 1
244
            shell.cd(self.location_path, _show=False)
245
246 1
        common.dedent()
247
248 1
    def log(self, message="", *args):
249 1
        """Append a message to the log file."""
250
        with open(self.log_path, 'a') as outfile:
251 1
            outfile.write(message.format(*args) + '\n')
252
253 1
    def _get_sources(self, *, use_locked=None):
254 1
        """Merge source lists using the requested section as the base."""
255
        if use_locked is True:
256 1
            if self.sources_locked:
257 1
                return self.sources_locked
258 1
            log.info("No locked sources, defaulting to none...")
259
            return []
260 1
261 1
        sources = []
262 1
        if use_locked is False:
263 1
            sources = self.sources
264 1
        else:
265
            if self.sources_locked:
266 1
                log.info("Defaulting to locked sources...")
267 1
                sources = self.sources_locked
268
            else:
269 1
                log.info("No locked sources, using latest...")
270
                sources = self.sources
271 1
272 1
        extras = []
273
        for source in self.sources + self.sources_locked:
274 1
            if source not in sources:
275
                log.info("Source %r missing from selected section",
276 1
                         source.name)
277
                extras.append(source)
278
279 1
        return sources + extras
280 1
281 1
282 1
def load_config(start=None, *, search=True):
283 1
    """Load the config for the current project."""
284
    if start:
285
        start = os.path.abspath(start)
286
    else:
287
        start = os.getcwd()
288
289
    if search:
290
        log.debug("Searching for config...")
291
292
    path = start
293
    while path != os.path.dirname(path):
294
        log.debug("Looking for config in: %s", path)
295
296
        for filename in os.listdir(path):
297
            if _valid_filename(filename):
298
                config = Config(path, filename)
299
                config._on_post_load()
0 ignored issues
show
Coding Style Best Practice introduced by
It seems like _on_post_load was declared protected and should not be accessed from this context.

Prefixing a member variable _ is usually regarded as the equivalent of declaring it with protected visibility that exists in other languages. Consequentially, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1;
        self.y = 2;

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent
Loading history...
300
                log.debug("Found config: %s", config.path)
301
                return config
302
303
        if search:
304
            path = os.path.dirname(path)
305
        else:
306
            break
307
308
    if search:
309
        log.debug("No config found starting from: %s", start)
310
    else:
311
        log.debug("No config found in: %s", start)
312
313
    return None
314
315
316
def _valid_filename(filename):
317
    name, ext = os.path.splitext(filename.lower())
318
    if name.startswith('.'):
319
        name = name[1:]
320
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
321