bbarchivist.hashutils   A
last analyzed

Complexity

Total Complexity 39

Size/Duplication

Total Lines 409
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 168
dl 0
loc 409
rs 9.28
c 0
b 0
f 0
ccs 139
cts 139
cp 1
wmc 39

17 Functions

Rating   Name   Duplication   Size   Complexity  
A verifier() 0 19 4
A zlib_handler() 0 10 3
A verifier_config_loader() 0 27 1
A ssl_hash() 0 20 2
A verifier_individual() 0 23 2
A get_hashfunc() 0 24 1
A zlib_hash() 0 19 4
A hashfunc_reader() 0 19 4
A get_engine() 0 19 2
A hashlib_hash() 0 15 1
A hash_writer() 0 35 3
A hash_get() 0 25 2
A calculate_escreens() 0 33 2
A base_hash() 0 28 3
A filefilter() 0 14 1
A prep_verifier() 0 13 2
A verifier_config_writer() 0 14 2
#!/usr/bin/env python3
"""This module is used to generate file hashes/checksums."""

import concurrent.futures  # parallelization
import hashlib  # all other hashes
import hmac  # escreens is a hmac, news at 11
import os  # path work
import zlib  # crc32/adler32

from bbarchivist import bbconstants  # premade stuff
from bbarchivist import exceptions  # exceptions
from bbarchivist import iniconfig  # config parsing
from bbarchivist import utilities  # cores

__author__ = "Thurask"
__license__ = "WTFPL v2"
__copyright__ = "2015-2019 Thurask"
18
19
def zlib_hash(filepath, method, blocksize=16 * 1024 * 1024):
    """
    Return zlib-based (i.e. CRC32/Adler32) checksum of a file.

    The file is read `blocksize` bytes at a time and every chunk is folded
    into a running checksum, so the whole file is never held in memory.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param method: "crc32" or "adler32".
    :type method: str

    :param blocksize: How much of file to read at once. Default is 16MB.
    :type blocksize: int

    :return: Checksum as eight zero-padded, lowercase hexadecimal digits.
    :rtype: str
    """
    hashfunc, seed = zlib_handler(method)
    with open(filepath, 'rb') as file:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for chunk in iter(lambda: file.read(blocksize), b''):
            seed = hashfunc(chunk, seed)
    # Mask to 32 bits so the value always formats as an unsigned number.
    return format(seed & 0xFFFFFFFF, "08x")
39
40
def zlib_handler(method):
    """
    Prepare hash method and seed depending on CRC32/Adler32.

    :param method: "crc32" or "adler32". Any value other than "crc32" selects Adler32.
    :type method: str

    :return: (checksum function, starting value): zlib.crc32 with 0, or zlib.adler32 with 1.
    :rtype: tuple
    """
    if method == "crc32":
        return zlib.crc32, 0
    # Everything else falls through to Adler-32, whose running value starts at 1.
    return zlib.adler32, 1
51
52
def hashlib_hash(filepath, engine, blocksize=16 * 1024 * 1024):
    """
    Return MD5/SHA-1/SHA-2/SHA-3 hash of a file.

    The supplied engine is updated in place, so pass a fresh hash object for
    every file; reusing one would mix the contents of several files.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param engine: Hash object to update with file contents.
    :type engine: _hashlib.HASH

    :param blocksize: How much of file to read at once. Default is 16MB.
    :type blocksize: int

    :return: Hexadecimal digest of the engine after reading the file.
    :rtype: str
    """
    hashfunc_reader(filepath, engine, blocksize)
    return engine.hexdigest()
68
69
def hashfunc_reader(filepath, engine, blocksize=16 * 1024 * 1024):
    """
    Generate hash from file contents.

    Feeds the file to `engine` chunk by chunk. Nothing is returned; the
    result is read from the engine afterwards (e.g. `engine.hexdigest()`).

    :param filepath: File you wish to verify.
    :type filepath: str

    :param engine: Hash object to update with file contents.
    :type engine: _hashlib.HASH

    :param blocksize: How much of file to read at once. Default is 16MB.
    :type blocksize: int
    """
    with open(filepath, 'rb') as file:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for data in iter(lambda: file.read(blocksize), b''):
            engine.update(data)
