Completed
Push — master ( 152b55...0bdad9 )
by John
01:22
created

hash_writer()   F

Complexity

Conditions 14

Size

Total Lines 66

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 14
c 2
b 0
f 0
dl 0
loc 66
rs 2.6685

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like hash_writer() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
"""This module is used to generate file hashes/checksums and PGP signatures."""
3
4
import zlib  # crc32/adler32
5
import hashlib  # all other hashes
6
import hmac  # escreens is a hmac, news at 11
7
import configparser  # config parsing, duh
8
import os  # path work
9
import concurrent.futures  # parallelization
10
import gnupg  # interface b/w Python, GPG
11
from bbarchivist import bbconstants  # premade stuff
12
from bbarchivist import utilities  # cores
13
14
__author__ = "Thurask"
15
__license__ = "WTFPL v2"
16
__copyright__ = "Copyright 2015-2016 Thurask"
17
18
19
def hc32(filepath, blocksize=16 * 1024 * 1024):
20
    """
21
    Return CRC32 checksum of a file.
22
23
    :param filepath: File you wish to verify.
24
    :type filepath: str
25
26
    :param blocksize: How much of file to read at once.
27
    :type blocksize: int
28
    """
29
    seed = 0
30
    with open(filepath, 'rb') as file:
31
        for chunk in iter(lambda: file.read(blocksize), b''):
32
            seed = zlib.crc32(chunk, seed)
33
    final = format(seed & 0xFFFFFFFF, "08x")
34
    return final
35
36
37
def ha32(filepath, blocksize=16 * 1024 * 1024):
38
    """
39
    Return Adler32 checksum of a file.
40
41
    :param filepath: File you wish to verify.
42
    :type filepath: str
43
44
    :param blocksize: How much of file to read at once.
45
    :type blocksize: int
46
    """
47
    asum = 1
48
    with open(filepath, 'rb') as file:
49
        while True:
50
            data = file.read(blocksize)
51
            if not data:
52
                break
53
            asum = zlib.adler32(data, asum)
54
            if asum < 0:  # pragma: no cover
55
                asum += 2 ** 32
56
    final = format(asum & 0xFFFFFFFF, "08x")
57
    return final
58
59
60
def hs1(filepath, blocksize=16 * 1024 * 1024):
61
    """
62
    Return SHA-1 hash of a file.
63
64
    :param filepath: File you wish to verify.
65
    :type filepath: str
66
67
    :param blocksize: How much of file to read at once.
68
    :type blocksize: int
69
    """
70
    sha1 = hashlib.sha1()
71
    with open(filepath, 'rb') as file:
72
        while True:
73
            data = file.read(blocksize)
74
            if not data:
75
                break
76
            sha1.update(data)
77
    return sha1.hexdigest()
78
79
80
def hs224(filepath, blocksize=16 * 1024 * 1024):
81
    """
82
    Return SHA-224 hash of a file.
83
84
    :param filepath: File you wish to verify.
85
    :type filepath: str
86
87
    :param blocksize: How much of file to read at once.
88
    :type blocksize: int
89
    """
90
    sha224 = hashlib.sha224()
91
    with open(filepath, 'rb') as file:
92
        while True:
93
            data = file.read(blocksize)
94
            if not data:
95
                break
96
            sha224.update(data)
97
    return sha224.hexdigest()
98
99
100
def hs256(filepath, blocksize=16 * 1024 * 1024):
101
    """
102
    Return SHA-256 hash of a file.
103
104
    :param filepath: File you wish to verify.
105
    :type filepath: str
106
107
    :param blocksize: How much of file to read at once.
108
    :type blocksize: int
109
    """
110
    sha256 = hashlib.sha256()
111
    with open(filepath, 'rb') as file:
112
        while True:
113
            data = file.read(blocksize)
114
            if not data:
115
                break
116
            sha256.update(data)
117
    return sha256.hexdigest()
118
119
120
def hs384(filepath, blocksize=16 * 1024 * 1024):
121
    """
122
    Return SHA-384 hash of a file.
123
124
    :param filepath: File you wish to verify.
125
    :type filepath: str
126
127
    :param blocksize: How much of file to read at once.
128
    :type blocksize: int
129
    """
130
    sha384 = hashlib.sha384()
131
    with open(filepath, 'rb') as file:
132
        while True:
133
            data = file.read(blocksize)
134
            if not data:
135
                break
136
            sha384.update(data)
