bbarchivist.gpgutils.gpg_prepends()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 2
dl 0
loc 14
ccs 5
cts 5
cp 1
crap 2
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python3
2 5
"""This module is used to generate file PGP signatures."""
3
4 5
import concurrent.futures  # parallelization
5 5
import os  # path work
6
7 5
import gnupg  # interface b/w Python, GPG
8 5
from bbarchivist import bbconstants  # premade stuff
9 5
from bbarchivist import exceptions  # exceptions
10 5
from bbarchivist import hashutils  # filter stuff
11 5
from bbarchivist import iniconfig  # config parsing
12 5
from bbarchivist import utilities  # cores
13
14 5
__author__ = "Thurask"
15 5
__license__ = "WTFPL v2"
16 5
__copyright__ = "2015-2019 Thurask"
17
18
19 5
def gpgrunner(workingdir, keyid=None, pword=None, selective=False):
    """
    Sign the files of a directory with ASCII-armored PGP signatures, in parallel.

    Builds a gnupg.GPG instance and hands the actual work to gpgrunner_clean.
    If the instance cannot be built (ValueError), a message is printed and
    SystemExit is raised.

    :param workingdir: Path containing the files you wish to sign.
    :type workingdir: str

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    try:
        gpg_instance = gnupg.GPG(options=["--no-use-agent"])
    except ValueError:
        # This module treats a ValueError from the constructor as "GnuPG is unusable".
        print("COULD NOT FIND GnuPG!")
        raise SystemExit
    # Only reached when the GPG instance was created successfully.
    gpgrunner_clean(gpg_instance, workingdir, keyid, pword, selective)
42
43
44 5
def gpgfile(filepath, gpginst, key=None, pword=None):
    """
    Write a detached signature file next to a given file.

    The signature is written to the name of the opened file plus ".asc".
    Takes an instance of gnupg.GPG().

    :param filepath: File you wish to sign.
    :type filepath: str

    :param gpginst: Instance of Python GnuPG executable.
    :type gpginst: gnupg.GPG()

    :param key: Key ID. 0xABCDEF01
    :type key: str

    :param pword: Passphrase for key.
    :type pword: str
    """
    with open(filepath, "rb") as handle:
        # The file object is passed to sign_file while it is still open.
        gpginst.sign_file(
            handle,
            detach=True,
            keyid=key,
            passphrase=pword,
            output=handle.name + ".asc",
        )
64
65
66 5
def prep_gpgkeyid(keyid=None):
    """
    Prepare GPG key ID.

    Normalizes the ID to a lowercase "0x" prefix followed by the uppercased
    remainder. An existing prefix is recognized in either case ("0x" or "0X"),
    so it is never doubled. None (the declared default) is returned unchanged
    instead of raising AttributeError.

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str
    """
    if keyid is None:
        # No key requested: nothing to normalize.
        return None
    # Strip an existing prefix regardless of case, then re-add the canonical one.
    digits = keyid[2:] if keyid[:2].lower() == "0x" else keyid
    return "0x" + digits.upper()
74
75
76 5
def prep_gpgrunner(workingdir, keyid=None):
    """
    Normalize the key ID and collect the candidate files for gpgrunner.

    Returns a (keyid, files) tuple, where files holds the entries of
    workingdir that hashutils.filefilter accepts, in os.listdir order.

    :param workingdir: Path containing the files you wish to sign.
    :type workingdir: str

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str
    """
    keyid = prep_gpgkeyid(keyid)
    files = []
    for entry in os.listdir(workingdir):
        if hashutils.filefilter(entry, workingdir):
            files.append(entry)
    return keyid, files
90
91
92 5
def gpgrunner_clean(gpg, workingdir, keyid=None, pword=None, selective=False):
    """
    Generate GPG signatures for a directory using an existing GPG instance.

    Prepares the key ID and file list, then passes every file to gpgwriter
    together with a shared thread pool.

    :param gpg: Instance of Python GnuPG executable.
    :type gpg: gnupg.GPG()

    :param workingdir: Path containing the files you wish to sign.
    :type workingdir: str

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    keyid, files = prep_gpgrunner(workingdir, keyid)
    workers = utilities.cpu_workers(files)
    # Leaving the with-block waits for all submitted signing jobs to finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        for name in files:
            gpgwriter(gpg, pool, name, workingdir, selective, keyid, pword)
115
116
117 5
def gpg_supps(selective=False):
    """
    Prepare tuple of support file suffixes.

    Returns bbconstants.SUPPS, extended with ".txt" when selective is set.

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    if selective:
        return bbconstants.SUPPS + (".txt",)
    return bbconstants.SUPPS
126
127
128 5
def gpg_prepends(file, selective=False):
    """
    Check if file matches certain criteria.

    Without selective filtering every file passes (True). With it, the
    result of utilities.prepends for the known prefixes and archive
    suffixes is returned.

    :param file: File inside workingdir that is being signed.
    :type file: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    if selective:
        return utilities.prepends(file, bbconstants.PREFIXES, bbconstants.ARCSPLUS)
    return True
142
143
144 5
def gpgwriter(gpg, xec, file, workingdir, selective=False, keyid=None, pword=None):
    """
    Write an individual GPG signature, if the file qualifies.

    Files ending in one of the support suffixes (see gpg_supps) are skipped,
    as are files rejected by gpg_prepends.

    :param gpg: Instance of Python GnuPG executable.
    :type gpg: gnupg.GPG()

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param file: File inside workingdir that is being signed.
    :type file: str

    :param workingdir: Path containing the files you wish to sign.
    :type workingdir: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str
    """
    if file.endswith(gpg_supps(selective)):
        # Support files are never signed.
        return
    if gpg_prepends(file, selective):
        gpgwriter_clean(gpg, xec, file, workingdir, keyid, pword)
173
174
175 5
def gpgwriter_clean(gpg, xec, file, workingdir, keyid=None, pword=None):
    """
    Queue the signing of a single, already filtered file.

    Prints a progress line, then submits gpgfile for the file to the executor.
    Exceptions raised by the submission itself go to
    exceptions.handle_exception.

    NOTE(review): the Future returned by submit() is discarded, so an
    exception raised inside gpgfile is never surfaced here -- confirm
    this is intended.

    :param gpg: Instance of Python GnuPG executable.
    :type gpg: gnupg.GPG()

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param file: File inside workingdir that is being signed.
    :type file: str

    :param workingdir: Path containing the files you wish to sign.
    :type workingdir: str

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str
    """
    print("VERIFYING:", os.path.basename(file))
    target = os.path.join(workingdir, file)
    try:
        xec.submit(gpgfile, target, gpg, keyid, pword)
    except Exception as exc:
        exceptions.handle_exception(exc)
203
204
205 5
def gpg_config_loader(homepath=None):
    """
    Read a ConfigParser file to get PGP key, password (optional)

    Returns a (key, password) tuple; either element is None when the
    corresponding entry is missing from the 'gpgrunner' section.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    section = iniconfig.generic_loader('gpgrunner', homepath)
    return section.get('key', fallback=None), section.get('pass', fallback=None)
216
217
218 5
def gpg_config_writer(key=None, password=None, homepath=None):
    """
    Write a ConfigParser file to store PGP key, password (optional)

    Only values that are not None are passed on to the writer.

    :param key: Key ID, leave as None to not write.
    :type key: str

    :param password: Key password, leave as None to not write.
    :type password: str

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    candidates = (("key", key), ("pass", password))
    results = {name: value for name, value in candidates if value is not None}
    iniconfig.generic_writer("gpgrunner", results, homepath)
237