Passed
Pull Request — develop (#168)

gitman.models.config   F

Complexity

Total Complexity 67

Size/Duplication

Total Lines 321
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 223
dl 0
loc 321
ccs 183
cts 183
cp 1
rs 3.04
c 0
b 0
f 0
wmc 67

15 Methods

Rating   Name   Duplication   Size   Complexity  
A Config.__init__() 0 8 1
A Config.log() 0 4 2
A Config.location_path() 0 4 1
A Config.config_path() 0 4 1
A Config.clean_dependencies() 0 12 3
B Config.get_dependencies() 0 29 6
B Config._get_sources() 0 27 7
A Config.get_path() 0 10 4
A Config.get_top_level_dependencies() 0 16 3
A Config.uninstall_dependencies() 0 5 1
B Config.lock_dependencies() 0 29 7
C Config.install_dependencies() 0 51 10
A Config.log_path() 0 4 1
B Config.run_scripts() 0 33 7
A Config._on_post_load() 0 5 3

2 Functions

Rating   Name   Duplication   Size   Complexity  
A _valid_filename() 0 5 2
B load_config() 0 32 8

How to fix: Complexity

Complex classes like the one in gitman.models.config often do many different things. To break such a class down, identify a cohesive component within it. A common way to find such a component is to look for fields or methods that share the same prefix or suffix.

Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
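As an illustration of the advice above, the three properties in the listing that share the `_path` suffix (config_path, log_path, location_path) form one candidate component. The sketch below is only an assumption about how an Extract Class step could look: `ConfigPaths` is a hypothetical name, and the `Config` shown here is a trimmed stand-in that leaves out the yorm decorators and everything unrelated to paths.

```python
import os


class ConfigPaths:
    """Hypothetical helper that holds only the path logic."""

    LOG = "gitman.log"

    def __init__(self, root, filename="gitman.yml", location="gitman_sources"):
        self.root = root
        self.filename = filename
        self.location = location

    @property
    def config_path(self):
        """Full path to the config file."""
        return os.path.normpath(os.path.join(self.root, self.filename))

    @property
    def location_path(self):
        """Full path to the dependency storage location."""
        return os.path.normpath(os.path.join(self.root, self.location))

    @property
    def log_path(self):
        """Full path to the log file inside the storage location."""
        return os.path.normpath(os.path.join(self.location_path, self.LOG))


class Config:
    """Trimmed stand-in for the real class; it delegates to ConfigPaths."""

    def __init__(self, root=None,
                 filename="gitman.yml", location="gitman_sources"):
        self.paths = ConfigPaths(root or os.getcwd(), filename, location)

    # Thin delegating properties keep the public interface unchanged.
    @property
    def config_path(self):
        return self.paths.config_path

    @property
    def location_path(self):
        return self.paths.location_path

    @property
    def log_path(self):
        return self.paths.log_path
```

Keeping the delegating properties means existing callers would not need to change. In the real class, `location` is also a yorm attribute, so an actual extraction would have to keep the two in sync.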

import logging
import os

import yorm
from yorm.types import SortedList, String

from . import Source
from .. import common, shell

log = logging.getLogger(__name__)


@yorm.attr(location=String)
@yorm.attr(sources=SortedList.of_type(Source))
@yorm.attr(sources_locked=SortedList.of_type(Source))
@yorm.sync("{self.root}/{self.filename}", auto_save=False)
class Config(yorm.ModelMixin):
    """Specifies all dependencies for a project."""

    LOG = "gitman.log"

    def __init__(self, root=None,
                 filename="gitman.yml", location="gitman_sources"):
        super().__init__()
        self.root = root or os.getcwd()
        self.filename = filename
        self.location = location
        self.sources = []
        self.sources_locked = []

    def _on_post_load(self):
        for source in self.sources:
            source._on_post_load()

Coding Style / Best Practice
It seems like _on_post_load was declared protected and should not be accessed from this context.

Prefixing a member variable with _ is usually regarded as the equivalent of declaring it with the protected visibility that exists in other languages. Consequently, such a member should only be accessed from the same class or a child class:

class MyParent:
    def __init__(self):
        self._x = 1
        self.y = 2

class MyChild(MyParent):
    def some_method(self):
        return self._x    # Ok, since accessed from a child class

class AnotherClass:
    def some_method(self, instance_of_my_child):
        return instance_of_my_child._x   # Would be flagged as AnotherClass is not
                                         # a child class of MyParent

        for source in self.sources_locked:
            source._on_post_load()

Coding Style / Best Practice
It seems like _on_post_load was declared protected and should not be accessed from this context.

    @property
    def config_path(self):
        """Get the full path to the config file."""
        return os.path.normpath(os.path.join(self.root, self.filename))
    path = config_path

    @property
    def log_path(self):
        """Get the full path to the log file."""
        return os.path.normpath(os.path.join(self.location_path, self.LOG))

    @property
    def location_path(self):
        """Get the full path to the dependency storage location."""
        return os.path.normpath(os.path.join(self.root, self.location))

    def get_path(self, name=None):
        """Get the full path to a dependency or internal file."""
        base = self.location_path
        if name == '__config__':
            return self.path
        if name == '__log__':
            return self.log_path
        if name:
            return os.path.normpath(os.path.join(base, name))
        return base

    def install_dependencies(self, *names, depth=None,
                             update=True, recurse=False,
                             force=False, fetch=False, clean=True):
        """Download or update the specified dependencies."""
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources(use_locked=False if update else None)
        sources_filter = list(names) if names else [s.name for s in sources]

        if not os.path.isdir(self.location_path):
            shell.mkdir(self.location_path)
        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                sources_filter.remove(source.name)
            else:
                log.info("Skipped dependency: %s", source.name)
                continue

            source.update_files(force=force, fetch=fetch, clean=clean)
            source.create_link(self.root, force=force)
            common.newline()
            count += 1

            config = load_config(search=False)
            if config:
                common.indent()
                count += config.install_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    update=update and recurse,
                    recurse=recurse,
                    force=force,
                    fetch=fetch,
                    clean=clean,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()
        if sources_filter:
            log.error("No such dependency: %s", ' '.join(sources_filter))
            return 0

        return count

    def run_scripts(self, *names, depth=None, force=False):
        """Run scripts for the specified dependencies."""
        if depth == 0:
            log.info("Skipped directory: %s", self.location_path)
            return 0

        sources = self._get_sources()
        sources_filter = list(names) if names else [s.name for s in sources]

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name in sources_filter:
                source.run_scripts(force=force)
                count += 1

                config = load_config(search=False)
                if config:
                    common.indent()
                    count += config.run_scripts(
                        depth=None if depth is None else max(0, depth - 1),
                        force=force,
                    )
                    common.dedent()

                shell.cd(self.location_path, _show=False)

        common.dedent()

        return count

    def lock_dependencies(self, *names, obey_existing=True):
        """Lock down the immediate dependency versions."""
        sources = self._get_sources(use_locked=obey_existing).copy()
        sources_filter = list(names) if names else [s.name for s in sources]

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        count = 0
        for source in sources:
            if source.name not in sources_filter:
                log.info("Skipped dependency: %s", source.name)
                continue

            try:
                index = self.sources_locked.index(source)
            except ValueError:
                self.sources_locked.append(source.lock())
            else:
                self.sources_locked[index] = source.lock()
            count += 1

            shell.cd(self.location_path, _show=False)

        if count:
            self.save()

        return count

    def uninstall_dependencies(self):
        """Delete the dependency storage location."""
        shell.cd(self.root)
        shell.rm(self.location_path)
        common.newline()

    def clean_dependencies(self):
        """Delete each top-level dependency and the log file."""
        for path in self.get_top_level_dependencies():

            if path == self.location_path:
                log.info("Skipped dependency: %s", path)
            else:
                shell.rm(path)

            common.newline()

        shell.rm(self.log_path)

    def get_top_level_dependencies(self):
        """Yield the path of each top-level dependency."""
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            yield os.path.join(self.location_path, source.name)

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def get_dependencies(self, depth=None, allow_dirty=True):
        """Yield the path, repository, and hash of each dependency."""
        if not os.path.exists(self.location_path):
            return

        shell.cd(self.location_path)
        common.newline()
        common.indent()

        for source in self.sources:

            if depth == 0:
                log.info("Skipped dependency: %s", source.name)
                continue

            yield source.identify(allow_dirty=allow_dirty)

            config = load_config(search=False)
            if config:
                common.indent()
                yield from config.get_dependencies(
                    depth=None if depth is None else max(0, depth - 1),
                    allow_dirty=allow_dirty,
                )
                common.dedent()

            shell.cd(self.location_path, _show=False)

        common.dedent()

    def log(self, message="", *args):
        """Append a message to the log file."""
        with open(self.log_path, 'a') as outfile:
            outfile.write(message.format(*args) + '\n')

    def _get_sources(self, *, use_locked=None):
        """Merge source lists using the requested section as the base."""
        if use_locked is True:
            if self.sources_locked:
                return self.sources_locked
            log.info("No locked sources, defaulting to none...")
            return []

        sources = []
        if use_locked is False:
            sources = self.sources
        else:
            if self.sources_locked:
                log.info("Defaulting to locked sources...")
                sources = self.sources_locked
            else:
                log.info("No locked sources, using latest...")
                sources = self.sources

        extras = []
        for source in self.sources + self.sources_locked:
            if source not in sources:
                log.info("Source %r missing from selected section",
                         source.name)
                extras.append(source)

        return sources + extras


def load_config(start=None, *, search=True):
    """Load the config for the current project."""
    if start:
        start = os.path.abspath(start)
    else:
        start = os.getcwd()

    if search:
        log.debug("Searching for config...")

    path = start
    while path != os.path.dirname(path):
        log.debug("Looking for config in: %s", path)

        for filename in os.listdir(path):
            if _valid_filename(filename):
                config = Config(path, filename)
                config._on_post_load()

Coding Style / Best Practice
It seems like _on_post_load was declared protected and should not be accessed from this context.

                log.debug("Found config: %s", config.path)
                return config

        if search:
            path = os.path.dirname(path)
        else:
            break

    if search:
        log.debug("No config found starting from: %s", start)
    else:
        log.debug("No config found in: %s", start)

    return None


def _valid_filename(filename):
    name, ext = os.path.splitext(filename.lower())
    if name.startswith('.'):
        name = name[1:]
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
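
To make the lookup behaviour of load_config() and _valid_filename() easier to follow, here is a simplified, standalone sketch of the same idea. It is not gitman's implementation: `valid_filename` and `find_config` are hypothetical names, the function returns a file path instead of a Config object, and directory entries are sorted only to make the result deterministic.

```python
import os
import tempfile


def valid_filename(filename):
    """Apply the same check as _valid_filename() in the listing."""
    name, ext = os.path.splitext(filename.lower())
    if name.startswith('.'):
        name = name[1:]
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']


def find_config(start, search=True):
    """Return the first matching file path, or None.

    With search=True the lookup walks from `start` toward the filesystem
    root; with search=False only `start` itself is inspected.
    """
    path = os.path.abspath(start)
    # The loop ends once the path is its own parent, so the filesystem
    # root itself is never inspected (same condition as the listing).
    while path != os.path.dirname(path):
        for filename in sorted(os.listdir(path)):
            if valid_filename(filename):
                return os.path.join(path, filename)
        if not search:
            break
        path = os.path.dirname(path)
    return None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        root = os.path.realpath(root)
        nested = os.path.join(root, "a", "b")
        os.makedirs(nested)
        open(os.path.join(root, "gitman.yml"), "w").close()
        print(find_config(nested))                # found two levels up
        print(find_config(nested, search=False))  # None
```

The check is case-insensitive and accepts an optional leading dot, so `.gitman.yml` and `GDM.YAML` both match, while `gitman.yml.bak` does not because its extension is `.bak`.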