137
    return sha384.hexdigest()
138
139
140
def hs512(filepath, blocksize=16 * 1024 * 1024):
141
    """
142
    Return SHA-512 hash of a file.
143
144
    :param filepath: File you wish to verify.
145
    :type filepath: str
146
147
    :param blocksize: How much of file to read at once.
148
    :type blocksize: int
149
    """
150
    sha512 = hashlib.sha512()
151
    with open(filepath, 'rb') as file:
152
        while True:
153
            data = file.read(blocksize)
154
            if not data:
155
                break
156
            sha512.update(data)
157
    return sha512.hexdigest()
158
159
160
def hm5(filepath, blocksize=16 * 1024 * 1024):
161
    """
162
    Return MD5 hash of a file.
163
164
    :param filepath: File you wish to verify.
165
    :type filepath: str
166
167
    :param blocksize: How much of file to read at once.
168
    :type blocksize: int
169
    """
170
    md5 = hashlib.md5()
171
    with open(filepath, 'rb') as file:
172
        while True:
173
            data = file.read(blocksize)
174
            if not data:
175
                break
176
            md5.update(data)
177
    return md5.hexdigest()
178
179
180
def ssl_hash(filepath, method, blocksize=16 * 1024 * 1024):
181
    """
182
    Return SSL-library dependent hash of a file.
183
184
    :param filepath: File you wish to verify.
185
    :type filepath: str
186
187
    :param method: Method to use: algorithms in hashlib that are not guaranteed.
188
    :type method: str
189
190
    :param blocksize: How much of file to read at once.
191
    :type blocksize: int
192
    """
193
    try:
194
        engine = hashlib.new(method)
195
        with open(filepath, 'rb') as file:
196
            while True:
197
                data = file.read(blocksize)
198
                if not data:
199
                    break
200
                engine.update(data)
201
        return engine.hexdigest()
202
    except ValueError as exc:
203
        print(str(exc))
204
        print("{0} HASH FAILED".format(method.upper()))
205
206
207
def hm4(filepath, blocksize=16 * 1024 * 1024):
208
    """
209
    Return MD4 hash of a file; depends on system SSL library.
210
211
    :param filepath: File you wish to verify.
212
    :type filepath: str
213
214
    :param blocksize: How much of file to read at once.
215
    :type blocksize: int
216
    """
217
    return ssl_hash(filepath, "md4", blocksize)
218
219
220
def hr160(filepath, blocksize=16 * 1024 * 1024):
221
    """
222
    Return RIPEMD160 hash of a file; depends on system SSL library.
223
224
    :param filepath: File you wish to verify.
225
    :type filepath: str
226
227
    :param blocksize: How much of file to read at once.
228
    :type blocksize: int
229
    """
230
    return ssl_hash(filepath, "ripemd160", blocksize)
231
232
233
def hwp(filepath, blocksize=16 * 1024 * 1024):
234
    """
235
    Return Whirlpool hash of a file; depends on system SSL library.
236
237
    :param filepath: File you wish to verify.
238
    :type filepath: str
239
240
    :param blocksize: How much of file to read at once.
241
    :type blocksize: int
242
    """
243
    return ssl_hash(filepath, "whirlpool", blocksize)
244
245
246
def hs0(filepath, blocksize=16 * 1024 * 1024):
247
    """
248
    Return SHA-0 hash of a file; depends on system SSL library.
249
250
    :param filepath: File you wish to verify.
251
    :type filepath: str
252
253
    :param blocksize: How much of file to read at once.
254
    :type blocksize: int
255
    """
256
    return ssl_hash(filepath, "sha", blocksize)
257
258
259
def gpgfile(filepath, gpginst, key=None, pword=None):
260
    """
261
    Make ASCII-armored signature files with a given private key.
262
    Takes an instance of gnupg.GPG().
263
264
    :param filepath: File you wish to verify.
265
    :type filepath: str
266
267
    :param gpginst: Instance of Python GnuPG executable.
268
    :type gpginst: gnupg.GPG()
269
270
    :param key: Key ID. 0xABCDEF01
271
    :type key: str
272
273
    :param pword: Passphrase for key.
274
    :type pword: str
275
    """
276
    with open(filepath, "rb") as file:
277
        fname = file.name + ".asc"
278
        gpginst.sign_file(file, detach=True, keyid=key, passphrase=pword, output=fname)
