Completed
Push — master ( ceb745...51cded )
by John
02:41
created

hash_writer()   B

Complexity

Conditions 3

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 3.072

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 35
rs 8.8571
ccs 16
cts 20
cp 0.8
crap 3.072
1
#!/usr/bin/env python3
2 1
"""This module is used to generate file hashes/checksums."""
3
4 1
import zlib  # crc32/adler32
5 1
import hashlib  # all other hashes
6 1
import hmac  # escreens is a hmac, news at 11
7 1
import os  # path work
8 1
import concurrent.futures  # parallelization
9 1
from bbarchivist import bbconstants  # premade stuff
10 1
from bbarchivist import exceptions  # exceptions
11 1
from bbarchivist import utilities  # cores
12 1
from bbarchivist import iniconfig  # config parsing
13
14 1
__author__ = "Thurask"
15 1
__license__ = "WTFPL v2"
16 1
__copyright__ = "2015-2017 Thurask"
17
18
19 1
def zlib_hash(filepath, method, blocksize=16 * 1024 * 1024):
    """
    Compute a zlib checksum (CRC32 or Adler32) of a file.

    The file is opened in binary mode and read one block at a time; every
    block is folded into the running checksum obtained from
    :func:`zlib_handler`. The result is returned as eight lowercase,
    zero-padded hexadecimal digits.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param method: "crc32" or "adler32".
    :type method: str

    :param blocksize: How much of file to read at once. Default is 16MB.
    :type blocksize: int
    """
    update, checksum = zlib_handler(method)
    with open(filepath, 'rb') as handle:
        while True:
            block = handle.read(blocksize)
            if not block:
                # An empty read means end of file.
                break
            checksum = update(block, checksum)
    # Mask to 32 bits so the value is always rendered as unsigned.
    return format(checksum & 0xFFFFFFFF, "08x")
38
39
40 1
def zlib_handler(method):
    """
    Pick the zlib checksum function and its starting value.

    Returns a ``(function, seed)`` tuple: ``(zlib.crc32, 0)`` for "crc32",
    and ``(zlib.adler32, 1)`` for any other value of ``method``.

    :param method: "crc32" or "adler32".
    :type method: str
    """
    if method == "crc32":
        return zlib.crc32, 0
    # Everything that is not "crc32" is treated as Adler32.
    return zlib.adler32, 1
50
51
52 1
def hashlib_hash(filepath, engine, blocksize=16 * 1024 * 1024):
    """
    Hash a file with a caller-supplied hashlib engine (MD5/SHA-1/SHA-2/SHA-3).

    The engine is updated in place with the file's contents via
    :func:`hashfunc_reader`; its hexadecimal digest is returned.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param engine: Hash object to update with file contents.
    :type engine: _hashlib.HASH

    :param blocksize: How much of file to read at once. Default is 16MB.
    :type blocksize: int
    """
    # Feed the whole file through the engine, then read the digest off it.
    hashfunc_reader(filepath, engine, blocksize)
    digest = engine.hexdigest()
    return digest
67
68
69 1
def hashfunc_reader(filepath, engine, blocksize=16 * 1024 * 1024):
    """
    Feed a file's contents into a hash object, block by block.

    Nothing is returned; the caller reads the digest from ``engine``.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param engine: Hash object to update with file contents.
    :type engine: _hashlib.HASH

    :param blocksize: How much of file to read at once. Default is 16MB.
    :type blocksize: int
    """
    with open(filepath, 'rb') as handle:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: handle.read(blocksize), b''):
            engine.update(block)
88
89
90 1
def ssl_hash(filepath, method, blocksize=16 * 1024 * 1024):
    """
    Hash a file with an algorithm that depends on the SSL library.

    The engine is created by name through ``hashlib.new``. A ``ValueError``
    raised while creating or using it is passed to
    ``exceptions.handle_exception`` together with a "<METHOD> HASH FAILED"
    message; if that handler returns, this function returns None.

    :param filepath: File you wish to verify.
    :type filepath: str

    :param method: Method to use: algorithms in hashlib that are not guaranteed.
    :type method: str

    :param blocksize: How much of file to read at once. Default is 16MB.
    :type blocksize: int
    """
    try:
        digester = hashlib.new(method)
        hashfunc_reader(filepath, digester, blocksize)
        hexed = digester.hexdigest()
    except ValueError as exc:
        failure = "{0} HASH FAILED".format(method.upper())
        exceptions.handle_exception(exc, failure, None)
        return None
    return hexed
