Passed
Push — develop ( 4d6495...2dddb7 )
by Jace
03:19
created

gitman.models.config   F

Complexity

Total Complexity 64

Size/Duplication

Total Lines 313
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 217
dl 0
loc 313
ccs 183
cts 183
cp 1
rs 3.28
c 0
b 0
f 0
wmc 64

14 Methods

Rating   Name   Duplication   Size   Complexity  
A Config.__init__() 0 8 1
A Config.location_path() 0 4 1
A Config.log() 0 4 2
A Config.config_path() 0 4 1
A Config.clean_dependencies() 0 12 3
B Config.get_dependencies() 0 29 6
B Config._get_sources() 0 27 7
A Config.get_path() 0 10 4
A Config.get_top_level_dependencies() 0 16 3
A Config.uninstall_dependencies() 0 5 1
B Config.lock_dependencies() 0 29 7
C Config.install_dependencies() 0 51 10
A Config.log_path() 0 4 1
B Config.run_scripts() 0 33 7

2 Functions

Rating   Name   Duplication   Size   Complexity  
A _valid_filename() 0 5 2
B load_config() 0 31 8

How to fix   Complexity   

Complexity

Complex classes like gitman.models.config often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import logging
2 1
import os
3
4 1
import yorm
5 1
from yorm.types import SortedList, String
6
7 1
from . import Source
8 1
from .. import common, shell
9 1
10
11 1
log = logging.getLogger(__name__)
12
13
14 1
@yorm.attr(location=String)
15 1
@yorm.attr(sources=SortedList.of_type(Source))
16 1
@yorm.attr(sources_locked=SortedList.of_type(Source))
17 1
@yorm.sync("{self.root}/{self.filename}", auto_save=False)
18 1
class Config(yorm.ModelMixin):
19
    """Specifies all dependencies for a project."""
20
21 1
    LOG = "gitman.log"
22
23 1
    def __init__(self, root=None,
24
                 filename="gitman.yml", location="gitman_sources"):
25 1
        super().__init__()
26 1
        self.root = root or os.getcwd()
27 1
        self.filename = filename
28 1
        self.location = location
29 1
        self.sources = []
30 1
        self.sources_locked = []
31
32 1
    @property
33
    def config_path(self):
34
        """Get the full path to the config file."""
35 1
        return os.path.normpath(os.path.join(self.root, self.filename))
36 1
    path = config_path
37
38 1
    @property
39
    def log_path(self):
40
        """Get the full path to the log file."""
41 1
        return os.path.normpath(os.path.join(self.location_path, self.LOG))
42
43 1
    @property
44
    def location_path(self):
45
        """Get the full path to the dependency storage location."""
46 1
        return os.path.normpath(os.path.join(self.root, self.location))
47
48 1
    def get_path(self, name=None):
49
        """Get the full path to a dependency or internal file."""
50 1
        base = self.location_path
51 1
        if name == '__config__':
52 1
            return self.path
53 1
        if name == '__log__':
54 1
            return self.log_path
55 1
        if name:
56 1
            return os.path.normpath(os.path.join(base, name))
57
        return base
58 1
59
    def install_dependencies(self, *names, depth=None,
60 1
                             update=True, recurse=False,
61
                             force=False, fetch=False, clean=True):
62
        """Download or update the specified dependencies."""
63
        if depth == 0:
64 1
            log.info("Skipped directory: %s", self.location_path)
65 1
            return 0
66 1
67
        sources = self._get_sources(use_locked=False if update else None)
68 1
        sources_filter = list(names) if names else [s.name for s in sources]
69 1
70
        if not os.path.isdir(self.location_path):
71 1
            shell.mkdir(self.location_path)
72 1
        shell.cd(self.location_path)
73 1
        common.newline()
74 1
        common.indent()
75 1
76
        count = 0
77 1
        for source in sources:
78 1
            if source.name in sources_filter:
79 1
                sources_filter.remove(source.name)
80 1
            else:
81
                log.info("Skipped dependency: %s", source.name)
82 1
                continue
83 1
84
            source.update_files(force=force, fetch=fetch, clean=clean)
85 1
            source.create_link(self.root, force=force)
86 1
            common.newline()
87 1
            count += 1
88 1
89
            config = load_config(search=False)
90 1
            if config:
91 1
                common.indent()
92 1
                count += config.install_dependencies(
93 1
                    depth=None if depth is None else max(0, depth - 1),
94
                    update=update and recurse,
95
                    recurse=recurse,
96
                    force=force,
97
                    fetch=fetch,
98
                    clean=clean,
99
                )
100
                common.dedent()
101 1
102
            shell.cd(self.location_path, _show=False)
103 1
104
        common.dedent()
105 1
        if sources_filter:
106 1
            log.error("No such dependency: %s", ' '.join(sources_filter))
107 1
            return 0
108 1
109
        return count
110 1
111
    def run_scripts(self, *names, depth=None, force=False):
112 1
        """Run scripts for the specified dependencies."""
113
        if depth == 0:
114 1
            log.info("Skipped directory: %s", self.location_path)
115 1
            return 0
116 1
117
        sources = self._get_sources()
118 1
        sources_filter = list(names) if names else [s.name for s in sources]
119 1
120
        shell.cd(self.location_path)
121 1
        common.newline()
122 1
        common.indent()
123 1
124
        count = 0
125 1
        for source in sources:
126 1
            if source.name in sources_filter:
127 1
                source.run_scripts(force=force)
128 1
                count += 1
129 1
130
                config = load_config(search=False)
131 1
                if config:
