Completed
Push — master ( dbd54c...3a3f87 )
by John
03:22
created

hash_writer()   B

Complexity

Conditions 3

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 3.072

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 35
ccs 16
cts 20
cp 0.8
crap 3.072
rs 8.8571
1
#!/usr/bin/env python3
2 4
"""This module is used to generate file hashes/checksums."""
3
4 4
import zlib  # crc32/adler32
5 4
import hashlib  # all other hashes
6 4
import hmac  # escreens is a hmac, news at 11
7 4
import os  # path work
8 4
import concurrent.futures  # parallelization
9 4
from bbarchivist import bbconstants  # premade stuff
10 4
from bbarchivist import exceptions  # exceptions
11 4
from bbarchivist import utilities  # cores
12 4
from bbarchivist import iniconfig  # config parsing
13
14 4
__author__ = "Thurask"
15 4
__license__ = "WTFPL v2"
16 4
__copyright__ = "Copyright 2015-2016 Thurask"
17
18
19 4
def zlib_hash(filepath, method, blocksize=16 * 1024 * 1024):
20
    """
21
    Return zlib-based (i.e. CRC32/Adler32) checksum of a file.
22
23
    :param filepath: File you wish to verify.
24
    :type filepath: str
25
26
    :param method: "crc32" or "adler32".
27
    :type method: str
28
29
    :param blocksize: How much of file to read at once. Default is 16MB.
30
    :type blocksize: int
31
    """
32 4
    hashfunc = zlib.crc32 if method == "crc32" else zlib.adler32
33 4
    seed = 0 if method == "crc32" else 1
34 4
    with open(filepath, 'rb') as file:
35 4
        for chunk in iter(lambda: file.read(blocksize), b''):
36 4
            seed = hashfunc(chunk, seed)
37 4
    final = format(seed & 0xFFFFFFFF, "08x")
38 4
    return final
39
40
41 4
def hashlib_hash(filepath, engine, blocksize=16 * 1024 * 1024):
42
    """
43
    Return MD5/SHA-1/SHA-2/SHA-3 hash of a file.
44
45
    :param filepath: File you wish to verify.
46
    :type filepath: str
47
48
    :param engine: Hash object to update with file contents.
49
    :type engine: _hashlib.HASH
50
51
    :param blocksize: How much of file to read at once. Default is 16MB.
52
    :type blocksize: int
53
    """
54 4
    hashfunc_reader(filepath, engine, blocksize)
55 4
    return engine.hexdigest()
56
57
58 4
def hashfunc_reader(filepath, engine, blocksize=16 * 1024 * 1024):
59
    """
60
    Generate hash from file contents.
61
62
    :param filepath: File you wish to verify.
63
    :type filepath: str
64
65
    :param engine: Hash object to update with file contents.
66
    :type engine: _hashlib.HASH
67
68
    :param blocksize: How much of file to read at once. Default is 16MB.
69
    :type blocksize: int
70
    """
71 4
    with open(filepath, 'rb') as file:
72 4
        while True:
73 4
            data = file.read(blocksize)
74 4
            if not data:
75 4
                break
76 4
            engine.update(data)
77
78
79 4
def ssl_hash(filepath, method, blocksize=16 * 1024 * 1024):
80
    """
81
    Return SSL-library dependent hash of a file.
82
83
    :param filepath: File you wish to verify.
84
    :type filepath: str
85
86
    :param method: Method to use: algorithms in hashlib that are not guaranteed.
87
    :type method: str
88
89
    :param blocksize: How much of file to read at once. Default is 16MB.
90
    :type blocksize: int
91
    """
92 4
    try:
93 4
        engine = hashlib.new(method)
94 4
        hashfunc_reader(filepath, engine, blocksize)
95 4
        return engine.hexdigest()
96 4
    except ValueError as exc:
97 4
        msg = "{0} HASH FAILED".format(method.upper())
98 4
        exceptions.handle_exception(exc, msg, None)
