Completed
Push — master ( fc3a5e...a87bbd )
by John
06:51
created

hash_get()   B

Complexity

Conditions 2

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0932

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 25
rs 8.8571
ccs 5
cts 7
cp 0.7143
crap 2.0932
1
#!/usr/bin/env python3
2 4
"""This module is used to generate file hashes/checksums."""
3
4 4
import zlib  # crc32/adler32
5 4
import hashlib  # all other hashes
6 4
import hmac  # escreens is a hmac, news at 11
7 4
import os  # path work
8 4
import concurrent.futures  # parallelization
9 4
from bbarchivist import bbconstants  # premade stuff
10 4
from bbarchivist import exceptions  # exceptions
11 4
from bbarchivist import utilities  # cores
12 4
from bbarchivist import iniconfig  # config parsing
13
14 4
__author__ = "Thurask"
15 4
__license__ = "WTFPL v2"
16 4
__copyright__ = "Copyright 2015-2016 Thurask"
17
18
19 4
def zlib_hash(filepath, method, blocksize=16 * 1024 * 1024):
20
    """
21
    Return zlib-based (i.e. CRC32/Adler32) checksum of a file.
22
23
    :param filepath: File you wish to verify.
24
    :type filepath: str
25
26
    :param method: "crc32" or "adler32".
27
    :type method: str
28
29
    :param blocksize: How much of file to read at once. Default is 16MB.
30
    :type blocksize: int
31
    """
32 4
    hashfunc = zlib.crc32 if method == "crc32" else zlib.adler32
33 4
    seed = 0 if method == "crc32" else 1
34 4
    with open(filepath, 'rb') as file:
35 4
        for chunk in iter(lambda: file.read(blocksize), b''):
36 4
            seed = hashfunc(chunk, seed)
37 4
    final = format(seed & 0xFFFFFFFF, "08x")
38 4
    return final
39
40
41 4
def hashlib_hash(filepath, engine, blocksize=16 * 1024 * 1024):
42
    """
43
    Return MD5/SHA-1/SHA-2/SHA-3 hash of a file.
44
45
    :param filepath: File you wish to verify.
46
    :type filepath: str
47
48
    :param engine: Hash object to update with file contents.
49
    :type engine: _hashlib.HASH
50
51 4
    :param blocksize: How much of file to read at once. Default is 16MB.
52
    :type blocksize: int
53
    """
54 4
    hashfunc_reader(filepath, engine, blocksize)
55
    return engine.hexdigest()
56
57
58
def hashfunc_reader(filepath, engine, blocksize=16 * 1024 * 1024):
59
    """
60
    Generate hash from file contents.
61
62
    :param filepath: File you wish to verify.
63
    :type filepath: str
64 4
65
    :param engine: Hash object to update with file contents.
66
    :type engine: _hashlib.HASH
67 4
68
    :param blocksize: How much of file to read at once. Default is 16MB.
69
    :type blocksize: int
70
    """
71
    with open(filepath, 'rb') as file:
72
        while True:
73
            data = file.read(blocksize)
74
            if not data:
75
                break
76
            engine.update(data)
77 4
78 4
79 4
def ssl_hash(filepath, method, blocksize=16 * 1024 * 1024):
80
    """
81
    Return SSL-library dependent hash of a file.
82 4
83
    :param filepath: File you wish to verify.
84
    :type filepath: str
85
86
    :param method: Method to use: algorithms in hashlib that are not guaranteed.
87
    :type method: str
88
89
    :param blocksize: How much of file to read at once. Default is 16MB.
90
    :type blocksize: int
91
    """
92 4
    try:
93 4
        engine = hashlib.new(method)
94 4
        hashfunc_reader(filepath, engine, blocksize)
95
        return engine.hexdigest()
96
    except ValueError as exc:
97 4
        msg = "{0} HASH FAILED".format(method.upper())
98
        exceptions.handle_exception(exc, msg, None)
99
100
101
def calculate_escreens(pin, app, uptime, duration=30):
102
    """
103
    Calculate key for the Engineering Screens based on input.
104
105
    :param pin: PIN to check. 8 character hexadecimal, lowercase.
106
    :type pin: str
107 4
108 4
    :param app: App version. 10.x.y.zzzz.
109 4
    :type app: str
110
111
    :param uptime: Uptime in ms.
112 4
    :type uptime: str
113
114
    :param duration: 1, 3, 6, 15, 30 (days).
115
    :type duration: str
116
    """