279
280
281
def calculate_escreens(pin, app, uptime, duration=30):
282
    """
283
    Calculate key for the Engineering Screens based on input.
284
285
    :param pin: PIN to check. 8 character hexadecimal, lowercase.
286
    :type pin: str
287
288
    :param app: App version. 10.x.y.zzzz.
289
    :type app: str
290
291
    :param uptime: Uptime in ms.
292
    :type uptime: str
293
294
    :param duration: 1, 3, 6, 15, 30 (days).
295
    :type duration: str
296
    """
297
    #: Somehow, values for lifetimes for escreens.
298
    lifetimes = {
299
        1: "",
300
        3: "Hello my baby, hello my honey, hello my rag time gal",
301
        7: "He was a boy, and she was a girl, can I make it any more obvious?",
302
        15: "So am I, still waiting, for this world to stop hating?",
303
        30: "I love myself today, not like yesterday. "
304
    }
305
    lifetimes[30] += "I'm cool, I'm calm, I'm gonna be okay"
306
    #: Escreens magic HMAC secret.
307
    secret = 'Up the time stream without a TARDIS'
308
    duration = int(duration)
309
    if duration not in [1, 3, 6, 15, 30]:
310
        duration = 1
311
    data = pin.lower() + app + str(uptime) + lifetimes[duration]
312
    newhmac = hmac.new(secret.encode(), data.encode(), digestmod=hashlib.sha1)
313
    key = newhmac.hexdigest()[:8]
314
    return key.upper()
315
316
317
def hash_get(filename, hashfunc, workingdir, blocksize=16777216):
318
    """
319
    Generate and pretty format the hash result for a file.
320
321
    :param filename: File to hash.
322
    :type filename: str
323
324
    :param hashfunc: Hash function to use.
325
    :type hashfunc: function
326
327
    :param workingdir: Working directory.
328
    :type workingdir: str
329
330
    :param blocksize: Block size. Default is 16MB.
331
    :type blocksize: int
332
    """
333
    result = hashfunc(os.path.join(workingdir, filename), blocksize)
334
    return "{0} {1}\n".format(result.upper(), filename)
335
336
337
def hash_writer(source, dest, workingdir, kwargs=None):
338
    """
339
    Write per-file hashes.
340
341
    :param source: File to be hashed; foobar.ext
342
    :type source: str
343
344
    :param dest: Destination file; foobar.ext.cksum
345
    :type dest: str
346
347
    :param workingdir: Path containing files you wish to verify.
348
    :type workingdir: str
349
350
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
351
    :type kwargs: dict
352
    """
353
    block = int(kwargs['blocksize'])
354
    with open(dest, 'w') as target:
355
        if kwargs['adler32']:
356
            h_a32 = ["ADLER32"]
357
            h_a32.append(hash_get(source, ha32, workingdir, block))
358
            target.write("\n".join(h_a32))
359
        if kwargs['crc32']:
360
            h_c32 = ["CRC32"]
361
            h_c32.append(hash_get(source, hc32, workingdir, block))
362
            target.write("\n".join(h_c32))
363
        if kwargs['md4']:
364
            h_m4 = ["MD4"]
365
            h_m4.append(hash_get(source, hm4, workingdir, block))
366
            target.write("\n".join(h_m4))
367
        if kwargs['md5']:
368
            h_m5 = ["MD5"]
369
            h_m5.append(hash_get(source, hm5, workingdir, block))
370
            target.write("\n".join(h_m5))
371
        if kwargs['sha0']:
372
            h_s0 = ["SHA0"]
373
            h_s0.append(hash_get(source, hs0, workingdir, block))
374
            target.write("\n".join(h_s0))
375
        if kwargs['sha1']:
376
            h_s1 = ["SHA1"]
377
            h_s1.append(hash_get(source, hs1, workingdir, block))
378
            target.write("\n".join(h_s1))
379
        if kwargs['sha224']:
380
            h_s224 = ["SHA224"]
381
            h_s224.append(hash_get(source, hs224, workingdir, block))
382
            target.write("\n".join(h_s224))
383
        if kwargs['sha256']:
384
            h_s256 = ["SHA256"]
385
            h_s256.append(hash_get(source, hs256, workingdir, block))
386
            target.write("\n".join(h_s256))
387
        if kwargs['sha384']:
388
            h_s384 = ["SHA384"]
389
            h_s384.append(hash_get(source, hs384, workingdir, block))