99
100
101 4
def calculate_escreens(pin, app, uptime, duration=30):
102
    """
103
    Calculate key for the Engineering Screens based on input.
104
105
    :param pin: PIN to check. 8 character hexadecimal, lowercase.
106
    :type pin: str
107
108
    :param app: App version. 10.x.y.zzzz.
109
    :type app: str
110
111
    :param uptime: Uptime in ms.
112
    :type uptime: str
113
114
    :param duration: 1, 3, 6, 15, 30 (days).
115
    :type duration: str
116
    """
117
    #: Somehow, values for lifetimes for escreens.
118 4
    lifetimes = {
119
        1: "",
120
        3: "Hello my baby, hello my honey, hello my rag time gal",
121
        7: "He was a boy, and she was a girl, can I make it any more obvious?",
122
        15: "So am I, still waiting, for this world to stop hating?",
123
        30: "I love myself today, not like yesterday. I'm cool, I'm calm, I'm gonna be okay"
124
    }
125
    #: Escreens magic HMAC secret.
126 4
    ehmac = 'Up the time stream without a TARDIS'
127 4
    duration = int(duration)
128 4
    if duration not in [1, 3, 6, 15, 30]:
129 4
        duration = 1
130 4
    data = pin.lower() + app + str(uptime) + lifetimes[duration]
131 4
    newhmac = hmac.new(ehmac.encode(), data.encode(), digestmod=hashlib.sha1)
132 4
    key = newhmac.hexdigest()[:8]
133 4
    return key.upper()
134
135
136 4
def get_hashfunc(hashtype):
137
    """
138
    Get genericized hash function from hash type.
139
140
    :param hashtype: Hash type.
141
    :type hashtype: str
142
    """
143 4
    hashfuncs = {"adler32": zlib_hash,
144
                 "crc32": zlib_hash,
145
                 "md4": ssl_hash,
146
                 "sha0": ssl_hash,
147
                 "ripemd160": ssl_hash,
148
                 "whirlpool": ssl_hash,
149
                 "md5": hashlib_hash,
150
                 "sha1": hashlib_hash,
151
                 "sha224": hashlib_hash,
152
                 "sha256": hashlib_hash,
153
                 "sha384": hashlib_hash,
154
                 "sha512": hashlib_hash,
155
                 "sha3224": hashlib_hash,
156
                 "sha3256": hashlib_hash,
157
                 "sha3384": hashlib_hash,
158
                 "sha3512": hashlib_hash}
159 4
    return hashfuncs[hashtype]
160
161
162 4
def get_engine(hashtype):
163
    """
164
    Get hashlib engine from hash type.
165
166
    :param hashtype: Hash type.
167
    :type hashtype: str
168
    """
169 4
    hashengines = {"md5": hashlib.md5(),
170
                   "sha1": hashlib.sha1(),
171
                   "sha224": hashlib.sha224(),
172
                   "sha256": hashlib.sha256(),
173
                   "sha384": hashlib.sha384(),
174
                   "sha512": hashlib.sha512()}
175 4
    if utilities.new_enough(6):
176
        hashengines.update({"sha3224": hashlib.sha3_224(),
177
                            "sha3256": hashlib.sha3_256(),
178
                            "sha3384": hashlib.sha3_384(),
179
                            "sha3512": hashlib.sha3_512()})
180 4
    return hashengines[hashtype]
181
182
183 4
def hash_get(filename, hashfunc, hashtype, workingdir, blocksize=16777216):
184
    """
185
    Generate and pretty format the hash result for a file.
186
187
    :param filename: File to hash.
188
    :type filename: str
189
190
    :param hashfunc: Hash function to use.
191
    :type hashfunc: function
192
193
    :param hashtype: Hash type.
194
    :type hashtype: str
195
196
    :param workingdir: Working directory.
197
    :type workingdir: str
198
199
    :param blocksize: Block size. Default is 16MB.
200
    :type blocksize: int
201
    """
202 4
    if hashfunc == hashlib_hash:
203 4
        method = get_engine(hashtype)
204
    else:
205 4
        method = hashtype
206 4
    result = hashfunc(os.path.join(workingdir, filename), method, blocksize)
207 4
    return "{0} {1}\n".format(result.upper(), os.path.basename(filename))