117
    #: Somehow, values for lifetimes for escreens.
118
    lifetimes = {
119
        1: "",
120
        3: "Hello my baby, hello my honey, hello my rag time gal",
121
        7: "He was a boy, and she was a girl, can I make it any more obvious?",
122 4
        15: "So am I, still waiting, for this world to stop hating?",
123 4
        30: "I love myself today, not like yesterday. I'm cool, I'm calm, I'm gonna be okay"
124 4
    }
125
    #: Escreens magic HMAC secret.
126
    ehmac = 'Up the time stream without a TARDIS'
127 4
    duration = int(duration)
128
    if duration not in [1, 3, 6, 15, 30]:
129
        duration = 1
130
    data = pin.lower() + app + str(uptime) + lifetimes[duration]
131
    newhmac = hmac.new(ehmac.encode(), data.encode(), digestmod=hashlib.sha1)
132
    key = newhmac.hexdigest()[:8]
133
    return key.upper()
134
135
136
def get_hashfunc(hashtype):
137 4
    """
138 4
    Get genericized hash function from hash type.
139 4
140
    :param hashtype: Hash type.
141
    :type hashtype: str
142 4
    """
143
    hashfuncs = {"adler32": zlib_hash,
144
                 "crc32": zlib_hash,
145
                 "md4": ssl_hash,
146
                 "sha0": ssl_hash,
147
                 "ripemd160": ssl_hash,
148
                 "whirlpool": ssl_hash,
149
                 "md5": hashlib_hash,
150
                 "sha1": hashlib_hash,
151
                 "sha224": hashlib_hash,
152 4
                 "sha256": hashlib_hash,
153 4
                 "sha384": hashlib_hash,
154 4
                 "sha512": hashlib_hash,
155 4
                 "sha3224": hashlib_hash,
156
                 "sha3256": hashlib_hash,
157
                 "sha3384": hashlib_hash,
158
                 "sha3512": hashlib_hash}
159
    return hashfuncs[hashtype]
160
161 4
162
def get_engine(hashtype):
163
    """
164
    Get hashlib engine from hash type.
165
166
    :param hashtype: Hash type.
167
    :type hashtype: str
168
    """