89
90
def ssl_hash(filepath, method, blocksize=16 * 1024 * 1024):
    """
    Return SSL-library dependent hash of a file.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param method: Method to use: algorithms in hashlib that are not guaranteed.
    :type method: str

    :param blocksize: How much of file to read at once. Default is 16MB.
    :type blocksize: int

    :return: Hexadecimal digest, or None if a ValueError was handed to the
        exception handler and the handler returned.
    :rtype: str
    """
    try:
        # hashlib.new raises ValueError when the algorithm is not available.
        engine = hashlib.new(method)
        hashfunc_reader(filepath, engine, blocksize)
        return engine.hexdigest()
    except ValueError as exc:
        msg = "{0} HASH FAILED".format(method.upper())
        exceptions.handle_exception(exc, msg, None)
        # NOTE(review): only reached if handle_exception returns; its
        # behaviour is defined outside this module.
        return None
111
112
def calculate_escreens(pin, app, uptime, duration=30):
    """
    Calculate key for the Engineering Screens based on input.

    The key is the first eight hexadecimal digits, uppercased, of an
    HMAC-SHA1 over pin + app + uptime + a per-duration phrase.

    :param pin: PIN to check. 8 character hexadecimal; lowercased before use.
    :type pin: str

    :param app: App version. 10.x.y.zzzz.
    :type app: str

    :param uptime: Uptime in ms. Converted with str(), so an int also works.
    :type uptime: str

    :param duration: 1, 3, 6, 15, 30 (days). Anything else is treated as 1.
    :type duration: str or int

    :return: Eight character uppercase hexadecimal key.
    :rtype: str
    """
    #: Somehow, values for lifetimes for escreens.
    # The six-day phrase used to be stored under key 7 while the check below
    # allowed 6, so a six-day request raised KeyError. The table is now the
    # single source of truth for which durations are valid.
    lifetimes = {
        1: "",
        3: "Hello my baby, hello my honey, hello my rag time gal",
        6: "He was a boy, and she was a girl, can I make it any more obvious?",
        15: "So am I, still waiting, for this world to stop hating?",
        30: "I love myself today, not like yesterday. I'm cool, I'm calm, I'm gonna be okay"
    }
    #: Escreens magic HMAC secret.
    ehmac = 'Up the time stream without a TARDIS'
    duration = int(duration)
    if duration not in lifetimes:
        duration = 1
    data = pin.lower() + app + str(uptime) + lifetimes[duration]
    newhmac = hmac.new(ehmac.encode(), data.encode(), digestmod=hashlib.sha1)
    key = newhmac.hexdigest()[:8]
    return key.upper()
146
147
def get_hashfunc(hashtype):
    """
    Get genericized hash function from hash type.

    :param hashtype: Hash type.
    :type hashtype: str

    :return: zlib_hash, ssl_hash or hashlib_hash, whichever handles `hashtype`.
    :rtype: function

    :raises KeyError: If `hashtype` is not one of the known names.
    """
    # Grouped by the function that handles each family of algorithms.
    families = (
        (zlib_hash, ("adler32", "crc32")),
        (ssl_hash, ("md4", "sha0", "ripemd160", "whirlpool")),
        (hashlib_hash, ("md5", "sha1", "sha224", "sha256", "sha384", "sha512",
                        "sha3224", "sha3256", "sha3384", "sha3512")),
    )
    hashfuncs = {name: func for func, names in families for name in names}
    return hashfuncs[hashtype]
172
173
def get_engine(hashtype):
    """
    Get hashlib engine from hash type.

    Only the requested engine is instantiated; the table holds constructors
    rather than ready-made hash objects.

    :param hashtype: Hash type.
    :type hashtype: str

    :return: A new, empty hash object for `hashtype`.
    :rtype: _hashlib.HASH

    :raises KeyError: If `hashtype` has no engine (including the SHA-3 names
        when the version check below fails).
    """
    constructors = {"md5": hashlib.md5,
                    "sha1": hashlib.sha1,
                    "sha224": hashlib.sha224,
                    "sha256": hashlib.sha256,
                    "sha384": hashlib.sha384,
                    "sha512": hashlib.sha512}
    # NOTE(review): presumably true on Python 3.6 and later, where hashlib
    # provides the sha3_* constructors; confirm against utilities.new_enough.
    if utilities.new_enough(3, 6):
        constructors.update({"sha3224": hashlib.sha3_224,
                             "sha3256": hashlib.sha3_256,
                             "sha3384": hashlib.sha3_384,
                             "sha3512": hashlib.sha3_512})
    return constructors[hashtype]()