208
209
210 4
def base_hash(hashtype, source, workingdir, block, target, kwargs=None):
211
    """
212
    Generic hash function; get hash, write to file.
213
214
    :param hashtype: Hash type.
215
    :type hashtype: str
216
217
    :param source: File to be hashed; foobar.ext
218
    :type source: str
219
220
    :param workingdir: Path containing files you wish to verify.
221
    :type workingdir: str
222
223
    :param block: Blocksize, in bytes.
224
    :type block: int
225
226
    :param target: File to write to.
227
    :type target: file
228
229
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
230
    :type kwargs: dict
231
    """
232 4
    if kwargs[hashtype]:
233 4
        hash_generic = [hashtype.upper()]
234 4
        hashfunc = get_hashfunc(hashtype)
235 4
        hashtype2 = "sha" if hashtype == "sha0" else hashtype
236 4
        hash_generic.append(hash_get(source, hashfunc, hashtype2, workingdir, block))
237 4
        target.write("\n".join(hash_generic))
238
239
240 4
def hash_writer(source, dest, workingdir, kwargs=None):
241
    """
242
    Write per-file hashes.
243
244
    :param source: File to be hashed; foobar.ext
245
    :type source: str
246
247
    :param dest: Destination file; foobar.ext.cksum
248
    :type dest: str
249
250
    :param workingdir: Path containing files you wish to verify.
251
    :type workingdir: str
252
253
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
254
    :type kwargs: dict
255
    """
256 4
    block = int(kwargs['blocksize'])
257 4
    with open(dest, 'w') as target:
258 4
        base_hash("adler32", source, workingdir, block, target, kwargs)
259 4
        base_hash("crc32", source, workingdir, block, target, kwargs)
260 4
        base_hash("md4", source, workingdir, block, target, kwargs)
261 4
        base_hash("md5", source, workingdir, block, target, kwargs)
262 4
        base_hash("sha0", source, workingdir, block, target, kwargs)
263 4
        base_hash("sha1", source, workingdir, block, target, kwargs)
264 4
        base_hash("sha224", source, workingdir, block, target, kwargs)
265 4
        base_hash("sha256", source, workingdir, block, target, kwargs)
266 4
        base_hash("sha384", source, workingdir, block, target, kwargs)
267 4
        base_hash("sha512", source, workingdir, block, target, kwargs)
268 4
        base_hash("ripemd160", source, workingdir, block, target, kwargs)
269 4
        base_hash("whirlpool", source, workingdir, block, target, kwargs)
270 4
        if utilities.new_enough(6):
271
            base_hash("sha3224", source, workingdir, block, target, kwargs)
272
            base_hash("sha3256", source, workingdir, block, target, kwargs)
273
            base_hash("sha3384", source, workingdir, block, target, kwargs)
274
            base_hash("sha3512", source, workingdir, block, target, kwargs)
275
276
277 4
def filefilter(file, workingdir, extras=()):
278
    """
279
    Check if file in folder is a folder, or if it's got a forbidden extension.
280
281
    :param file: File to be hashed.
282
    :type file: str
283
284
    :param workingdir: Path containing files you wish to verify.
285
    :type workingdir: str
286
287
    :param extras: Tuple of extra extensions.
288
    :type extras: tuple
289
    """
290 4
    return not (os.path.isdir(os.path.join(workingdir, file)) or file.endswith(bbconstants.SUPPS + extras))
291
292
293 4
def prep_verifier(ldir, selective=False):
294
    """
295
    Prepare files for verifier function.
296
297
    :param ldir: Path containing files you wish to verify.
298
    :type ldir: str
299
300
    :param selective: Filtering filenames/extensions. Default is false.
301
    :type selective: bool
302
    """
303 4
    exts = (".txt",) if selective else ()
304 4
    fxs = [os.path.join(ldir, afx) for afx in os.listdir(ldir) if filefilter(afx, ldir, exts)]
305 4
    return fxs
306
307
308 4
def verifier(ldir, kwargs=None, selective=False):
309
    """
310
    For all files in a directory, perform various hash/checksum functions.
311
    Take dict to define hashes, write output to a/individual .cksum file(s).
312
313
    :param ldir: Path containing files you wish to verify.
314
    :type ldir: str
315
316
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
317
    :type kwargs: dict
318
319
    :param selective: Filtering filenames/extensions. Default is false.
320
    :type selective: bool
321
    """