390
            target.write("\n".join(h_s384))
391
        if kwargs['sha512']:
392
            h_s512 = ["SHA512"]
393
            h_s512.append(hash_get(source, hs512, workingdir, block))
394
            target.write("\n".join(h_s512))
395
        if kwargs['ripemd160']:
396
            h_r160 = ["RIPEMD160"]
397
            h_r160.append(hash_get(source, hr160, workingdir, block))
398
            target.write("\n".join(h_r160))
399
        if kwargs['whirlpool']:
400
            h_wp = ["WHIRLPOOL"]
401
            h_wp.append(hash_get(source, hwp, workingdir, block))
402
            target.write("\n".join(h_wp))
403
404
405
def filefilter(file, workingdir, extras=()):
406
    """
407
    Check if file in folder is a folder, or if it's got a forbidden extension.
408
409
    :param file: File to be hashed.
410
    :type file: str
411
412
    :param workingdir: Path containing files you wish to verify.
413
    :type workingdir: str
414
415
    :param extras: Tuple of extra extensions.
416
    :type extras: tuple
417
    """
418
    return not (os.path.isdir(os.path.join(workingdir, file)) or file.endswith(bbconstants.SUPPS + extras))
419
420
421
def verifier(workingdir, kwargs=None, selective=False):
422
    """
423
    For all files in a directory, perform various hash/checksum functions.
424
    Take dict to define hashes, write output to a/individual .cksum file(s).
425
426
    :param workingdir: Path containing files you wish to verify.
427
    :type workingdir: str
428
429
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
430
    :type kwargs: dict
431
    """
432
    if kwargs is None:
433
        kwargs = verifier_config_loader()
434
    extras = (".txt",) if selective else ()
435
    files = [file for file in os.listdir(workingdir) if filefilter(file, workingdir, extras)]
436
    with concurrent.futures.ThreadPoolExecutor(max_workers=utilities.workers(files)) as xec:
437
        for file in files:
438
            print("HASHING:", str(file))
439
            basename = file + ".cksum"
440
            targetname = os.path.join(workingdir, basename)
441
            try:
442
                xec.submit(hash_writer, file, targetname, workingdir, kwargs)
443
            except Exception as exc:
444
                print("SOMETHING WENT WRONG")
445
                print(str(exc))
446
                raise SystemExit
447
448
449
def gpgrunner(workingdir, keyid=None, pword=None, selective=False):
450
    """
451
    Create ASCII-armored PGP signatures for all files in a given directory, in parallel.
452
453
    :param workingdir: Path containing files you wish to verify.
454
    :type workingdir: str
455
456
    :param keyid: Key to use. 8-character hexadecimal, with or without 0x.
457
    :type keyid: str
458
459
    :param pword: Passphrase for given key.
460
    :type pword: str
461
462
    :param selective: Filtering filenames/extensions. Default is false.
463
    :type selective: bool
464
    """
465
    try:
466
        gpg = gnupg.GPG()
467
    except ValueError:
468
        print("COULD NOT FIND GnuPG!")
469
        raise SystemExit
470
    else:
471
        if not keyid.startswith("0x"):
472
            keyid = "0x" + keyid.upper()
473
        dirlist = os.listdir(workingdir)
474
        files = [file for file in dirlist if not os.path.isdir(file)]
475
        with concurrent.futures.ThreadPoolExecutor(max_workers=utilities.workers(files)) as xec:
476
            for file in files:
477
                sup = bbconstants.SUPPS + (".txt",) if selective else bbconstants.SUPPS
478
                if not file.endswith(sup):
479
                    aps = bbconstants.ARCSPLUS
480
                    pfx = bbconstants.PREFIXES
481
                    if (utilities.prepends(file, pfx, aps)) if selective else True:
482
                        print("VERIFYING:", str(file))
483
                        thepath = os.path.join(workingdir, file)
484
                        try:
485
                            xec.submit(gpgfile, thepath, gpg, keyid, pword)
486
                        except Exception as exc:
487
                            print("SOMETHING WENT WRONG")
488 View Code Duplication
                            print(str(exc))
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
489
                            raise SystemExit
490
491
492
def gpg_config_loader(homepath=None):
493
    """
494
    Read a ConfigParser file to get PGP key, password (optional)
495
496
    :param homepath: Folder containing ini file. Default is user directory.
497
    :type homepath: str
498
    """
499
    config = configparser.ConfigParser()
500
    if homepath is None:
