Completed
Push — master ( d56c20...c7f92e )
by John
01:16
created

verifier_config_writer()   A

Complexity

Conditions 3

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 3
c 2
b 0
f 0
dl 0
loc 14
rs 9.4285
1
#!/usr/bin/env python3
2
"""This module is used to generate file hashes/checksums and PGP signatures."""
3
4
import zlib  # crc32/adler32
5
import hashlib  # all other hashes
6
import hmac  # escreens is a hmac, news at 11
7
import os  # path work
8
import concurrent.futures  # parallelization
9
import gnupg  # interface b/w Python, GPG
10
from bbarchivist import bbconstants  # premade stuff
11
from bbarchivist import utilities  # cores
12
from bbarchivist import iniconfig  # config parsing
13
14
__author__ = "Thurask"
15
__license__ = "WTFPL v2"
16
__copyright__ = "Copyright 2015-2016 Thurask"
17
18
19
def hc32(filepath, blocksize=16 * 1024 * 1024):
    """
    Return CRC32 checksum of a file, as eight lowercase hex digits.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    checksum = 0
    with open(filepath, 'rb') as handle:
        while True:
            block = handle.read(blocksize)
            if not block:
                break
            # Feed the running value back in so the CRC covers the whole file.
            checksum = zlib.crc32(block, checksum)
    return format(checksum & 0xFFFFFFFF, "08x")
35
36
37
def ha32(filepath, blocksize=16 * 1024 * 1024):
    """
    Return Adler32 checksum of a file, as eight lowercase hex digits.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    asum = 1  # same starting value zlib.adler32 uses by default
    with open(filepath, 'rb') as handle:
        for block in iter(lambda: handle.read(blocksize), b''):
            # Masking keeps the running value unsigned 32-bit on every pass.
            asum = zlib.adler32(block, asum) & 0xFFFFFFFF
    return format(asum, "08x")
