tyrannosaurus.new   A
last analyzed

Complexity

Total Complexity 40

Size/Duplication

Total Lines 265
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 188
dl 0
loc 265
rs 9.2
c 0
b 0
f 0
wmc 40

9 Methods

Rating   Name   Duplication   Size   Complexity  
B New.__init__() 0 45 3
C New.create() 0 56 11
A New._track() 0 22 3
A New._parse_tyranno_vr() 0 13 5
A New._checkout_rev() 0 5 1
C New._call() 0 24 9
A New._checkout() 0 11 2
A New._set_tyranno_vr() 0 21 4
A New._murder_evil_path_for_sure() 0 18 2

How to fix   Complexity   

Complexity

Complex classes like tyrannosaurus.new often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""
2
Module that generates new projects.
3
4
Original source: https://github.com/dmyersturnbull/tyrannosaurus
5
Copyright 2020–2021 Douglas Myers-Turnbull
6
Licensed under the Apache License, Version 2.0 (the "License");
7
you may not use this file except in compliance with the License.
8
You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0
9
"""
10
11
from __future__ import annotations
12
13
import logging
14
import os
15
import shutil
16
import stat
17
from pathlib import Path
18
from subprocess import PIPE, CalledProcessError, check_output  # nosec
19
from typing import Dict, List, Optional, Sequence, Union
20
21
import typer
22
23
from tyrannosaurus.enums import DevStatus, License
24
from tyrannosaurus.parser import LiteralParser
25
26
# Git URL of the template repository; passed to ``git clone`` when creating a project.
tyranno_url = "https://github.com/dmyersturnbull/tyrannosaurus.git"
27
# Module logger, named after the containing package rather than this module.
logger = logging.getLogger(__package__)
28
# Typer application object; no commands are registered on it in the code visible here.
cli = typer.Typer()
29
30
31
class VersionNotFoundError(LookupError):
    """Raised when no Git tag corresponds to the requested version."""
35
36
37
class New:
38
    def __init__(
        self,
        name: str,
        license_name: Union[str, License],
        username: str,
        authors: Sequence[str],
        description: str,
        keywords: Sequence[str],
        version: str,
        status: DevStatus,
        should_track: bool,
        extras: bool,
        tyranno_vr: str,
        debug: bool = False,
    ):
        """
        Stores the settings of the project to generate and builds its template parser.

        Args:
            name: Project name; kept lowercased as ``project_name``, and lowercased
                  with ``_``, ``-``, and ``.`` removed as ``pkg_name``
            license_name: A ``License`` member, or a string that is lowercased
                          and looked up in ``License``
            username: User name; used to build the GitHub URL in ``repo_to_track``
            authors: Forwarded to the parser
            description: Forwarded to the parser
            keywords: Forwarded to the parser
            version: Forwarded to the parser
            status: Forwarded to the parser
            should_track: Whether ``create`` should set up git tracking
            extras: If false, ``create`` deletes a fixed set of optional files
            tyranno_vr: The template version to check out
            debug: If false, stderr of subprocesses is captured

        Raises:
            ValueError: If ``tyranno_vr`` is not a string
        """
        resolved_license = (
            License[license_name.lower()] if isinstance(license_name, str) else license_name
        )
        # check for historical reasons; can remove in the future
        if not isinstance(tyranno_vr, str):  # pragma: no cover
            raise ValueError(f"{tyranno_vr} has type {type(tyranno_vr)}")
        lowered_name = name.lower()
        template_version = str(tyranno_vr)

        # naming
        self.project_name = lowered_name
        self.pkg_name = "".join(ch for ch in name if ch not in "_-.").lower()
        self.repo_to_track = f"https://github.com/{username}/{lowered_name}.git"

        # metadata, stored as given
        self.license_name = resolved_license
        self.username = username
        self.authors = authors
        self.description = description
        self.keywords = keywords
        self.version = version
        self.status = status

        # behavior switches
        self.should_track = should_track
        self.extras = extras
        self.tyranno_vr = template_version

        self.parser = LiteralParser(
            project=lowered_name,
            user=username,
            authors=authors,
            description=description,
            keywords=keywords,
            version=version,
            status=status,
            license_name=resolved_license.name,
            tyranno_vr=template_version,
        )
        self.debug = debug
83
84
    def create(self, path: Path) -> None:
        """
        Creates a new project at ``path`` from the tyrannosaurus template.

        Clones the template, deletes tyrannosaurus-specific files, writes the license,
        renders the files under ``tyrannosaurus/resources`` into the project,
        deletes the template's own source directory, and optionally sets up git tracking.

        Args:
            path: Where to create the project; the whole path is lowercased before use

        Raises:
            FileExistsError: If the lowercased path already exists
        """
        # The checkout goes to the lowercased path, so every later step must use that
        # same path; a mixed-case ``path`` names a different directory
        # on case-sensitive file systems
        path = Path(str(path).lower())
        self._checkout(path)
        logger.info("Got git checkout. Fixing...")
        self._remove_template_files(path)
        # download license
        license_text = self.parser.download_license_template(header=False)
        (path / "LICENSE.txt").write_text(license_text, encoding="utf8")
        self._copy_resources(path)
        # remove unneeded tyrannosaurus source dir
        # we already copied the files in tyrannosaurus/resources/
        shutil.rmtree(str(path / "tyrannosaurus"))
        if not self.extras:
            for extra in (
                "azure-pipelines.yml",
                "codemeta.json",
                "Vagrantfile",
                "environment.yml",
                ".travis.yml",
            ):
                (path / extra).unlink(missing_ok=True)
        # track remote via git
        if self.should_track:
            self._track(path)

    def _remove_template_files(self, path: Path) -> None:
        """Deletes the files in the checkout that are specific to tyrannosaurus itself."""
        # The lock file and the recipe may be absent from the checked-out revision
        (path / "poetry.lock").unlink(missing_ok=True)
        recipe_dir = path / "recipes" / "tyrannosaurus"
        (recipe_dir / "meta.yaml").unlink(missing_ok=True)
        if recipe_dir.is_dir():
            recipe_dir.rmdir()
        # keep only the Sphinx config and requirements among the top-level docs files
        for doc in (path / "docs").iterdir():
            if doc.is_file() and doc.name not in {"conf.py", "requirements.txt"}:
                doc.unlink()
        shutil.rmtree(str(path / "tests"))

    def _copy_resources(self, path: Path) -> None:
        """Renders each file in ``tyrannosaurus/resources`` to its destination, overwriting."""
        for source in (path / "tyrannosaurus" / "resources").iterdir():
            if not source.is_file():
                continue
            # $dot so we can circumvent the .gitignore
            resource = (
                source.name.replace("$dot", ".")
                .replace("$project", self.project_name)
                .replace("$pkg", self.pkg_name)
            )
            # TODO: Only strip .txt when it follows another short extension
            # (e.g. .json.txt but not beautiful.butterfly.txt); for now always strip it
            if resource.endswith(".txt"):
                resource = resource[:-4]
            # "@" in a resource name stands for a path separator
            dest = path / Path(*resource.split("@"))
            dest.parent.mkdir(parents=True, exist_ok=True)
            # NOTE: ${LICENSE.HEADER} gets no special handling here;
            # the text is only passed through the parser
            text = self.parser.parse(source.read_text(encoding="utf8"))
            dest.write_text(text, encoding="utf8")
140
141
    def _track(self, path: Path) -> None:
        """
        Initializes a git repo in ``path`` and points it at ``self.repo_to_track``.

        Each step runs only if the steps it depends on succeeded.
        A command that exits with an error is logged by ``_call``, not raised.
        The final success message is logged only if every tracking step was attempted.

        Args:
            path: The directory of the newly created project
        """
        # _call returns None on failure and the command's stdout otherwise.
        # stdout is an empty string for a command that prints nothing,
        # so compare against None instead of testing truthiness.
        initialized = self._call(
            ["git", "init", "--initial-branch=main"],
            cwd=path,
            fail="Failed calling git init. Giving up.",
        )
        if initialized is None:
            return
        self._call(["pre-commit", "install"], cwd=path, fail="Failed calling pre-commit install.")
        tracked = self._call(
            ["git", "remote", "add", "origin", self.repo_to_track],
            cwd=path,
            fail=f"Failed tracking {self.repo_to_track}",
        )
        if tracked is None:
            return
        self._call(
            ["git", "branch", "--set-upstream-to=origin/main", "main"],
            cwd=path,
            fail=f"Failed setting upstream to {self.repo_to_track}",
        )
        logger.info(f"Initialized new git repo tracking remote {self.repo_to_track}")
163
164
    def _checkout(self, path: Path) -> None:
        """
        Clones the template repo into ``path``, selects the template version,
        and then deletes the clone's ``.git`` directory.

        Args:
            path: The directory to create; must not exist yet

        Raises:
            FileExistsError: If ``path`` already exists
            OSError: If ``git clone`` exits with an error
        """
        if path.exists():
            raise FileExistsError(f"Path {path} already exists")
        git_dir = path / ".git"
        try:
            path.parent.mkdir(exist_ok=True, parents=True)
            logger.info("Running git clone...")
            # Without ``fail``, _call would swallow a failed clone and the later steps
            # would die with an unrelated error about a missing directory
            self._call(
                ["git", "clone", tyranno_url, str(path)],
                fail=OSError(f"Failed to clone {tyranno_url} into {path}"),
            )
            # FYI this would fail if we had deleted .git first
            self._set_tyranno_vr(path)
        finally:
            # .git is missing if the clone failed; do not mask that error with a new one
            if git_dir.exists():
                self._murder_evil_path_for_sure(git_dir)
175
176
    def _set_tyranno_vr(self, path: Path):
        """
        Checks out the Git tag for ``self.tyranno_vr`` in the clone at ``path``.

        For ``latest``, nothing is checked out and the clone stays at HEAD.
        For ``current``, falls back to ``stable`` if the tag of the installed version
        is not found. The keywords are matched ignoring case and surrounding whitespace.

        Args:
            path: The directory of the cloned template, with ``.git`` still present

        Raises:
            VersionNotFoundError: If the tag, or the ``stable`` fallback tag, was not found
        """
        # Normalize exactly as _parse_tyranno_vr does, so that e.g. "Latest" is not
        # parsed to None and then checked out as the nonexistent tag "None"
        requested = self.tyranno_vr.lower().strip()
        if requested == "latest":
            logger.info("Using HEAD for tyrannosaurus template version")
            return
        tyranno_vr = self._parse_tyranno_vr(path, requested)
        try:
            self._checkout_rev(path, tyranno_vr)
        except VersionNotFoundError:
            # let everything else fail, including for "stable"
            if requested != "current":
                raise
            # if it was set as 'current', we might have failed because we're testing
            # an unreleased version
            logger.warning(f"Installed tyrannosaurus version {tyranno_vr} not found; using stable")
            tyranno_vr = self._parse_tyranno_vr(path, "stable")
            # if that still doesn't work, let it fail
            self._checkout_rev(path, tyranno_vr)
        logger.info(f"Using tyrannosaurus template version {tyranno_vr}")
197
198
    def _checkout_rev(self, path: Path, tyranno_vr: str):
        """
        Runs ``git checkout`` on the tag ``tyranno_vr`` inside ``path``.

        Args:
            path: The directory of the cloned template
            tyranno_vr: The name of the tag, without the ``tags/`` prefix

        Raises:
            VersionNotFoundError: If the ``git checkout`` command exits with an error
        """
        tag_ref = f"tags/{tyranno_vr}".strip()
        not_found = VersionNotFoundError(f"Git tag '{tyranno_vr}' was not found.")
        self._call(["git", "checkout", tag_ref], cwd=path, fail=not_found)
204
205
    def _parse_tyranno_vr(self, path: Path, version: str) -> Optional[str]:
206
        version = version.lower().strip()
207
        if version == "latest":
208
            return None
209
        elif version == "current":
210
            from tyrannosaurus import TyrannoInfo
211
212
            return "v" + TyrannoInfo.version
213
        elif version == "stable":
214
            return self._call(["git", "describe", "--abbrev=0", "--tags"], cwd=path)
215
        elif version.startswith("v"):
216
            return version
217
        return "v" + version
218
219
    def _call(
        self,
        cmd: List[str],
        cwd: Optional[Path] = None,
        succeed: Optional[str] = None,
        fail: Union[None, str, BaseException] = None,
    ) -> Optional[str]:
        """
        Runs a command and returns its standard output.

        Args:
            cmd: The command and its arguments
            cwd: The working directory for the command, if any
            succeed: A message to log at info level if the command succeeds
            fail: What to do if the command exits with an error:
                  an exception instance is raised, a string is logged at error level,
                  and None does nothing further

        Returns:
            The decoded standard output, which may be empty;
            or None if the command exited with an error and ``fail`` is not an exception

        Raises:
            BaseException: ``fail`` itself, if it is an exception and the command failed
        """
        kwargs: Dict[str, Union[str, int]] = {}
        if cwd is not None:
            kwargs["cwd"] = str(cwd)
        if not self.debug:
            # capture stderr instead of letting the child write to ours
            kwargs["stderr"] = PIPE
        try:
            output = check_output(cmd, encoding="utf8", **kwargs)  # nosec
        except CalledProcessError as e:
            logger.debug(f"Failed calling {' '.join(cmd)} in {cwd}", exc_info=True)
            if isinstance(fail, BaseException):
                raise fail from e
            if fail is not None:
                logger.error(fail)
            return None
        # No exception is active on this path, so do not ask for exc_info
        logger.debug(f"Succeeded calling {' '.join(cmd)} in {cwd}")
        if succeed is not None:
            logger.info(succeed)
        return output
243
244
    def _murder_evil_path_for_sure(self, evil_path: Path) -> None:
245
        """
246
        There are likely to be permission issues with .git directories.
247
248
        Args:
249
            evil_path: The .git directory
250
        """
251
        try:
252
            shutil.rmtree(str(evil_path))
253
        except OSError:
254
            logger.debug("Could not delete .git with rmtree", exc_info=True)
255
256
            def on_rm_error(func, path, exc_info):
257
                # from: https://stackoverflow.com/questions/4829043/how-to-remove-read-only-attrib-directory-with-python-in-windows
258
                os.chmod(path, stat.S_IWRITE)
259
                os.unlink(path)
260
261
            shutil.rmtree(str(evil_path), onerror=on_rm_error)
262
263
264
# Names exported by ``from ... import *``; VersionNotFoundError is not among them.
__all__ = ["New"]
265