501
        homepath = os.path.expanduser("~")
502
    conffile = os.path.join(homepath, "bbarchivist.ini")
503
    if not os.path.exists(conffile):
504
        open(conffile, 'w').close()
505
    config.read(conffile)
506
    if not config.has_section('gpgrunner'):
507
        config['gpgrunner'] = {}
508
    gpgkey = config.get('gpgrunner', 'key', fallback=None)
509
    gpgpass = config.get('gpgrunner', 'pass', fallback=None)
510
    return gpgkey, gpgpass
511
512
513
def gpg_config_writer(key=None, password=None, homepath=None):
514
    """
515
    Write a ConfigParser file to store PGP key, password (optional)
516
517
    :param key: Key ID, leave as None to not write.
518
    :type key: str
519
520
    :param password: Key password, leave as None to not write.
521
    :type password: str
522
523
    :param homepath: Folder containing ini file. Default is user directory.
524
    :type homepath: str
525
    """
526
    config = configparser.ConfigParser()
527
    if homepath is None:
528
        homepath = os.path.expanduser("~")
529
    conffile = os.path.join(homepath, "bbarchivist.ini")
530
    if not os.path.exists(conffile):
531
        open(conffile, 'w').close()
532
    config.read(conffile)
533
    if not config.has_section('gpgrunner'):
534
        config['gpgrunner'] = {}
535
    if key is not None:
536
        config['gpgrunner']['key'] = key
537
    if password is not None:
538
        config['gpgrunner']['pass'] = password
539
    with open(conffile, "w") as configfile:
540
        config.write(configfile)
541
542
543
def verifier_config_loader(homepath=None):
544
    """
545
    Read a ConfigParser file to get hash preferences.
546
547
    :param homepath: Folder containing ini file. Default is user directory.
548
    :type homepath: str
549
    """
550
    results = {}
551
    config = configparser.ConfigParser()
552
    if homepath is None:
553
        homepath = os.path.expanduser("~")
554
    conffile = os.path.join(homepath, "bbarchivist.ini")
555
    if not os.path.exists(conffile):
556
        open(conffile, 'w').close()
557
    config.read(conffile)
558
    if not config.has_section('hashmodes'):
559
        config['hashmodes'] = {}
560
    ini = config['hashmodes']
561
    results['crc32'] = bool(ini.getboolean('crc32', fallback=False))
562
    results['adler32'] = bool(ini.getboolean('adler32', fallback=False))
563
    results['sha0'] = bool(ini.getboolean('sha0', fallback=False))
564
    results['sha1'] = bool(ini.getboolean('sha1', fallback=True))
565
    results['sha224'] = bool(ini.getboolean('sha224', fallback=False))
566
    results['sha256'] = bool(ini.getboolean('sha256', fallback=True))
567
    results['sha384'] = bool(ini.getboolean('sha384', fallback=False))
568
    results['sha512'] = bool(ini.getboolean('sha512', fallback=False))
569
    results['md5'] = bool(ini.getboolean('md5', fallback=True))
570
    results['md4'] = bool(ini.getboolean('md4', fallback=False))
571
    results['ripemd160'] = bool(ini.getboolean('ripemd160', fallback=False))
572
    results['whirlpool'] = bool(ini.getboolean('whirlpool', fallback=False))
573
    results['blocksize'] = int(ini.getint('blocksize', fallback=16777216))
574
    return results
575
576
577
def verifier_config_writer(resultdict=None, homepath=None):
578
    """
579
    Write a ConfigParser file to store hash preferences.
580
581
    :param resultdict: Dictionary of results: {method, bool}
582
    :type resultdict: dict({str, bool})
583
584
    :param homepath: Folder containing ini file. Default is user directory.
585
    :type homepath: str
586
    """
587
    if resultdict is None:
588
        resultdict = verifier_config_loader()
589
    config = configparser.ConfigParser()
590
    if homepath is None:
591
        homepath = os.path.expanduser("~")
592
    conffile = os.path.join(homepath, "bbarchivist.ini")
593
    if not os.path.exists(conffile):
594
        open(conffile, 'w').close()
595
    config.read(conffile)
596
    if not config.has_section('hashmodes'):
597
        config['hashmodes'] = {}
598
    for method, flag in resultdict.items():
599
        config.set('hashmodes', method, str(flag).lower())
600
    with open(conffile, "w") as configfile:
601
        config.write(configfile)
602