tyrannosaurus.new.New.__init__()   B
last analyzed

Complexity

Conditions 3

Size

Total Lines 45
Code Lines 42

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 42
nop 13
dl 0
loc 45
rs 8.872
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""
2
Module that generates new projects.
3
4
Original source: https://github.com/dmyersturnbull/tyrannosaurus
5
Copyright 2020–2021 Douglas Myers-Turnbull
6
Licensed under the Apache License, Version 2.0 (the "License");
7
you may not use this file except in compliance with the License.
8
You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0
9
"""
10
11
from __future__ import annotations
12
13
import logging
14
import os
15
import shutil
16
import stat
17
from pathlib import Path
18
from subprocess import PIPE, CalledProcessError, check_output  # nosec
19
from typing import Dict, List, Optional, Sequence, Union
20
21
import typer
22
23
from tyrannosaurus.enums import DevStatus, License
24
from tyrannosaurus.parser import LiteralParser
25
26
tyranno_url = "https://github.com/dmyersturnbull/tyrannosaurus.git"
27
logger = logging.getLogger(__package__)
28
cli = typer.Typer()
29
30
31
class VersionNotFoundError(LookupError):
32
    """
33
    The Git tag corresponding to the version was not found.
34
    """
35
36
37
class New:
38
    def __init__(
39
        self,
40
        name: str,
41
        license_name: Union[str, License],
42
        username: str,
43
        authors: Sequence[str],
44
        description: str,
45
        keywords: Sequence[str],
46
        version: str,
47
        status: DevStatus,
48
        should_track: bool,
49
        extras: bool,
50
        tyranno_vr: str,
51
        debug: bool = False,
52
    ):
53
        if isinstance(license_name, str):
54
            license_name = License[license_name.lower()]
55
        # check for historical reasons; can remove in the future
56
        if not isinstance(tyranno_vr, str):  # pragma: no cover
57
            raise ValueError(f"{tyranno_vr} has type {type(tyranno_vr)}")
58
        self.project_name = name.lower()
59
        self.pkg_name = name.replace("_", "").replace("-", "").replace(".", "").lower()
60
        self.license_name = license_name
61
        self.username = username
62
        self.authors = authors
63
        self.description = description
64
        self.keywords = keywords
65
        self.version = version
66
        self.status = status
67
        self.should_track = should_track
68
        self.extras = extras
69
        self.repo_to_track = f"https://github.com/{username}/{name.lower()}.git"
70
        self.tyranno_vr = str(tyranno_vr)
71
        self.parser = LiteralParser(
72
            project=self.project_name,
73
            user=self.username,
74
            authors=self.authors,
75
            description=self.description,
76
            keywords=self.keywords,
77
            version=self.version,
78
            status=self.status,
79
            license_name=self.license_name.name,
80
            tyranno_vr=self.tyranno_vr,
81
        )
82
        self.debug = debug
83
84
    def create(self, path: Path) -> None:
85
        self._checkout(Path(str(path).lower()))
86
        logger.info("Got git checkout. Fixing...")
87
        # remove tyrannosaurus-specific files
88
        Path(path / "poetry.lock").unlink()
89
        _recipe_path = Path(path / "recipes" / "tyrannosaurus" / "meta.yaml")
90
        if _recipe_path.exists():
91
            _recipe_path.unlink()
92
        Path(path / "recipes" / "tyrannosaurus").rmdir()
93
        for p in Path(path / "docs").iterdir():
94
            if p.is_file() and p.name not in {"conf.py", "requirements.txt"}:
95
                p.unlink()
96
        shutil.rmtree(str(path / "tests"))
97
        # download license
98
        license_text = self.parser.download_license_template(header=False)
99
        Path(path / "LICENSE.txt").write_text(license_text, encoding="utf8")
100
        # copy resources, overwriting
101
        for source in (path / "tyrannosaurus" / "resources").iterdir():
102
            source = Path(source)
103
            if not source.is_file():
104
                continue
105
            resource = source.name
106
            # $dot so we can circumvent the .gitignore
107
            resource = resource.replace("$dot", ".")
108
            resource = resource.replace("$project", self.project_name)
109
            resource = resource.replace("$pkg", self.pkg_name)
110
            # Remove .{other-extension}.txt at the end, with some restrictions
111
            # Don't fix, e.g. beautiful.butterfly.txt
112
            # But do replace .json.txt
113
            # Our ad-hoc rule is that an "extension" contains between 1 and 5 characters
114
            # (Also forbid a @ in the extension -- that's a path separator.)
115
            if resource.endswith(".txt"):
116
                resource = resource[:-4]
117
            # TODO: Fix this
118
            # resource = re.compile(r"^.*?(\.[^.@]{1,5})\.txt$").sub(r"\1", resource)
119
            dest = path / Path(*resource.split("@"))
120
            dest.parent.mkdir(parents=True, exist_ok=True)
121
            text = self.parser.parse(source.read_text(encoding="utf8"))
122
            # Also replace ${LICENSE.HEADER}
123
            # This is a special one: We don't currently do this in the context
124
            dest.write_text(text, encoding="utf8")
125
        # remove unneeded tyrannosaurus source dir
126
        # we already copied the files in tyrannosaurus/resources/
127
        shutil.rmtree(str(path / "tyrannosaurus"))
128
        if not self.extras:
129
            for f in {
130
                "azure-pipelines.yml",
131
                "codemeta.json",
132
                "Vagrantfile",
133
                "environment.yml",
134
                ".travis.yml",
135
            }:
136
                (path / f).unlink(missing_ok=True)
137
        # track remote via git
138
        if self.should_track:
139
            self._track(path)
140
141
    def _track(self, path: Path) -> None:
142
        is_initialized = self._call(
143
            ["git", "init", "--initial-branch=main"],
144
            cwd=path,
145
            fail="Failed calling git init. Giving up.",
146
        )
147
        if is_initialized:
148
            self._call(
149
                ["pre-commit", "install"], cwd=path, fail="Failed calling pre-commit install."
150
            )
151
            is_tracked = self._call(
152
                ["git", "remote", "add", "origin", self.repo_to_track],
153
                cwd=path,
154
                fail=f"Failed tracking {self.repo_to_track}",
155
            )
156
            if is_tracked:
157
                self._call(
158
                    ["git", "branch", "--set-upstream-to=origin/main", "main"],
159
                    cwd=path,
160
                    fail=f"Failed setting upstream to {self.repo_to_track}",
161
                )
162
        logger.info(f"Initialized new git repo tracking remote {self.repo_to_track}")
163
164
    def _checkout(self, path: Path) -> None:
165
        if path.exists():
166
            raise FileExistsError(f"Path {path} already exists")
167
        try:
168
            path.parent.mkdir(exist_ok=True, parents=True)
169
            logger.info("Running git clone...")
170
            self._call(["git", "clone", tyranno_url, str(path)])
171
            # FYI this would fail if we had deleted .git first
172
            self._set_tyranno_vr(path)
173
        finally:
174
            self._murder_evil_path_for_sure(path / ".git")
175
176
    def _set_tyranno_vr(self, path: Path):
177
        # if it's None, just leave it as HEAD
178
        if self.tyranno_vr == "latest":
179
            logger.info("Using HEAD for tyrannosaurus template version")
180
        else:
181
            tyranno_vr = self._parse_tyranno_vr(path, self.tyranno_vr)
182
            try:
183
                self._checkout_rev(path, tyranno_vr)
184
            except VersionNotFoundError:
185
                # if it was set as 'current', we might have failed because we're testing an unreleased version
186
                if self.tyranno_vr == "current":
187
                    logger.warning(
188
                        f"Installed tyrannosaurus version {tyranno_vr} not found; using stable"
189
                    )
190
                    tyranno_vr = self._parse_tyranno_vr(path, "stable")
191
                    self._checkout_rev(path, tyranno_vr)
192
                    # if that still doesn't work, let it fail
193
                else:
194
                    # let everything else fail, including for "stable"
195
                    raise
196
            logger.info(f"Using tyrannosaurus template version {tyranno_vr}")
197
198
    def _checkout_rev(self, path: Path, tyranno_vr: str):
199
        self._call(
200
            ["git", "checkout", f"tags/{tyranno_vr}".strip()],
201
            cwd=path,
202
            fail=VersionNotFoundError(f"Git tag '{tyranno_vr}' was not found."),
203
        )
204
205
    def _parse_tyranno_vr(self, path: Path, version: str) -> Optional[str]:
206
        version = version.lower().strip()
207
        if version == "latest":
208
            return None
209
        elif version == "current":
210
            from tyrannosaurus import TyrannoInfo
211
212
            return "v" + TyrannoInfo.version
213
        elif version == "stable":
214
            return self._call(["git", "describe", "--abbrev=0", "--tags"], cwd=path)
215
        elif version.startswith("v"):
216
            return version
217
        return "v" + version
218
219
    def _call(
220
        self,
221
        cmd: List[str],
222
        cwd: Optional[Path] = None,
223
        succeed: Optional[str] = None,
224
        fail: Union[None, str, BaseException] = None,
225
    ) -> Optional[str]:
226
        kwargs: Dict[str, str] = {} if cwd is None else {"cwd": str(cwd)}
227
        if not self.debug:
228
            kwargs["stderr"] = PIPE
229
        try:
230
            output = check_output(cmd, encoding="utf8", **kwargs)  # nosec
231
        except CalledProcessError:
232
            logger.debug(f"Failed calling {' '.join(cmd)} in {cwd}", exc_info=True)
233
            if fail is not None and isinstance(fail, BaseException):
234
                raise fail
235
            elif fail is not None:
236
                logger.error(fail)
237
            return None
238
        else:
239
            logger.debug(f"Succeeded calling {' '.join(cmd)} in {cwd}", exc_info=True)
240
            if succeed is not None:
241
                logger.info(succeed)
242
            return output
243
244
    def _murder_evil_path_for_sure(self, evil_path: Path) -> None:
245
        """
246
        There are likely to be permission issues with .git directories.
247
248
        Args:
249
            evil_path: The .git directory
250
        """
251
        try:
252
            shutil.rmtree(str(evil_path))
253
        except OSError:
254
            logger.debug("Could not delete .git with rmtree", exc_info=True)
255
256
            def on_rm_error(func, path, exc_info):
257
                # from: https://stackoverflow.com/questions/4829043/how-to-remove-read-only-attrib-directory-with-python-in-windows
258
                os.chmod(path, stat.S_IWRITE)
259
                os.unlink(path)
260
261
            shutil.rmtree(str(evil_path), onerror=on_rm_error)
262
263
264
__all__ = ["New"]
265