110
111
112 1
def calculate_escreens(pin, app, uptime, duration=30):
    """
    Calculate key for the Engineering Screens based on input.

    The key is the first eight hexadecimal digits, uppercased, of an
    HMAC-SHA1 computed over the lowercased PIN, the app version, the uptime
    and a phrase selected by the duration.

    :param pin: PIN to check. 8 character hexadecimal; lowercased before use.
    :type pin: str

    :param app: App version. 10.x.y.zzzz.
    :type app: str

    :param uptime: Uptime in ms.
    :type uptime: str

    :param duration: 1, 3, 6, 15, 30 (days). Any other value is treated as 1.
    :type duration: str

    :raises ValueError: If ``duration`` cannot be converted to an integer.
    """
    #: Somehow, values for lifetimes for escreens.
    # Keyed by the accepted durations. The third entry must be keyed 6, not 7:
    # 6 is the documented value, and with a key of 7 a duration of 6 passed
    # validation and then raised KeyError on lookup.
    lifetimes = {
        1: "",
        3: "Hello my baby, hello my honey, hello my rag time gal",
        6: "He was a boy, and she was a girl, can I make it any more obvious?",
        15: "So am I, still waiting, for this world to stop hating?",
        30: "I love myself today, not like yesterday. I'm cool, I'm calm, I'm gonna be okay"
    }
    #: Escreens magic HMAC secret.
    ehmac = 'Up the time stream without a TARDIS'
    duration = int(duration)
    # Validate against the table itself so the two can never drift apart.
    if duration not in lifetimes:
        duration = 1
    data = pin.lower() + app + str(uptime) + lifetimes[duration]
    newhmac = hmac.new(ehmac.encode(), data.encode(), digestmod=hashlib.sha1)
    key = newhmac.hexdigest()[:8]
    return key.upper()
145
146
147 1
def get_hashfunc(hashtype):
    """
    Map a hash type name to the function in this module that computes it.

    zlib checksums go to :func:`zlib_hash`, the SSL-dependent algorithms to
    :func:`ssl_hash`, and the remaining hashlib algorithms to
    :func:`hashlib_hash`. An unknown name raises ``KeyError``.

    :param hashtype: Hash type.
    :type hashtype: str
    """
    families = (
        (zlib_hash, ("adler32", "crc32")),
        (ssl_hash, ("md4", "sha0", "ripemd160", "whirlpool")),
        (hashlib_hash, ("md5", "sha1", "sha224", "sha256", "sha384", "sha512",
                        "sha3224", "sha3256", "sha3384", "sha3512")),
    )
    dispatch = {}
    for handler, names in families:
        dispatch.update(dict.fromkeys(names, handler))
    return dispatch[hashtype]
171
172
173 1
def get_engine(hashtype):
    """
    Get a fresh hashlib engine from hash type.

    Names are mapped to constructors and only the requested one is called,
    so a lookup no longer instantiates every supported hash object just to
    hand one of them back.

    :param hashtype: Hash type: "md5", "sha1", "sha224", "sha256", "sha384",
                     "sha512", or (when ``utilities.new_enough(6)`` is true)
                     "sha3224", "sha3256", "sha3384", "sha3512".
    :type hashtype: str

    :raises KeyError: If the hash type is unknown or not available.
    """
    constructors = {"md5": hashlib.md5,
                    "sha1": hashlib.sha1,
                    "sha224": hashlib.sha224,
                    "sha256": hashlib.sha256,
                    "sha384": hashlib.sha384,
                    "sha512": hashlib.sha512}
    # The SHA-3 constructors are only touched behind the same version gate
    # as before (presumably "Python 3.6 or newer" -- confirm in utilities),
    # and only when the name was not already resolved above.
    if hashtype not in constructors and utilities.new_enough(6):
        constructors.update({"sha3224": hashlib.sha3_224,
                             "sha3256": hashlib.sha3_256,
                             "sha3384": hashlib.sha3_384,
                             "sha3512": hashlib.sha3_512})
    return constructors[hashtype]()