193
194
def hash_get(filename, hashfunc, hashtype, workingdir, blocksize=16777216):
    """
    Generate and pretty format the hash result for a file.

    :param filename: File to hash.
    :type filename: str

    :param hashfunc: Hash function to use.
    :type hashfunc: function

    :param hashtype: Hash type.
    :type hashtype: str

    :param workingdir: Working directory.
    :type workingdir: str

    :param blocksize: Block size. Default is 16MB.
    :type blocksize: int

    :return: One line, "<UPPERCASE HASH> <file basename>" plus a newline.
    :rtype: str
    """
    # hashlib_hash takes a ready hash object as its second argument; the other
    # hash functions take the algorithm name.
    if hashfunc is hashlib_hash:
        method = get_engine(hashtype)
    else:
        method = hashtype
    result = hashfunc(os.path.join(workingdir, filename), method, blocksize)
    return "{0} {1}\n".format(result.upper(), os.path.basename(filename))
220
221
def base_hash(hashtype, source, workingdir, block, target, kwargs=None):
    """
    Generic hash function; get hash, write to file.

    Writes the uppercase hash name on one line followed by the formatted
    hash line, but only if `hashtype` is enabled in `kwargs`.

    :param hashtype: Hash type.
    :type hashtype: str

    :param source: File to be hashed; foobar.ext
    :type source: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param block: Blocksize, in bytes.
    :type block: int

    :param target: File to write to.
    :type target: file

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
        Default is the stored configuration.
    :type kwargs: dict
    """
    # The declared default used to fail on the subscript below; fall back to
    # the stored preferences the same way verifier() does.
    if kwargs is None:
        kwargs = verifier_config_loader()
    # A hash type that is absent from the dict is treated as switched off.
    if not kwargs.get(hashtype):
        return
    hash_generic = [hashtype.upper()]
    hashfunc = get_hashfunc(hashtype)
    # The "sha0" entry is requested from the hashing backend as "sha".
    hashtype2 = "sha" if hashtype == "sha0" else hashtype
    hash_generic.append(hash_get(source, hashfunc, hashtype2, workingdir, block))
    target.write("\n".join(hash_generic))
250
251
def hash_writer(source, dest, workingdir, kwargs=None):
    """
    Write per-file hashes.

    Opens `dest` for writing and calls base_hash once per hash type, in a
    fixed order, so enabled hashes always appear in the same sequence.

    :param source: File to be hashed; foobar.ext
    :type source: str

    :param dest: Destination file; foobar.ext.cksum
    :type dest: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
        Default is the stored configuration.
    :type kwargs: dict
    """
    # The declared default used to fail on the subscript below; fall back to
    # the stored preferences the same way verifier() does.
    if kwargs is None:
        kwargs = verifier_config_loader()
    # Same fallback block size as verifier_config_loader uses.
    block = int(kwargs.get('blocksize', 16777216))
    hashtypes = ("adler32", "crc32", "md4", "md5", "sha0", "sha1", "sha224",
                 "sha256", "sha384", "sha512", "ripemd160", "whirlpool")
    sha3types = ("sha3224", "sha3256", "sha3384", "sha3512")
    with open(dest, 'w') as target:
        for hashtype in hashtypes:
            base_hash(hashtype, source, workingdir, block, target, kwargs)
        # NOTE(review): presumably true on Python 3.6 and later; confirm
        # against utilities.new_enough.
        if utilities.new_enough(3, 6):
            for hashtype in sha3types:
                base_hash(hashtype, source, workingdir, block, target, kwargs)
287
288
def filefilter(file, workingdir, extras=()):
    """
    Check if file in folder is a folder, or if it's got a forbidden extension.

    :param file: File to be hashed.
    :type file: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param extras: Extra extensions to reject; any iterable of strings.
    :type extras: tuple

    :return: True if the entry should be hashed, False if it is a folder or
        ends with one of the rejected extensions.
    :rtype: bool
    """
    if os.path.isdir(os.path.join(workingdir, file)):
        return False
    # tuple() lets callers pass a list as well; str.endswith needs a tuple.
    # NOTE(review): assumes bbconstants.SUPPS is a tuple, as the original
    # concatenation with a tuple already required.
    return not file.endswith(bbconstants.SUPPS + tuple(extras))