132 1
                    common.indent()
133 1
                    count += config.run_scripts(
134 1
                        depth=None if depth is None else max(0, depth - 1),
135
                        force=force,
136
                    )
137
                    common.dedent()
138 1
139
                shell.cd(self.location_path, _show=False)
140 1
141
        common.dedent()
142 1
143
        return count
144 1
145
    def lock_dependencies(self, *names, obey_existing=True):
146 1
        """Lock down the immediate dependency versions."""
147
        sources = self._get_sources(use_locked=obey_existing).copy()
148 1
        sources_filter = list(names) if names else [s.name for s in sources]
149 1
150
        shell.cd(self.location_path)
151 1
        common.newline()
152 1
        common.indent()
153 1
154
        count = 0
155 1
        for source in sources:
156 1
            if source.name not in sources_filter:
157 1
                log.info("Skipped dependency: %s", source.name)
158 1
                continue
159 1
160
            try:
161 1
                index = self.sources_locked.index(source)
162 1
            except ValueError:
163 1
                self.sources_locked.append(source.lock())
164 1
            else:
165
                self.sources_locked[index] = source.lock()
166 1
            count += 1
167 1
168
            shell.cd(self.location_path, _show=False)
169 1
170
        if count:
171 1
            self.save()
172 1
173
        return count
174 1
175
    def uninstall_dependencies(self):
176 1
        """Delete the dependency storage location."""
177
        shell.cd(self.root)
178 1
        shell.rm(self.location_path)
179 1
        common.newline()
180 1
181
    def clean_dependencies(self):
182 1
        """Delete the dependency storage location."""
183
        for path in self.get_top_level_dependencies():
184 1
185 1
            if path == self.location_path:
186
                log.info("Skipped dependency: %s", path)
187 1
            else:
188 1
                shell.rm(path)
189 1
190
            common.newline()
191 1
192
        shell.rm(self.log_path)
193 1
194 1
    def get_top_level_dependencies(self):
195 1
        """Yield the path, repository, and hash of top-level dependencies."""
196
        if not os.path.exists(self.location_path):
197 1
            return
198
199 1
        shell.cd(self.location_path)
200 1
        common.newline()
201 1
        common.indent()
202 1
203
        for source in self.sources:
204
205
            yield os.path.join(self.location_path, source.name)
206 1
207
            shell.cd(self.location_path, _show=False)
208 1
209
        common.dedent()
210 1
211
    def get_dependencies(self, depth=None, allow_dirty=True):
212 1
        """Yield the path, repository, and hash of each dependency."""
213
        if not os.path.exists(self.location_path):
214 1
            return
215 1
216
        shell.cd(self.location_path)
217 1
        common.newline()
218
        common.indent()
219 1
220 1
        for source in self.sources:
221 1
222
            if depth == 0:
223 1
                log.info("Skipped dependency: %s", source.name)
224 1
                continue
225
226 1
            yield source.identify(allow_dirty=allow_dirty)
227 1
228 1
            config = load_config(search=False)
229
            if config:
230 1
                common.indent()
231 1
                yield from config.get_dependencies(
232 1
                    depth=None if depth is None else max(0, depth - 1),
233
                    allow_dirty=allow_dirty,
234 1
                )
235 1
                common.dedent()
236
237 1
            shell.cd(self.location_path, _show=False)
238 1
239 1
        common.dedent()
240 1
241 1
    def log(self, message="", *args):
242
        """Append a message to the log file."""
243 1
        with open(self.log_path, 'a') as outfile:
244
            outfile.write(message.format(*args) + '\n')
245
246 1
    def _get_sources(self, *, use_locked=None):
247
        """Merge source lists using the requested section as the base."""
248 1
        if use_locked is True:
249 1
            if self.sources_locked:
250
                return self.sources_locked
251 1
            log.info("No locked sources, defaulting to none...")
252
            return []
253 1
254 1
        sources = []
255
        if use_locked is False:
256 1
            sources = self.sources
257 1
        else:
258 1
            if self.sources_locked:
259
                log.info("Defaulting to locked sources...")
260 1
                sources = self.sources_locked
261 1
            else:
262 1
                log.info("No locked sources, using latest...")
263 1
                sources = self.sources
264 1
265
        extras = []
266 1
        for source in self.sources + self.sources_locked:
267 1
            if source not in sources:
268
                log.info("Source %r missing from selected section",
269 1
                         source.name)
270
                extras.append(source)
271 1
272 1
        return sources + extras
273
274 1
275
def load_config(start=None, *, search=True):
276 1
    """Load the config for the current project."""
277
    if start:
278
        start = os.path.abspath(start)
279 1
    else:
280 1
        start = os.getcwd()
281 1
282 1
    if search:
283 1
        log.debug("Searching for config...")
284
285
    path = start
286
    while path != os.path.dirname(path):
287
        log.debug("Looking for config in: %s", path)
288
289
        for filename in os.listdir(path):
290
            if _valid_filename(filename):
291
                config = Config(path, filename)
292
                log.debug("Found config: %s", config.path)
293
                return config
294
295
        if search:
296
            path = os.path.dirname(path)
297
        else:
298
            break
299
300
    if search:
301
        log.debug("No config found starting from: %s", start)
302
    else:
303
        log.debug("No config found in: %s", start)
304
305
    return None
306
307
308
def _valid_filename(filename):
309
    name, ext = os.path.splitext(filename.lower())
310
    if name.startswith('.'):
311
        name = name[1:]
312
    return name in ['gitman', 'gdm'] and ext in ['.yml', '.yaml']
313