192
193
194 1
def hash_get(filename, hashfunc, hashtype, workingdir, blocksize=16777216):
    """
    Hash one file and format the result as a checksum-file line.

    The returned string is "<UPPERCASE DIGEST> <file basename>" followed by
    a newline.

    :param filename: File to hash.
    :type filename: str

    :param hashfunc: Hash function to use.
    :type hashfunc: function

    :param hashtype: Hash type.
    :type hashtype: str

    :param workingdir: Working directory.
    :type workingdir: str

    :param blocksize: Block size. Default is 16MB.
    :type blocksize: int
    """
    # hashlib_hash expects a ready-made engine object; the other hash
    # functions take the algorithm name itself.
    method = get_engine(hashtype) if hashfunc == hashlib_hash else hashtype
    fullpath = os.path.join(workingdir, filename)
    digest = hashfunc(fullpath, method, blocksize)
    shortname = os.path.basename(filename)
    return "{0} {1}\n".format(digest.upper(), shortname)
219
220
221 1
def base_hash(hashtype, source, workingdir, block, target, kwargs=None):
    """
    Generic hash function; get hash, write to file.

    When the hash type is enabled in ``kwargs``, writes the uppercased hash
    type on one line followed by the "<DIGEST> <filename>" line produced by
    :func:`hash_get`. When it is disabled or absent, nothing is written.

    :param hashtype: Hash type.
    :type hashtype: str

    :param source: File to be hashed; foobar.ext
    :type source: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param block: Blocksize, in bytes.
    :type block: int

    :param target: File to write to.
    :type target: file

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
                   If None, the stored configuration is loaded.
    :type kwargs: dict
    """
    if kwargs is None:
        # The default used to crash on subscripting None; fall back to the
        # stored preferences the same way verifier() does.
        kwargs = verifier_config_loader()
    # A hash type missing from the dict is simply not requested.
    if not kwargs.get(hashtype):
        return
    hashfunc = get_hashfunc(hashtype)
    # SHA-0 is handed to the hash function under the name "sha".
    hashtype2 = "sha" if hashtype == "sha0" else hashtype
    entry = hash_get(source, hashfunc, hashtype2, workingdir, block)
    target.write("\n".join([hashtype.upper(), entry]))
249
250
251 1
def hash_writer(source, dest, workingdir, kwargs=None):
    """
    Write per-file hashes.

    Opens ``dest`` for writing (truncating it) and lets :func:`base_hash`
    append one entry per enabled hash type, in a fixed order.

    :param source: File to be hashed; foobar.ext
    :type source: str

    :param dest: Destination file; foobar.ext.cksum
    :type dest: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
                   If None, the stored configuration is loaded.
    :type kwargs: dict
    """
    if kwargs is None:
        # The default used to crash on kwargs['blocksize']; fall back to the
        # stored preferences the same way verifier() does.
        kwargs = verifier_config_loader()
    block = int(kwargs['blocksize'])
    # Order matters: it is the order of the entries in the output file.
    hashtypes = ("adler32", "crc32", "md4", "md5", "sha0", "sha1", "sha224",
                 "sha256", "sha384", "sha512", "ripemd160", "whirlpool")
    sha3_hashtypes = ("sha3224", "sha3256", "sha3384", "sha3512")
    with open(dest, 'w') as target:
        for hashtype in hashtypes:
            base_hash(hashtype, source, workingdir, block, target, kwargs)
        # SHA-3 entries are only attempted behind the version gate.
        if utilities.new_enough(6):
            for hashtype in sha3_hashtypes:
                base_hash(hashtype, source, workingdir, block, target, kwargs)
286
287
288 1
def filefilter(file, workingdir, extras=()):
    """
    Decide whether a directory entry should be hashed.

    Returns False for folders and for names ending in one of the forbidden
    extensions (``bbconstants.SUPPS`` plus ``extras``); True otherwise.

    :param file: File to be hashed.
    :type file: str

    :param workingdir: Path containing files you wish to verify.
    :type workingdir: str

    :param extras: Tuple of extra extensions.
    :type extras: tuple
    """
    if os.path.isdir(os.path.join(workingdir, file)):
        # Folders are never hashed.
        return False
    forbidden = bbconstants.SUPPS + extras
    return not file.endswith(forbidden)