303
304
def prep_verifier(ldir, selective=False):
    """
    Prepare files for verifier function.

    :param ldir: Path containing files you wish to verify.
    :type ldir: str

    :param selective: Filtering filenames/extensions. Default is false.
        When true, ".txt" files are rejected in addition to the usual ones.
    :type selective: bool

    :return: Entries of `ldir` that pass filefilter, each joined with `ldir`.
    :rtype: list(str)
    """
    exts = (".txt",) if selective else ()
    return [os.path.join(ldir, afx) for afx in os.listdir(ldir) if filefilter(afx, ldir, exts)]
318
319
def verifier(ldir, kwargs=None, selective=False):
    """
    For all files in a directory, perform various hash/checksum functions.
    Take dict to define hashes, write output to a/individual .cksum file(s).

    :param ldir: Path containing files you wish to verify.
    :type ldir: str

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
    :type kwargs: dict

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    kwargs = verifier_config_loader() if kwargs is None else kwargs
    # prep_verifier returns paths already joined with ldir, and the helpers
    # further down join them with ldir again. os.path.join drops ldir when its
    # second argument is absolute, so an absolute ldir makes those repeated
    # joins harmless; a relative one such as "out/files" would otherwise turn
    # into "out/files/out/files/...".
    ldir = os.path.abspath(ldir)
    fxs = prep_verifier(ldir, selective)
    if not fxs:
        # Nothing to hash. ThreadPoolExecutor rejects max_workers <= 0.
        # NOTE(review): whether utilities.cpu_workers can return 0 for an
        # empty list is not visible here; returning early avoids the question.
        return
    with concurrent.futures.ThreadPoolExecutor(max_workers=utilities.cpu_workers(fxs)) as xec:
        for file in fxs:
            verifier_individual(xec, ldir, file, kwargs)
339
340
def verifier_individual(xec, ldir, file, kwargs):
    """
    Individually verify files through a ThreadPoolExecutor.

    Submits hash_writer for `file`; the checksum file is `file` with ".cksum"
    appended, joined with `ldir`.

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param ldir: Path containing files you wish to verify.
    :type ldir: str

    :param file: Filename.
    :type file: str

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
    :type kwargs: dict

    :return: Future for the submitted job, or None if submitting failed and
        the exception handler returned. Errors raised inside the job are
        stored on the Future rather than raised here.
    :rtype: concurrent.futures.Future
    """
    print("HASHING:", os.path.basename(file))
    # NOTE(review): if `file` already starts with a relative `ldir`, this join
    # repeats the directory; an absolute `ldir` or a bare filename is safe.
    targetname = os.path.join(ldir, file + ".cksum")
    try:
        # Hand the Future back so the caller can inspect the job's outcome.
        return xec.submit(hash_writer, file, targetname, ldir, kwargs)
    except Exception as exc:
        # Only covers failures of submit() itself, e.g. a shut down pool.
        exceptions.handle_exception(exc)
        return None
364
365
def verifier_config_loader(homepath=None):
    """
    Read a ConfigParser file to get hash preferences.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str

    :return: One bool per hash type plus an int under 'blocksize'. When the
        ini file has no value, sha1, sha256 and md5 are on, everything else
        is off and blocksize is 16777216.
    :rtype: dict
    """
    ini = iniconfig.generic_loader("hashmodes", homepath)
    # (hash type, fallback) pairs, kept in the order the keys are stored.
    classic = (("crc32", False),
               ("adler32", False),
               ("sha0", False),
               ("sha1", True),
               ("sha224", False),
               ("sha256", True),
               ("sha384", False),
               ("sha512", False),
               ("md5", True),
               ("md4", False),
               ("ripemd160", False),
               ("whirlpool", False))
    sha3 = ("sha3224", "sha3256", "sha3384", "sha3512")
    results = {}
    for method, fallback in classic:
        results[method] = bool(ini.getboolean(method, fallback=fallback))
    results['blocksize'] = int(ini.getint('blocksize', fallback=16777216))
    for method in sha3:
        results[method] = bool(ini.getboolean(method, fallback=False))
    return results
393
394
def verifier_config_writer(resultdict=None, homepath=None):
    """
    Write a ConfigParser file to store hash preferences.

    Every value is stored as its lowercase string form, e.g. True becomes
    "true" and a block size of 16777216 becomes "16777216".

    :param resultdict: Dictionary of results: {method, bool}
        Default is whatever verifier_config_loader() returns.
    :type resultdict: dict({str, bool})

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    if resultdict is None:
        resultdict = verifier_config_loader()
    results = {method: str(flag).lower() for method, flag in resultdict.items()}
    iniconfig.generic_writer("hashmodes", results, homepath)
409