1 | """Helper functions for working with version control systems.""" |
||
2 | import logging |
||
3 | import os |
||
4 | import subprocess |
||
5 | from shutil import which |
||
6 | |||
7 | from cookiecutter.exceptions import ( |
||
8 | RepositoryCloneFailed, |
||
9 | RepositoryNotFound, |
||
10 | UnknownRepoType, |
||
11 | VCSNotInstalled, |
||
12 | ) |
||
13 | from cookiecutter.utils import make_sure_path_exists, prompt_and_delete |
||
14 | |||
15 | logger = logging.getLogger(__name__) |
||
16 | |||
17 | |||
18 | BRANCH_ERRORS = [ |
||
19 | 'error: pathspec', |
||
20 | 'unknown revision', |
||
21 | ] |
||
22 | |||
23 | |||
24 | def identify_repo(repo_url): |
||
25 | """Determine if `repo_url` should be treated as a URL to a git or hg repo. |
||
26 | |||
27 | Repos can be identified by prepending "hg+" or "git+" to the repo URL. |
||
28 | |||
29 | :param repo_url: Repo URL of unknown type. |
||
30 | :returns: ('git', repo_url), ('hg', repo_url), or None. |
||
31 | """ |
||
32 | repo_url_values = repo_url.split('+') |
||
33 | if len(repo_url_values) == 2: |
||
34 | repo_type = repo_url_values[0] |
||
35 | if repo_type in ["git", "hg"]: |
||
36 | return repo_type, repo_url_values[1] |
||
37 | else: |
||
38 | raise UnknownRepoType |
||
39 | else: |
||
40 | if 'git' in repo_url: |
||
41 | return 'git', repo_url |
||
42 | elif 'bitbucket' in repo_url: |
||
43 | return 'hg', repo_url |
||
44 | else: |
||
45 | raise UnknownRepoType |
||
46 | |||
47 | |||
48 | def is_vcs_installed(repo_type): |
||
49 | """ |
||
50 | Check if the version control system for a repo type is installed. |
||
51 | |||
52 | :param repo_type: |
||
53 | """ |
||
54 | return bool(which(repo_type)) |
||
55 | |||
56 | |||
57 | def clone(repo_url, checkout=None, clone_to_dir='.', no_input=False): |
||
58 | """Clone a repo to the current directory. |
||
59 | |||
60 | :param repo_url: Repo URL of unknown type. |
||
61 | :param checkout: The branch, tag or commit ID to checkout after clone. |
||
62 | :param clone_to_dir: The directory to clone to. |
||
63 | Defaults to the current directory. |
||
64 | :param no_input: Suppress all user prompts when calling via API. |
||
65 | :returns: str with path to the new directory of the repository. |
||
66 | """ |
||
67 | # Ensure that clone_to_dir exists |
||
68 | clone_to_dir = os.path.expanduser(clone_to_dir) |
||
69 | make_sure_path_exists(clone_to_dir) |
||
70 | |||
71 | # identify the repo_type |
||
72 | repo_type, repo_url = identify_repo(repo_url) |
||
73 | |||
74 | # check that the appropriate VCS for the repo_type is installed |
||
75 | if not is_vcs_installed(repo_type): |
||
76 | msg = "'{0}' is not installed.".format(repo_type) |
||
77 | raise VCSNotInstalled(msg) |
||
78 | |||
79 | repo_url = repo_url.rstrip('/') |
||
80 | repo_name = os.path.split(repo_url)[1] |
||
81 | if repo_type == 'git': |
||
82 | repo_name = repo_name.split(':')[-1].rsplit('.git')[0] |
||
83 | repo_dir = os.path.normpath(os.path.join(clone_to_dir, repo_name)) |
||
84 | elif repo_type == 'hg': |
||
85 | repo_dir = os.path.normpath(os.path.join(clone_to_dir, repo_name)) |
||
86 | logger.debug('repo_dir is {0}'.format(repo_dir)) |
||
0 ignored issues
–
show
introduced
by
Loading history...
|
|||
87 | |||
88 | if os.path.isdir(repo_dir): |
||
89 | clone = prompt_and_delete(repo_dir, no_input=no_input) |
||
90 | else: |
||
91 | clone = True |
||
92 | |||
93 | if clone: |
||
94 | try: |
||
95 | subprocess.check_output( |
||
96 | [repo_type, 'clone', repo_url], |
||
97 | cwd=clone_to_dir, |
||
98 | stderr=subprocess.STDOUT, |
||
99 | ) |
||
100 | if checkout is not None: |
||
101 | subprocess.check_output( |
||
102 | [repo_type, 'checkout', checkout], |
||
103 | cwd=repo_dir, |
||
104 | stderr=subprocess.STDOUT, |
||
105 | ) |
||
106 | except subprocess.CalledProcessError as clone_error: |
||
107 | output = clone_error.output.decode('utf-8') |
||
108 | if 'not found' in output.lower(): |
||
109 | raise RepositoryNotFound( |
||
110 | 'The repository {} could not be found, ' |
||
111 | 'have you made a typo?'.format(repo_url) |
||
112 | ) |
||
113 | if any(error in output for error in BRANCH_ERRORS): |
||
114 | raise RepositoryCloneFailed( |
||
115 | 'The {} branch of repository {} could not found, ' |
||
116 | 'have you made a typo?'.format(checkout, repo_url) |
||
117 | ) |
||
118 | raise |
||
119 | |||
120 | return repo_dir |
||
121 |