Passed
Pull Request — master (#1445)
by
unknown
01:16
created

cookiecutter.vcs._delete_old_and_clone_new()   A

Complexity

Conditions 5

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 13
rs 9.3333
c 0
b 0
f 0
cc 5
nop 5
1
"""Helper functions for working with version control systems."""
2
import logging
3
import os
4
import subprocess
5
from shutil import (
6
    which,
7
    move,
8
)
9
import sys
10
import tempfile
11
12
from cookiecutter.exceptions import (
13
    RepositoryCloneFailed,
14
    RepositoryNotFound,
15
    UnknownRepoType,
16
    VCSNotInstalled,
17
)
18
from cookiecutter.utils import (
19
    make_sure_path_exists,
20
    prompt_ok_to_delete,
21
    prompt_ok_to_reuse,
22
    rmtree,
23
)
24
25
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Substrings searched for in the output of a failed VCS command; a match is
# reported as a branch that could not be found.
BRANCH_ERRORS = ['error: pathspec', 'unknown revision']
32
33
34
def identify_repo(repo_url):
    """Work out which version control system a repository URL belongs to.

    A URL made of exactly two ``+``-separated parts is treated as
    ``<type>+<url>``: the type must be ``git`` or ``hg`` and is stripped from
    the returned URL. Any other URL is classified by its text: one containing
    ``git`` is git, otherwise one containing ``bitbucket`` is hg.

    :param repo_url: Repo URL of unknown type.
    :returns: A ``(repo_type, repo_url)`` tuple, where ``repo_type`` is
        ``'git'`` or ``'hg'``.
    :raises UnknownRepoType: If the explicit prefix is not ``git`` or ``hg``,
        or if there is no such prefix and neither heuristic matches.
    """
    pieces = repo_url.split('+')

    if len(pieces) != 2:
        # No single "<type>+<url>" separator: guess from the URL text.
        if 'git' in repo_url:
            return 'git', repo_url
        if 'bitbucket' in repo_url:
            return 'hg', repo_url
        raise UnknownRepoType

    prefix, bare_url = pieces
    if prefix not in ('git', 'hg'):
        raise UnknownRepoType
    return prefix, bare_url
56
57
58
def is_vcs_installed(repo_type):
    """Report whether the executable named ``repo_type`` can be found.

    :param repo_type: Name of the VCS executable to look up with
        ``shutil.which``.
    :returns: ``True`` if ``which`` located an executable, else ``False``.
    """
    executable_path = which(repo_type)
    return bool(executable_path)
65
66
67
def clone(repo_url, checkout=None, clone_to_dir='.', no_input=False):
    """Clone a repo into ``clone_to_dir`` and return the resulting directory.

    :param repo_url: Repo URL of unknown type.
    :param checkout: The branch, tag or commit ID to checkout after clone.
    :param clone_to_dir: The directory to clone to.
                         Defaults to the current directory.
    :param no_input: Suppress all user prompts when calling via API.
    :returns: str with path to the new directory of the repository.
    :raises UnknownRepoType: If the VCS type cannot be determined from
        ``repo_url``.
    :raises VCSNotInstalled: If the executable for the detected VCS is not
        found.
    """
    # Ensure that clone_to_dir exists
    clone_to_dir = os.path.expanduser(clone_to_dir)
    make_sure_path_exists(clone_to_dir)

    # identify the repo_type
    repo_type, repo_url = identify_repo(repo_url)

    # check that the appropriate VCS for the repo_type is installed
    if not is_vcs_installed(repo_type):
        msg = "'{0}' is not installed.".format(repo_type)
        raise VCSNotInstalled(msg)

    repo_url = repo_url.rstrip('/')
    repo_name = os.path.split(repo_url)[1]
    if repo_type == 'git':
        # Handle scp-like URLs such as "git@host:repo.git".
        repo_name = repo_name.split(':')[-1]
        # Strip only a trailing ".git". Splitting on every ".git" occurrence
        # would truncate names such as "my.github.project.git" to "my".
        git_suffix = '.git'
        if repo_name.endswith(git_suffix):
            repo_name = repo_name[: -len(git_suffix)]
    # Computed once for every repo type so repo_dir is always defined.
    repo_dir = os.path.normpath(os.path.join(clone_to_dir, repo_name))
    logger.debug('repo_dir is {0}'.format(repo_dir))

    if _need_to_clone(repo_dir, no_input):
        _delete_old_and_clone_new(
            repo_dir, repo_url, repo_type, clone_to_dir, checkout,
        )
    return repo_dir
103
104
105
def _need_to_clone(repo_dir, no_input):
    """Decide whether ``repo_dir`` has to be cloned afresh.

    Asks first whether the directory may be deleted; only if that is declined
    does it ask whether the directory may be reused. If both are declined the
    process exits via ``sys.exit()``.

    :param repo_dir: Path of the repository directory in question.
    :param no_input: Forwarded to both prompt helpers as ``no_input``.
    :returns: ``True`` when deletion was approved; otherwise the falsy answer
        from the delete prompt, meaning the directory is reused.
    """
    delete_answer = prompt_ok_to_delete(repo_dir, no_input=no_input)
    if delete_answer:
        return True

    reuse_answer = prompt_ok_to_reuse(repo_dir, no_input=no_input)
    if not reuse_answer:
        sys.exit()

    # Reuse was approved: hand back the (falsy) delete answer unchanged.
    return delete_answer
118
119
120
def _delete_old_and_clone_new(repo_dir, repo_url, repo_type, clone_to_dir, checkout):
    """Replace ``repo_dir`` with a fresh clone, restoring the old copy on failure.

    An existing ``repo_dir`` is first moved into a temporary directory. If
    cloning or checking out then fails for any reason, the old copy is moved
    back before the error propagates, so it is not discarded together with the
    temporary directory.

    :param repo_dir: Directory the repository is expected to be cloned into.
    :param repo_url: URL passed to the VCS ``clone`` command.
    :param repo_type: VCS executable name, used as the command to run.
    :param clone_to_dir: Working directory for the ``clone`` command.
    :param checkout: Branch, tag or commit to check out, or ``None``.
    :raises RepositoryNotFound: Via ``_handle_clone_error``.
    :raises RepositoryCloneFailed: Via ``_handle_clone_error``.
    :raises subprocess.CalledProcessError: Via ``_handle_clone_error`` when
        the failure is not otherwise recognised.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        # None means there was nothing to back up.
        backup_dir = None
        if os.path.exists(repo_dir):
            backup_dir = os.path.join(tmp_dir, os.path.basename(repo_dir))
            _backup_and_delete_repo(repo_dir, backup_dir)

        try:
            _clone_repo(repo_dir, repo_url, repo_type, clone_to_dir, checkout)
        except BaseException as error:
            # Restore on *any* failure (not just a failed VCS command):
            # otherwise an OSError or KeyboardInterrupt would let the
            # temporary directory clean-up delete the only copy of the repo.
            if backup_dir is not None:
                _restore_old_repo(repo_dir, backup_dir)
            if isinstance(error, subprocess.CalledProcessError):
                _handle_clone_error(error, repo_url, checkout)
            raise
133
134
135
def _clone_repo(repo_dir, repo_url, repo_type, clone_to_dir, checkout):
    """Run the VCS ``clone`` command and, if requested, ``checkout``.

    :param repo_dir: Working directory for the ``checkout`` command.
    :param repo_url: URL passed to the ``clone`` command.
    :param repo_type: Executable to run for both commands.
    :param clone_to_dir: Working directory for the ``clone`` command.
    :param checkout: Revision to check out; skipped when ``None``.
    :raises subprocess.CalledProcessError: If a command exits non-zero; its
        ``output`` holds stdout and stderr combined.
    """
    # Each entry pairs an argument vector with the directory to run it in.
    steps = [([repo_type, 'clone', repo_url], clone_to_dir)]
    if checkout is not None:
        steps.append(([repo_type, 'checkout', checkout], repo_dir))

    for argv, workdir in steps:
        subprocess.check_output(argv, cwd=workdir, stderr=subprocess.STDOUT)
143
144
145
def _handle_clone_error(clone_error, repo_url, checkout):
    """Turn a failed VCS command into the most specific exception available.

    This function always raises.

    :param clone_error: The ``subprocess.CalledProcessError`` from the failed
        command; its ``output`` is inspected.
    :param repo_url: Repository URL, used in the error messages.
    :param checkout: Requested branch/tag/commit, used in the error message.
    :raises RepositoryNotFound: If the output contains "not found"
        (case-insensitive).
    :raises RepositoryCloneFailed: If the output contains one of
        ``BRANCH_ERRORS``.
    :raises subprocess.CalledProcessError: ``clone_error`` itself, when the
        output matches neither case.
    """
    raw_output = clone_error.output or b''
    if isinstance(raw_output, bytes):
        # Undecodable bytes must not hide the real failure behind a
        # UnicodeDecodeError, so replace them instead of failing.
        output = raw_output.decode('utf-8', errors='replace')
    else:
        output = raw_output

    if 'not found' in output.lower():
        raise RepositoryNotFound(
            'The repository {} could not be found, '
            'have you made a typo?'.format(repo_url)
        ) from clone_error
    if any(error in output for error in BRANCH_ERRORS):
        raise RepositoryCloneFailed(
            'The {} branch of repository {} could not found, '
            'have you made a typo?'.format(checkout, repo_url)
        ) from clone_error
    # Re-raise explicitly: a bare ``raise`` only works while the caller is
    # still inside its ``except`` block and is a RuntimeError otherwise.
    raise clone_error
158
159
160
def _backup_and_delete_repo(path, backup_path):
    """Move the repo at ``path`` to ``backup_path``, logging before and after.

    :param path: Current location of the repository directory.
    :param backup_path: Destination the directory is moved to.
    """
    announce = 'Backing up repo {} to {}'.format(path, backup_path)
    logger.info(announce)

    move(path, backup_path)

    confirm = 'Moving repo {} to {}'.format(path, backup_path)
    logger.info(confirm)
164
165
166
def _restore_old_repo(path, backup_path):
    """Put the backed-up repo at ``backup_path`` back at ``path``.

    Anything currently at ``path`` is removed first; a missing ``path`` is
    not an error.

    :param path: Location the repository should be restored to.
    :param backup_path: Location of the backup copy to move back.
    """
    try:
        rmtree(path)
    except FileNotFoundError:
        # Nothing was left behind at ``path``; nothing to clean.
        pass
    else:
        logger.info('Cleaning {}'.format(path))

    logger.info('Restoring backup repo {}'.format(backup_path))
    move(backup_path, path)
    logger.info('Restored {} successfully'.format(path))
176