58
59
60
def hs1(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA-1 hash of a file, as a lowercase hex string.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    engine = hashlib.sha1()
    hashfunc_reader(filepath, engine, blocksize)
    digest = engine.hexdigest()
    return digest
73
74
75
def hs224(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA-224 hash of a file, as a lowercase hex string.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    engine = hashlib.sha224()
    hashfunc_reader(filepath, engine, blocksize)
    digest = engine.hexdigest()
    return digest
88
89
90
def hs256(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA-256 hash of a file, as a lowercase hex string.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    engine = hashlib.sha256()
    hashfunc_reader(filepath, engine, blocksize)
    digest = engine.hexdigest()
    return digest
103
104
105
def hs384(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA-384 hash of a file, as a lowercase hex string.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    engine = hashlib.sha384()
    hashfunc_reader(filepath, engine, blocksize)
    digest = engine.hexdigest()
    return digest
118
119
120
def hs512(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA-512 hash of a file, as a lowercase hex string.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    engine = hashlib.sha512()
    hashfunc_reader(filepath, engine, blocksize)
    digest = engine.hexdigest()
    return digest
133
134
135
def hs3224(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA3-224 hash of a file, or None if hashlib lacks SHA3 support.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    try:
        engine = hashlib.sha3_224()
    except AttributeError:
        # hashlib has no sha3_224 constructor on this interpreter.
        print("REQUIRES PYTHON 3.6+")
        return None
    hashfunc_reader(filepath, engine, blocksize)
    return engine.hexdigest()
152
153
154
def hs3256(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA3-256 hash of a file, or None if hashlib lacks SHA3 support.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    try:
        engine = hashlib.sha3_256()
    except AttributeError:
        # hashlib has no sha3_256 constructor on this interpreter.
        print("REQUIRES PYTHON 3.6+")
        return None
    hashfunc_reader(filepath, engine, blocksize)
    return engine.hexdigest()
171
172
173
def hs3384(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA3-384 hash of a file, or None if hashlib lacks SHA3 support.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    try:
        engine = hashlib.sha3_384()
    except AttributeError:
        # hashlib has no sha3_384 constructor on this interpreter.
        print("REQUIRES PYTHON 3.6+")
        return None
    hashfunc_reader(filepath, engine, blocksize)
    return engine.hexdigest()
190
191
192
def hs3512(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA3-512 hash of a file, or None if hashlib lacks SHA3 support.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    try:
        engine = hashlib.sha3_512()
    except AttributeError:
        # hashlib has no sha3_512 constructor on this interpreter.
        print("REQUIRES PYTHON 3.6+")
        return None
    hashfunc_reader(filepath, engine, blocksize)
    return engine.hexdigest()
209
210
211
def hm5(filepath, blocksize=16 * 1024 * 1024):
    """
    Return MD5 hash of a file, as a lowercase hex string.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    engine = hashlib.md5()
    hashfunc_reader(filepath, engine, blocksize)
    digest = engine.hexdigest()
    return digest
224
225
226
def hashfunc_reader(filepath, engine, blocksize=16 * 1024 * 1024):
    """
    Feed a file's contents into a hash object, one block at a time.

    The engine is updated in place; nothing is returned.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param engine: Hash object to update with file contents.
    :type engine: _hashlib.HASH

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    with open(filepath, 'rb') as handle:
        # read() returns b'' at end of file, which stops the iterator.
        for block in iter(lambda: handle.read(blocksize), b''):
            engine.update(block)
245
246
247
def ssl_hash(filepath, method, blocksize=16 * 1024 * 1024):
    """
    Return SSL-library dependent hash of a file.

    If a ValueError is raised (e.g. hashlib does not know the algorithm),
    the error and a failure notice are printed and None is returned.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param method: Method to use: algorithms in hashlib that are not guaranteed.
    :type method: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    try:
        digest = hashlib.new(method)
        hashfunc_reader(filepath, digest, blocksize)
        result = digest.hexdigest()
    except ValueError as exc:
        print(str(exc))
        print("{0} HASH FAILED".format(method.upper()))
        result = None
    return result
267
268
269
def hm4(filepath, blocksize=16 * 1024 * 1024):
    """
    Return MD4 hash of a file; depends on system SSL library.

    Thin wrapper around :func:`ssl_hash` with the "md4" algorithm.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    method = "md4"
    return ssl_hash(filepath, method, blocksize)
280
281
282
def hr160(filepath, blocksize=16 * 1024 * 1024):
    """
    Return RIPEMD160 hash of a file; depends on system SSL library.

    Thin wrapper around :func:`ssl_hash` with the "ripemd160" algorithm.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    method = "ripemd160"
    return ssl_hash(filepath, method, blocksize)
293
294
295
def hwp(filepath, blocksize=16 * 1024 * 1024):
    """
    Return Whirlpool hash of a file; depends on system SSL library.

    Thin wrapper around :func:`ssl_hash` with the "whirlpool" algorithm.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    method = "whirlpool"
    return ssl_hash(filepath, method, blocksize)
306
307
308
def hs0(filepath, blocksize=16 * 1024 * 1024):
    """
    Return SHA-0 hash of a file; depends on system SSL library.

    Thin wrapper around :func:`ssl_hash` with the "sha" algorithm name.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param blocksize: How much of file to read at once.
    :type blocksize: int
    """
    method = "sha"
    return ssl_hash(filepath, method, blocksize)
319
320
321
def gpgfile(filepath, gpginst, key=None, pword=None):
    """
    Write a detached signature for a file next to it, as <name>.asc.
    Takes an instance of gnupg.GPG().

    :param filepath: File you wish to verify.
    :type filepath: str

    :param gpginst: Instance of Python GnuPG executable.
    :type gpginst: gnupg.GPG()

    :param key: Key ID. 0xABCDEF01
    :type key: str

    :param pword: Passphrase for key.
    :type pword: str
    """
    with open(filepath, "rb") as handle:
        sigpath = handle.name + ".asc"
        gpginst.sign_file(
            handle,
            detach=True,
            keyid=key,
            passphrase=pword,
            output=sigpath
        )
341
342
343
def calculate_escreens(pin, app, uptime, duration=30):
    """
    Calculate key for the Engineering Screens based on input.

    The key is the first eight hex digits (uppercased) of an HMAC-SHA1 over
    the lowercased PIN, app version, uptime and a per-duration phrase.

    :param pin: PIN to check. 8 character hexadecimal, lowercase.
    :type pin: str

    :param app: App version. 10.x.y.zzzz.
    :type app: str

    :param uptime: Uptime in ms.
    :type uptime: str

    :param duration: 1, 3, 6, 15, 30 (days). Anything else is treated as 1.
    :type duration: str
    """
    #: Somehow, values for lifetimes for escreens.
    lifetimes = {
        1: "",
        3: "Hello my baby, hello my honey, hello my rag time gal",
        7: "He was a boy, and she was a girl, can I make it any more obvious?",
        15: "So am I, still waiting, for this world to stop hating?",
        30: "I love myself today, not like yesterday. "
    }
    lifetimes[30] += "I'm cool, I'm calm, I'm gonna be okay"
    #: Escreens magic HMAC secret.
    secret = 'Up the time stream without a TARDIS'
    duration = int(duration)
    if duration not in [1, 3, 6, 15, 30]:
        duration = 1
    # NOTE(review): the accepted list says 6 but the table is keyed on 7, so
    # a plain lookup raised KeyError for duration=6. Until it is confirmed
    # which number is correct, fall back to the 1-day phrase instead of
    # crashing; results for every previously working duration are unchanged.
    phrase = lifetimes.get(duration, lifetimes[1])
    data = pin.lower() + app + str(uptime) + phrase
    newhmac = hmac.new(secret.encode(), data.encode(), digestmod=hashlib.sha1)
    key = newhmac.hexdigest()[:8]
    return key.upper()
377
378
379
def hash_get(filename, hashfunc, workingdir, blocksize=16777216):
    """
    Generate and pretty format the hash result for a file.

    Returns one line of the form "<UPPERCASE HASH> <basename>\\n". Hash
    functions in this module return None when an algorithm is unavailable;
    in that case the hash column reads "UNAVAILABLE" instead of raising
    AttributeError on ``None.upper()``.

    :param filename: File to hash.
    :type filename: str

    :param hashfunc: Hash function to use; called as hashfunc(path, blocksize).
    :type hashfunc: function

    :param workingdir: Working directory.
    :type workingdir: str

    :param blocksize: Block size. Default is 16MB.
    :type blocksize: int
    """
    result = hashfunc(os.path.join(workingdir, filename), blocksize)
    # A None result means the hash function already reported its own failure.
    shown = "UNAVAILABLE" if result is None else result.upper()
    return "{0} {1}\n".format(shown, os.path.basename(filename))
397
398
399
def base_hash(hashtype, source, workingdir, block, hashfunc, target, kwargs=None):
    """
    Generic hash function; get hash, write to file.

    Writes the uppercased hash name on one line followed by the formatted
    hash line, but only if that hash type is enabled in kwargs. A hash type
    missing from kwargs is treated as disabled.

    :param hashtype: Hash type.
    :type hashtype: str

    :param source: File to be hashed; foobar.ext
    :type source: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param block: Blocksize, in bytes.
    :type block: int

    :param hashfunc: Hash function to use; called as hashfunc(path, blocksize).
    :type hashfunc: function

    :param target: File to write to.
    :type target: file

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
    :type kwargs: dict
    """
    if kwargs is None:
        # The None default used to be subscripted directly (TypeError).
        kwargs = verifier_config_loader()
    if not kwargs.get(hashtype, False):
        return
    hash_generic = [hashtype.upper()]
    hash_generic.append(hash_get(source, hashfunc, workingdir, block))
    target.write("\n".join(hash_generic))
425
426
427
def hash_writer(source, dest, workingdir, kwargs=None):
    """
    Write per-file hashes.

    Opens dest for writing and runs each hash type through
    :func:`base_hash` in a fixed order; base_hash decides from kwargs
    whether a given hash is actually written.

    :param source: File to be hashed; foobar.ext
    :type source: str

    :param dest: Destination file; foobar.ext.cksum
    :type dest: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
    :type kwargs: dict
    """
    if kwargs is None:
        # The None default used to be subscripted directly (TypeError).
        kwargs = verifier_config_loader()
    block = int(kwargs.get('blocksize', 16777216))
    # (config key, hash function), in the order they appear in the output.
    hashers = (
        ("adler32", ha32),
        ("crc32", hc32),
        ("md4", hm4),
        ("md5", hm5),
        ("sha0", hs0),
        ("sha1", hs1),
        ("sha224", hs224),
        ("sha256", hs256),
        ("sha384", hs384),
        ("sha512", hs512),
        ("ripemd160", hr160),
        ("whirlpool", hwp),
        ("sha3224", hs3224),
        ("sha3256", hs3256),
        ("sha3384", hs3384),
        ("sha3512", hs3512),
    )
    with open(dest, 'w') as target:
        for hashtype, hashfunc in hashers:
            base_hash(hashtype, source, workingdir, block, hashfunc, target, kwargs)
461
462
463
def filefilter(file, workingdir, extras=()):
    """
    Return True if a directory entry should be processed: it is not a
    folder and does not end with a forbidden extension.

    :param file: File to be hashed.
    :type file: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param extras: Tuple of extra extensions.
    :type extras: tuple
    """
    if os.path.isdir(os.path.join(workingdir, file)):
        return False
    forbidden = bbconstants.SUPPS + extras
    return not file.endswith(forbidden)
477
478
479
def verifier(ldir, kwargs=None, selective=False):
    """
    For all files in a directory, perform various hash/checksum functions.
    Take dict to define hashes, write output to a/individual .cksum file(s).

    Files are hashed in parallel on a thread pool. If any worker fails, the
    error is printed and SystemExit is raised.

    :param ldir: Path containing files you wish to verify.
    :type ldir: str

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
    :type kwargs: dict

    :param selective: If True, also skip ".txt" files. Default is false.
    :type selective: bool
    """
    if kwargs is None:
        kwargs = verifier_config_loader()
    exts = (".txt",) if selective else ()
    # Keep bare names: hash_writer/hash_get join them with ldir themselves,
    # so pre-joined paths were joined twice and broke relative directories.
    files = [file for file in os.listdir(ldir) if filefilter(file, ldir, exts)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=utilities.workers(files)) as xec:
        futures = []
        for file in files:
            print("HASHING:", file)
            targetname = os.path.join(ldir, file + ".cksum")
            futures.append(xec.submit(hash_writer, file, targetname, ldir, kwargs))
        # submit() does not raise a worker's exception; it is stored on the
        # future and only surfaces via result(), so check every future.
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                print("SOMETHING WENT WRONG")
                print(str(exc))
                raise SystemExit
505
506
507
def gpgrunner(workingdir, keyid=None, pword=None, selective=False):
    """
    Create ASCII-armored PGP signatures for all files in a given directory, in parallel.

    Prints a message and raises SystemExit if GnuPG cannot be started.

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
                  None is passed through unchanged.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    try:
        gpg = gnupg.GPG()
    except (ValueError, OSError):
        # NOTE(review): python-gnupg is understood to raise OSError when the
        # gpg binary cannot be run and ValueError when it exits abnormally;
        # confirm against the installed gnupg package.
        print("COULD NOT FIND GnuPG!")
        raise SystemExit
    if keyid is not None:
        # Normalise to "0x" + uppercase hex. Checking the prefix
        # case-insensitively stops "0XABCD" from becoming "0x0XABCD".
        bare = keyid[2:] if keyid.lower().startswith("0x") else keyid
        keyid = "0x" + bare.upper()
    dirlist = os.listdir(workingdir)
    files = [file for file in dirlist if filefilter(file, workingdir)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=utilities.workers(files)) as xec:
        for file in files:
            gpgwriter(gpg, xec, file, workingdir, selective, keyid, pword)
535
536
537
def gpgwriter(gpg, xec, file, workingdir, selective=False, keyid=None, pword=None):
    """
    Submit one file to the executor for signing, unless it is filtered out.

    :param gpg: Instance of Python GnuPG executable.
    :type gpg: gnupg.GPG()

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param file: File inside workingdir that is being verified.
    :type file: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool

    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
    :type keyid: str

    :param pword: Passphrase for given key.
    :type pword: str
    """
    skipped = bbconstants.SUPPS
    if selective:
        skipped = skipped + (".txt",)
    if file.endswith(skipped):
        return
    aps = bbconstants.ARCSPLUS
    pfx = bbconstants.PREFIXES
    # In selective mode, utilities.prepends must also accept the file.
    if selective and not utilities.prepends(file, pfx, aps):
        return
    print("VERIFYING:", os.path.basename(file))
    thepath = os.path.join(workingdir, file)
    try:
        xec.submit(gpgfile, thepath, gpg, keyid, pword)
    except Exception as exc:
        print("SOMETHING WENT WRONG")
        print(str(exc))
        raise SystemExit
573
574
575
def gpg_config_loader(homepath=None):
    """
    Read the 'gpgrunner' config section and return (key, password).

    Either value is None if it is not present in the config.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    section = iniconfig.generic_loader('gpgrunner', homepath)
    gpgkey = section.get('key', fallback=None)
    gpgpass = section.get('pass', fallback=None)
    return (gpgkey, gpgpass)
586
587
588
def gpg_config_writer(key=None, password=None, homepath=None):
    """
    Store PGP key and/or password in the 'gpgrunner' config section.

    :param key: Key ID, leave as None to not write.
    :type key: str

    :param password: Key password, leave as None to not write.
    :type password: str

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    candidates = (("key", key), ("pass", password))
    # Only values that were actually supplied are handed to the writer.
    results = {name: value for name, value in candidates if value is not None}
    iniconfig.generic_writer("gpgrunner", results, homepath)
607
608
609
def verifier_config_loader(homepath=None):
    """
    Read the 'hashmodes' config section and return hash preferences.

    The result maps each hash name to a bool, plus 'blocksize' to an int.
    Missing entries fall back to: sha1, sha256 and md5 on, everything else
    off, blocksize 16777216.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    ini = iniconfig.generic_loader("hashmodes", homepath)
    # (name, fallback) pairs, in the order the keys are stored.
    before_blocksize = (
        ('crc32', False),
        ('adler32', False),
        ('sha0', False),
        ('sha1', True),
        ('sha224', False),
        ('sha256', True),
        ('sha384', False),
        ('sha512', False),
        ('md5', True),
        ('md4', False),
        ('ripemd160', False),
        ('whirlpool', False),
    )
    after_blocksize = ('sha3224', 'sha3256', 'sha3384', 'sha3512')
    results = {}
    for name, default in before_blocksize:
        results[name] = bool(ini.getboolean(name, fallback=default))
    results['blocksize'] = int(ini.getint('blocksize', fallback=16777216))
    for name in after_blocksize:
        results[name] = bool(ini.getboolean(name, fallback=False))
    return results
636
637
638
def verifier_config_writer(resultdict=None, homepath=None):
    """
    Write a ConfigParser file to store hash preferences.

    Every value is stored as its lowercased string form (True -> "true").
    If no dictionary is given, the current settings are loaded from the
    same homepath and written back.

    :param resultdict: Dictionary of results: {method, bool}
    :type resultdict: dict({str, bool})

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    if resultdict is None:
        # Load from the same location we write to; homepath used to be
        # ignored here, so defaults always came from the user directory.
        resultdict = verifier_config_loader(homepath)
    results = {method: str(flag).lower() for method, flag in resultdict.items()}
    iniconfig.generic_writer("hashmodes", results, homepath)
652