Completed
Push — master ( 35a5b7...cfb57c )
by John
03:59
created

gpg_prepends()   A

Complexity

Conditions 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 14
ccs 5
cts 5
cp 1
crap 2
rs 9.4285
c 0
b 0
f 0
1
#!/usr/bin/env python3
2 4
"""This module is used to generate file PGP signatures."""
3
4 5
import os  # path work
5 5
import concurrent.futures  # parallelization
6 5
import gnupg  # interface b/w Python, GPG
7 5
from bbarchivist import bbconstants  # premade stuff
8 5
from bbarchivist import exceptions  # exceptions
9 5
from bbarchivist import hashutils  # filter stuff
10 5
from bbarchivist import utilities  # cores
11 5
from bbarchivist import iniconfig  # config parsing
12
13 5
__author__ = "Thurask"
14 5
__license__ = "WTFPL v2"
15 5
__copyright__ = "2015-2017 Thurask"
16
17
18 5
def gpgrunner(workingdir, keyid=None, pword=None, selective=False):
    """
    Sign every eligible file in a directory with detached PGP signatures.

    Builds the GnuPG wrapper, then hands off to :func:`gpgrunner_clean`,
    which does the filtering and the parallel signing.

    :param workingdir: Path containing the files to sign.
    :type workingdir: str

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    try:
        gpg = gnupg.GPG(options=["--no-use-agent"])
    except ValueError:
        # The wrapper could not be constructed; report and stop the program.
        print("COULD NOT FIND GnuPG!")
        raise SystemExit
    # Only reached when the wrapper was created successfully.
    gpgrunner_clean(gpg, workingdir, keyid, pword, selective)
41
42
43 5
def gpgfile(filepath, gpginst, key=None, pword=None):
    """
    Write a detached signature for one file, next to the file itself.

    The signature is written to the file's name with ``.asc`` appended.

    :param filepath: File to sign.
    :type filepath: str

    :param gpginst: Instance of Python GnuPG executable.
    :type gpginst: gnupg.GPG()

    :param key: Key ID. 0xABCDEF01
    :type key: str

    :param pword: Passphrase for key.
    :type pword: str
    """
    with open(filepath, "rb") as handle:
        gpginst.sign_file(
            handle,
            detach=True,
            keyid=key,
            passphrase=pword,
            output=handle.name + ".asc",
        )
63
64
65 5
def prep_gpgkeyid(keyid=None):
    """
    Normalise a GPG key ID to the form ``0x`` + uppercase hex digits.

    A leading ``0x`` or ``0X`` is recognised and replaced by a lowercase
    ``0x``; a missing prefix is added. ``None`` is passed through unchanged
    so that the declared default does not crash.

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :return: Normalised key ID, or None if no key ID was given.
    :rtype: str
    """
    if keyid is None:
        # The default argument used to raise AttributeError on .startswith().
        return None
    # Compare the prefix case-insensitively so "0X..." is not double-prefixed.
    digits = keyid[2:] if keyid[:2].lower() == "0x" else keyid
    return "0x" + digits.upper()
73
74
75 5
def prep_gpgrunner(workingdir, keyid=None):
    """
    Normalise the key ID and collect the directory entries to process.

    :param workingdir: Path containing the files to sign.
    :type workingdir: str

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :return: Tuple of (prepared key ID, list of entry names that passed
             ``hashutils.filefilter``).
    :rtype: tuple
    """
    prepared_key = prep_gpgkeyid(keyid)
    accepted = []
    for entry in os.listdir(workingdir):
        # Keep only the entries the shared filter accepts.
        if hashutils.filefilter(entry, workingdir):
            accepted.append(entry)
    return prepared_key, accepted
89
90
91 5
def gpgrunner_clean(gpg, workingdir, keyid=None, pword=None, selective=False):
    """
    Queue signature generation for the filtered files on a thread pool.

    :param gpg: Instance of Python GnuPG executable.
    :type gpg: gnupg.GPG()

    :param workingdir: Path containing the files to sign.
    :type workingdir: str

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    keyid, files = prep_gpgrunner(workingdir, keyid)
    worker_count = utilities.cpu_workers(files)
    # Leaving the with-block waits for every submitted job to finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) as pool:
        for name in files:
            gpgwriter(gpg, pool, name, workingdir, selective, keyid, pword)