322 4
    kwargs = verifier_config_loader() if kwargs is None else kwargs
323 4
    fxs = prep_verifier(ldir, selective)
324 4
    with concurrent.futures.ThreadPoolExecutor(max_workers=utilities.workers(fxs)) as xec:
325 4
        for file in fxs:
326 4
            verifier_individual(xec, ldir, file, kwargs)
327
328
329 4
def verifier_individual(xec, ldir, file, kwargs):
330
    """
331
    Individually verify files through a ThreadPoolExecutor.
332
333
    :param xec: ThreadPoolExecutor instance.
334
    :type xec: concurrent.futures.ThreadPoolExecutor
335
336
    :param ldir: Path containing files you wish to verify.
337
    :type ldir: str
338
339
    :param file: Filename.
340
    :type file: str
341
342
    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
343
    :type kwargs: dict
344
    """
345 4
    print("HASHING:", os.path.basename(file))
346 4
    basename = file + ".cksum"
347 4
    targetname = os.path.join(ldir, basename)
348 4
    try:
349 4
        xec.submit(hash_writer, file, targetname, ldir, kwargs)
350 4
    except Exception as exc:
1 ignored issue
show
Best Practice introduced by
Catching very general exceptions such as Exception is usually not recommended.

Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed.

So, unless you specifically plan to handle any error, consider adding a more specific exception.

Loading history...
351 4
        exceptions.handle_exception(exc)
352
353
354 4
def verifier_config_loader(homepath=None):
355
    """
356
    Read a ConfigParser file to get hash preferences.
357
358
    :param homepath: Folder containing ini file. Default is user directory.
359
    :type homepath: str
360
    """
361 4
    ini = iniconfig.generic_loader("hashmodes", homepath)
362 4
    results = {}
363 4
    results['crc32'] = bool(ini.getboolean('crc32', fallback=False))
364 4
    results['adler32'] = bool(ini.getboolean('adler32', fallback=False))
365 4
    results['sha0'] = bool(ini.getboolean('sha0', fallback=False))
366 4
    results['sha1'] = bool(ini.getboolean('sha1', fallback=True))
367 4
    results['sha224'] = bool(ini.getboolean('sha224', fallback=False))
368 4
    results['sha256'] = bool(ini.getboolean('sha256', fallback=True))
369 4
    results['sha384'] = bool(ini.getboolean('sha384', fallback=False))
370 4
    results['sha512'] = bool(ini.getboolean('sha512', fallback=False))
371 4
    results['md5'] = bool(ini.getboolean('md5', fallback=True))
372 4
    results['md4'] = bool(ini.getboolean('md4', fallback=False))
373 4
    results['ripemd160'] = bool(ini.getboolean('ripemd160', fallback=False))
374 4
    results['whirlpool'] = bool(ini.getboolean('whirlpool', fallback=False))
375 4
    results['blocksize'] = int(ini.getint('blocksize', fallback=16777216))
376 4
    results['sha3224'] = bool(ini.getboolean('sha3224', fallback=False))
377 4
    results['sha3256'] = bool(ini.getboolean('sha3256', fallback=False))
378 4
    results['sha3384'] = bool(ini.getboolean('sha3384', fallback=False))
379 4
    results['sha3512'] = bool(ini.getboolean('sha3512', fallback=False))
380 4
    return results
381
382
383 4
def verifier_config_writer(resultdict=None, homepath=None):
384
    """
385
    Write a ConfigParser file to store hash preferences.
386
387
    :param resultdict: Dictionary of results: {method, bool}
388
    :type resultdict: dict({str, bool})
389
390
    :param homepath: Folder containing ini file. Default is user directory.
391
    :type homepath: str
392
    """
393 4
    if resultdict is None:
394 4
        resultdict = verifier_config_loader()
395 4
    results = {method: str(flag).lower() for method, flag in resultdict.items()}
396
    iniconfig.generic_writer("hashmodes", results, homepath)
397