302
303
304 1
def prep_verifier(ldir, selective=False):
    """
    Collect the files in a folder that should be hashed.

    Every entry of ``ldir`` accepted by :func:`filefilter` is returned,
    joined with ``ldir``. With ``selective`` set, ".txt" files are excluded
    as well.

    :param ldir: Path containing files you wish to verify.
    :type ldir: str

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    if selective:
        exts = (".txt",)
    else:
        exts = ()
    wanted = []
    for entry in os.listdir(ldir):
        if filefilter(entry, ldir, exts):
            wanted.append(os.path.join(ldir, entry))
    return wanted
317
318
319 1
def verifier(ldir, kwargs=None, selective=False):
    """
    For all files in a directory, perform various hash/checksum functions.
    Take dict to define hashes, write output to a/individual .cksum file(s).

    Work is handed to a thread pool sized by ``utilities.cpu_workers``; the
    pool is shut down (waiting for submitted work) before returning.

    :param ldir: Path containing files you wish to verify.
    :type ldir: str

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
                   If None, the stored configuration is loaded.
    :type kwargs: dict

    :param selective: Filtering filenames/extensions. Default is false.
    :type selective: bool
    """
    if kwargs is None:
        kwargs = verifier_config_loader()
    candidates = prep_verifier(ldir, selective)
    workers = utilities.cpu_workers(candidates)
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        for candidate in candidates:
            verifier_individual(pool, ldir, candidate, kwargs)
338
339
340 1
def verifier_individual(xec, ldir, file, kwargs):
    """
    Individually verify files through a ThreadPoolExecutor.

    Schedules :func:`hash_writer` for one file; the checksums go to
    "<name>.cksum" inside ``ldir``.

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param ldir: Path containing files you wish to verify.
    :type ldir: str

    :param file: Filename; a bare name or a path to a file inside ``ldir``.
    :type file: str

    :param kwargs: Values. Refer to `:func:verifier_config_loader`.
    :type kwargs: dict
    """
    # `file` may already carry `ldir` as a prefix. Joining it with `ldir`
    # again doubles the folder for relative paths ("sub" + "sub/x.bin"), so
    # work from the bare name and let the joins add the folder exactly once.
    name = os.path.basename(file)
    print("HASHING:", name)
    targetname = os.path.join(ldir, name + ".cksum")
    try:
        xec.submit(hash_writer, name, targetname, ldir, kwargs)
    # Deliberately broad: this is the scheduling boundary and every failure
    # is routed to the project's handler.
    # NOTE(review): exceptions raised inside hash_writer are stored on the
    # Future returned by submit(), which is discarded here, so they never
    # reach this handler.
    except Exception as exc:
        exceptions.handle_exception(exc)
363
364
365 1
def verifier_config_loader(homepath=None):
    """
    Read a ConfigParser file to get hash preferences.

    Returns a dict with one boolean per hash type plus an integer
    "blocksize". Options absent from the "hashmodes" section fall back to
    the defaults listed below.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    ini = iniconfig.generic_loader("hashmodes", homepath)
    # (option, fallback) pairs, in the order they appear in the result.
    leading_flags = (
        ('crc32', False),
        ('adler32', False),
        ('sha0', False),
        ('sha1', True),
        ('sha224', False),
        ('sha256', True),
        ('sha384', False),
        ('sha512', False),
        ('md5', True),
        ('md4', False),
        ('ripemd160', False),
        ('whirlpool', False),
    )
    trailing_flags = ('sha3224', 'sha3256', 'sha3384', 'sha3512')
    results = {}
    for option, default in leading_flags:
        results[option] = bool(ini.getboolean(option, fallback=default))
    results['blocksize'] = int(ini.getint('blocksize', fallback=16777216))
    for option in trailing_flags:
        results[option] = bool(ini.getboolean(option, fallback=False))
    return results
392
393
394 1
def verifier_config_writer(resultdict=None, homepath=None):
    """
    Write a ConfigParser file to store hash preferences.

    Every value is stored as its lowercased string form in the "hashmodes"
    section. Without a dictionary, the currently loaded preferences are
    written back.

    :param resultdict: Dictionary of results: {method, bool}
    :type resultdict: dict({str, bool})

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    settings = verifier_config_loader() if resultdict is None else resultdict
    serialised = {}
    for method, flag in settings.items():
        serialised[method] = str(flag).lower()
    iniconfig.generic_writer("hashmodes", serialised, homepath)
408