114
115
116 5
def gpg_supps(selective=False):
    """
    Return the tuple of file suffixes that must not be signed.

    :param selective: If true, ``.txt`` is added to the base suffixes.
    :type selective: bool

    :return: ``bbconstants.SUPPS``, extended with ``.txt`` when selective.
    :rtype: tuple
    """
    if selective:
        return bbconstants.SUPPS + (".txt",)
    return bbconstants.SUPPS
125
126
127 5
def gpg_prepends(file, selective=False):
    """
    Decide whether a file passes the prefix/archive check.

    :param file: File inside workingdir that is being considered.
    :type file: str

    :param selective: If false, every file passes. Default is false.
    :type selective: bool

    :return: True when not selective; otherwise the result of
             ``utilities.prepends`` for this file.
    """
    archives = bbconstants.ARCSPLUS
    prefixes = bbconstants.PREFIXES
    if not selective:
        # No filtering requested: accept everything.
        return True
    return utilities.prepends(file, prefixes, archives)
141
142
143 5
def gpgwriter(gpg, xec, file, workingdir, selective=False, keyid=None, pword=None):
    """
    Queue a signature job for one file, unless the filters exclude it.

    :param gpg: Instance of Python GnuPG executable.
    :type gpg: gnupg.GPG()

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param file: File inside workingdir that is being considered.
    :type file: str

    :param workingdir: Path containing the files to sign.
    :type workingdir: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str
    """
    excluded_suffixes = gpg_supps(selective)
    if file.endswith(excluded_suffixes):
        # Support files are never signed.
        return
    if not gpg_prepends(file, selective):
        return
    gpgwriter_clean(gpg, xec, file, workingdir, keyid, pword)
172
173
174 5
def gpgwriter_clean(gpg, xec, file, workingdir, keyid=None, pword=None):
    """
    Announce a file and submit its signature job to the executor.

    Only errors raised by the submission itself are passed to
    ``exceptions.handle_exception``; the returned future is not inspected
    here, so errors raised inside the worker are not seen by this function.

    :param gpg: Instance of Python GnuPG executable.
    :type gpg: gnupg.GPG()

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param file: File inside workingdir that is being signed.
    :type file: str

    :param workingdir: Path containing the files to sign.
    :type workingdir: str

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str
    """
    shown_name = os.path.basename(file)
    print("VERIFYING:", shown_name)
    target = os.path.join(workingdir, file)
    try:
        xec.submit(gpgfile, target, gpg, keyid, pword)
    except Exception as exc:
        exceptions.handle_exception(exc)
202
203
204 5
def gpg_config_loader(homepath=None):
    """
    Load the stored PGP key ID and passphrase from the ini configuration.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str

    :return: Tuple of (key, passphrase); either is None when not stored.
    :rtype: tuple
    """
    section = iniconfig.generic_loader('gpgrunner', homepath)
    return section.get('key', fallback=None), section.get('pass', fallback=None)
215
216
217 5
def gpg_config_writer(key=None, password=None, homepath=None):
    """
    Store the PGP key ID and/or passphrase in the ini configuration.

    :param key: Key ID, leave as None to not write.
    :type key: str

    :param password: Key password, leave as None to not write.
    :type password: str

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    candidates = (("key", key), ("pass", password))
    # Only values that were actually supplied are handed to the writer.
    results = {name: value for name, value in candidates if value is not None}
    iniconfig.generic_writer("gpgrunner", results, homepath)
236