169
    hashengines =  {"md5": hashlib.md5(),
0 ignored issues
show
Coding Style introduced by
Exactly one space required after assignment
hashengines = {"md5": hashlib.md5(),
^
Loading history...
170
                    "sha1": hashlib.sha1(),
171 4
                    "sha224": hashlib.sha224(),
172 4
                    "sha256": hashlib.sha256(),
173 4
                    "sha384": hashlib.sha384(),
174 4
                    "sha512": hashlib.sha512()}
175
    if new_enough(6):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'new_enough'
Loading history...
176
         hashengines.update({"sha3224": hashlib.sha3_224(),
1 ignored issue
show
Coding Style introduced by
The indentation here looks off. 8 spaces were expected, but 9 were found.
Loading history...
Bug introduced by
The Module hashlib does not seem to have a member named sha3_224.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
177
                             "sha3256": hashlib.sha3_256(),
1 ignored issue
show
Bug introduced by
The Module hashlib does not seem to have a member named sha3_256.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
178
                             "sha3384": hashlib.sha3_384(),
1 ignored issue
show
Bug introduced by
The Module hashlib does not seem to have a member named sha3_384.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
179
                             "sha3512": hashlib.sha3_512()})
1 ignored issue
show
Bug introduced by
The Module hashlib does not seem to have a member named sha3_512.

This check looks for calls to members that are non-existent. These calls will fail.

The member could have been renamed or removed.

Loading history...
180 4
    return hashengines[hashtype]
181
182
183
def hash_get(filename, hashfunc, hashtype, workingdir, blocksize=16777216):
184
    """
185
    Generate and pretty format the hash result for a file.
186
187
    :param filename: File to hash.
188
    :type filename: str
189
190 4
    :param hashfunc: Hash function to use.
191 4
    :type hashfunc: function
192 4
193 4
    :param hashtype: Hash type.
194
    :type hashtype: str
195
196
    :param workingdir: Working directory.
197
    :type workingdir: str
198
199 4
    :param blocksize: Block size. Default is 16MB.
200
    :type blocksize: int
201
    """
202
    if hashfunc == hashlib_hash:
203
        method = get_engine(hashtype)
204
    else:
205
        method = hashtype
206
    result = hashfunc(os.path.join(workingdir, filename), method, blocksize)
207
    return "{0} {1}\n".format(result.upper(), os.path.basename(filename))
208
209 4
210 4
def base_hash(hashtype, source, workingdir, block, target, kwargs=None):
211 4
    """
212 4
    Generic hash function; get hash, write to file.
213
214
    :param hashtype: Hash type.
215
    :type hashtype: str
216
217
    :param source: File to be hashed; foobar.ext
218 4
    :type source: str
219
220
    :param workingdir: Path containing files you wish to verify.
221
    :type workingdir: str
222
223
    :param block: Blocksize, in bytes.
224
    :type block: int
225
226
    :param target: File to write to.
227
    :type target: file
228 4
229 4
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
230 4
    :type kwargs: dict
231
    """
232
    if kwargs[hashtype]:
233 4
        hash_generic = [hashtype.upper()]
234
        hashfunc = get_hashfunc(hashtype)
235
        hashtype2 = "sha" if hashtype == "sha0" else hashtype
236
        hash_generic.append(hash_get(source, hashfunc, hashtype2, workingdir, block))
237
        target.write("\n".join(hash_generic))
238
239
240
def hash_writer(source, dest, workingdir, kwargs=None):
241
    """
242
    Write per-file hashes.
243
244
    :param source: File to be hashed; foobar.ext
245
    :type source: str
246 4
247 4
    :param dest: Destination file; foobar.ext.cksum
248 4
    :type dest: str
249 4
250 4
    :param workingdir: Path containing files you wish to verify.
251 4
    :type workingdir: str
252
253
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
254 4
    :type kwargs: dict
255
    """
256
    block = int(kwargs['blocksize'])
257
    with open(dest, 'w') as target:
258
        base_hash("adler32", source, workingdir, block, target, kwargs)
259
        base_hash("crc32", source, workingdir, block, target, kwargs)
260
        base_hash("md4", source, workingdir, block, target, kwargs)
261
        base_hash("md5", source, workingdir, block, target, kwargs)
262
        base_hash("sha0", source, workingdir, block, target, kwargs)
263
        base_hash("sha1", source, workingdir, block, target, kwargs)
264
        base_hash("sha224", source, workingdir, block, target, kwargs)
265
        base_hash("sha256", source, workingdir, block, target, kwargs)
266
        base_hash("sha384", source, workingdir, block, target, kwargs)
267 4
        base_hash("sha512", source, workingdir, block, target, kwargs)
268 4
        base_hash("ripemd160", source, workingdir, block, target, kwargs)
269 4
        base_hash("whirlpool", source, workingdir, block, target, kwargs)
270 4
        if utilities.new_enough(6):
271 4
            base_hash("sha3224", source, workingdir, block, target, kwargs)
272 4
            base_hash("sha3256", source, workingdir, block, target, kwargs)
273 4
            base_hash("sha3384", source, workingdir, block, target, kwargs)
274
            base_hash("sha3512", source, workingdir, block, target, kwargs)
275
276 4
277
def filefilter(file, workingdir, extras=()):
278
    """
279
    Check if file in folder is a folder, or if it's got a forbidden extension.
280
281
    :param file: File to be hashed.
282
    :type file: str
283
284
    :param workingdir: Path containing files you wish to verify.
285
    :type workingdir: str
286 4
287
    :param extras: Tuple of extra extensions.
288
    :type extras: tuple
289 4
    """
290
    return not (os.path.isdir(os.path.join(workingdir, file)) or file.endswith(bbconstants.SUPPS + extras))
291
292
293
def prep_verifier(ldir, selective=False):
294
    """
295
    Prepare files for verifier function.
296
297
    :param ldir: Path containing files you wish to verify.
298
    :type ldir: str
299 4
300
    :param selective: Filtering filenames/extensions. Default is false.
301
    :type selective: bool
302 4
    """
303
    exts = (".txt",) if selective else ()
304
    fxs = [os.path.join(ldir, afx) for afx in os.listdir(ldir) if filefilter(afx, ldir, exts)]
305
    return fxs
306
307
308
def verifier(ldir, kwargs=None, selective=False):
309
    """
310
    For all files in a directory, perform various hash/checksum functions.
311
    Take dict to define hashes, write output to a/individual .cksum file(s).
312 4
313
    :param ldir: Path containing files you wish to verify.
314
    :type ldir: str
315 4
316
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
317
    :type kwargs: dict
318
319
    :param selective: Filtering filenames/extensions. Default is false.
320
    :type selective: bool
321
    """
322
    kwargs = verifier_config_loader() if kwargs is None else kwargs
323
    fxs = prep_verifier(ldir, selective)
324
    with concurrent.futures.ThreadPoolExecutor(max_workers=utilities.workers(fxs)) as xec:
325 4
        for file in fxs:
326
            verifier_individual(xec, ldir, file, kwargs)
327
328 4
329
def verifier_individual(xec, ldir, file, kwargs):
330
    """
331
    Individually verify files through a ThreadPoolExecutor.
332
333
    :param xec: ThreadPoolExecutor instance.
334
    :type xec: concurrent.futures.ThreadPoolExecutor
335
336
    :param ldir: Path containing files you wish to verify.
337
    :type ldir: str
338
339
    :param file: Filename.
340
    :type file: str
341
342
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
343
    :type kwargs: dict
344
    """
345 4
    print("HASHING:", os.path.basename(file))
346
    basename = file + ".cksum"
347
    targetname = os.path.join(ldir, basename)
348
    try:
349
        xec.submit(hash_writer, file, targetname, ldir, kwargs)
350
    except Exception as exc:
1 ignored issue
show
Best Practice introduced by
Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.

Loading history...
351
        exceptions.handle_exception(exc)
352
353 4
354 4
def verifier_config_loader(homepath=None):
355 4
    """
356 4
    Read a ConfigParser file to get hash preferences.
357 4
358 4
    :param homepath: Folder containing ini file. Default is user directory.
359 4
    :type homepath: str
360 4
    """
361
    ini = iniconfig.generic_loader("hashmodes", homepath)
362
    results = {}
363 4
    results['crc32'] = bool(ini.getboolean('crc32', fallback=False))
364
    results['adler32'] = bool(ini.getboolean('adler32', fallback=False))
365
    results['sha0'] = bool(ini.getboolean('sha0', fallback=False))
366
    results['sha1'] = bool(ini.getboolean('sha1', fallback=True))
367
    results['sha224'] = bool(ini.getboolean('sha224', fallback=False))
368
    results['sha256'] = bool(ini.getboolean('sha256', fallback=True))
369
    results['sha384'] = bool(ini.getboolean('sha384', fallback=False))
370
    results['sha512'] = bool(ini.getboolean('sha512', fallback=False))
371
    results['md5'] = bool(ini.getboolean('md5', fallback=True))
372
    results['md4'] = bool(ini.getboolean('md4', fallback=False))
373
    results['ripemd160'] = bool(ini.getboolean('ripemd160', fallback=False))
374
    results['whirlpool'] = bool(ini.getboolean('whirlpool', fallback=False))
375
    results['blocksize'] = int(ini.getint('blocksize', fallback=16777216))
376
    results['sha3224'] = bool(ini.getboolean('sha3224', fallback=False))
377
    results['sha3256'] = bool(ini.getboolean('sha3256', fallback=False))
378
    results['sha3384'] = bool(ini.getboolean('sha3384', fallback=False))
379 4
    results['sha3512'] = bool(ini.getboolean('sha3512', fallback=False))
380 4
    return results
381
382
383 4
def verifier_config_writer(resultdict=None, homepath=None):
384
    """
385
    Write a ConfigParser file to store hash preferences.
386
387
    :param resultdict: Dictionary of results: {method, bool}
388
    :type resultdict: dict({str, bool})
389
390
    :param homepath: Folder containing ini file. Default is user directory.
391
    :type homepath: str
392
    """
393
    if resultdict is None:
394
        resultdict = verifier_config_loader()
395
    results = {method: str(flag).lower() for method, flag in resultdict.items()}
396
    iniconfig.generic_writer("hashmodes", results, homepath)
397