cookiecutter.zipfile   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 121
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 17
eloc 70
dl 0
loc 121
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
F unzip() 0 106 17
1
"""Utility functions for handling and fetching repo archives in zip format."""
2
import os
3
import tempfile
4
from pathlib import Path
5
from typing import Optional
6
from zipfile import BadZipFile, ZipFile
7
8
import requests
9
10
from cookiecutter.exceptions import InvalidZipRepository
11
from cookiecutter.prompt import read_repo_password
12
from cookiecutter.utils import make_sure_path_exists, prompt_and_delete
13
14
15
def unzip(
16
    zip_uri: str,
17
    is_url: bool,
18
    clone_to_dir: "os.PathLike[str]" = ".",
19
    no_input: bool = False,
20
    password: Optional[str] = None,
21
):
22
    """Download and unpack a zipfile at a given URI.
23
24
    This will download the zipfile to the cookiecutter repository,
25
    and unpack into a temporary directory.
26
27
    :param zip_uri: The URI for the zipfile.
28
    :param is_url: Is the zip URI a URL or a file?
29
    :param clone_to_dir: The cookiecutter repository directory
30
        to put the archive into.
31
    :param no_input: Do not prompt for user input and eventually force a refresh of
32
        cached resources.
33
    :param password: The password to use when unpacking the repository.
34
    """
35
    # Ensure that clone_to_dir exists
36
    clone_to_dir = Path(clone_to_dir).expanduser()
37
    make_sure_path_exists(clone_to_dir)
38
39
    if is_url:
40
        # Build the name of the cached zipfile,
41
        # and prompt to delete if it already exists.
42
        identifier = zip_uri.rsplit('/', 1)[1]
43
        zip_path = os.path.join(clone_to_dir, identifier)
44
45
        if os.path.exists(zip_path):
46
            download = prompt_and_delete(zip_path, no_input=no_input)
47
        else:
48
            download = True
49
50
        if download:
51
            # (Re) download the zipfile
52
            r = requests.get(zip_uri, stream=True)
53
            with open(zip_path, 'wb') as f:
54
                for chunk in r.iter_content(chunk_size=1024):
55
                    if chunk:  # filter out keep-alive new chunks
56
                        f.write(chunk)
57
    else:
58
        # Just use the local zipfile as-is.
59
        zip_path = os.path.abspath(zip_uri)
60
61
    # Now unpack the repository. The zipfile will be unpacked
62
    # into a temporary directory
63
    try:
64
        zip_file = ZipFile(zip_path)
65
66
        if len(zip_file.namelist()) == 0:
67
            raise InvalidZipRepository(f'Zip repository {zip_uri} is empty')
68
69
        # The first record in the zipfile should be the directory entry for
70
        # the archive. If it isn't a directory, there's a problem.
71
        first_filename = zip_file.namelist()[0]
72
        if not first_filename.endswith('/'):
73
            raise InvalidZipRepository(
74
                f"Zip repository {zip_uri} does not include a top-level directory"
75
            )
76
77
        # Construct the final target directory
78
        project_name = first_filename[:-1]
79
        unzip_base = tempfile.mkdtemp()
80
        unzip_path = os.path.join(unzip_base, project_name)
81
82
        # Extract the zip file into the temporary directory
83
        try:
84
            zip_file.extractall(path=unzip_base)
85
        except RuntimeError:
86
            # File is password protected; try to get a password from the
87
            # environment; if that doesn't work, ask the user.
88
            if password is not None:
89
                try:
90
                    zip_file.extractall(path=unzip_base, pwd=password.encode('utf-8'))
91
                except RuntimeError:
92
                    raise InvalidZipRepository(
93
                        'Invalid password provided for protected repository'
94
                    )
95
            elif no_input:
96
                raise InvalidZipRepository(
97
                    'Unable to unlock password protected repository'
98
                )
99
            else:
100
                retry = 0
101
                while retry is not None:
102
                    try:
103
                        password = read_repo_password('Repo password')
104
                        zip_file.extractall(
105
                            path=unzip_base, pwd=password.encode('utf-8')
106
                        )
107
                        retry = None
108
                    except RuntimeError:
109
                        retry += 1
110
                        if retry == 3:
111
                            raise InvalidZipRepository(
112
                                'Invalid password provided for protected repository'
113
                            )
114
115
    except BadZipFile:
116
        raise InvalidZipRepository(
117
            f'Zip repository {zip_uri} is not a valid zip archive:'
118
        )
119
120
    return unzip_path
121