Completed
Push — master ( f50c6f...3186b0 )
by John
02:36
created

compress_config_loader()   A

Complexity

Conditions 3

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3.0416

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 3
c 2
b 0
f 0
dl 0
loc 12
ccs 5
cts 6
cp 0.8333
crap 3.0416
rs 9.4285
1
#!/usr/bin/env python3
2 4
"""This module is used to operate with archives."""
3
4 4
import os  # filesystem read
5 4
import subprocess  # invocation of 7z, cap
6 4
import zipfile  # zip compresssion
7 4
import tarfile  # txz/tbz/tgz/tar compression
8 4
from bbarchivist import utilities  # platform determination
9 4
from bbarchivist import bbconstants  # premade stuff
10 4
from bbarchivist import decorators  # timer
11 4
from bbarchivist import iniconfig  # config parsing
12
13 4
__author__ = "Thurask"
14 4
__license__ = "WTFPL v2"
15 4
__copyright__ = "Copyright 2015-2016 Thurask"
16
17
18 4
def smart_is_tarfile(filepath):
19
    """
20
    :func:`tarfile.is_tarfile` plus error handling.
21
22
    :param filepath: Filename.
23
    :type filepath: str
24
    """
25 4
    try:
26 4
        istar = tarfile.is_tarfile(filepath)
27 4
    except (OSError, IOError):
28 4
        return False
29
    else:
30 4
        return istar
31
32
33 4
@decorators.timer
34 4
def sz_compress(filepath, filename, szexe=None, strength=5, errors=False):
35
    """
36
    Pack a file into a LZMA2 7z file.
37
38
    :param filepath: Basename of file, no extension.
39
    :type filepath: str
40
41
    :param filename: Name of file to pack.
42
    :type filename: str
43
44
    :param szexe: Path to 7z executable.
45
    :type szexe: str
46
47
    :param strength: Compression strength. 5 is normal, 9 is ultra.
48
    :type strength: int
49
50
    :param errors: Print completion status message. Default is false.
51
    :type errors: bool
52
    """
53 4
    szcodes = {
54
        0: "NO ERRORS",
55
        1: "COMPLETED WITH WARNINGS",
56
        2: "FATAL ERROR",
57
        7: "COMMAND LINE ERROR",
58
        8: "OUT OF MEMORY ERROR",
59
        255: "USER STOPPED PROCESS"
60
    }
61 4
    strength = str(strength)
62 4
    rawname = os.path.dirname(filepath)
63 4
    thr = str(utilities.get_core_count())
64 4
    fold = os.path.join(rawname, filename)
65 4
    cmd = '{0} a -mx{1} -m0=lzma2 -mmt{2} "{3}.7z" "{4}"'.format(szexe, strength,
66
                                                                 thr, filepath, fold)
67 4
    excode = sz_subprocess(cmd)
68 4
    if errors:
69 4
        print(szcodes[excode])
70
71
72 4
def sz_subprocess(cmd):
73
    """
74
    Subprocess wrapper for 7-Zip commands.
75
76
    :param cmd: Command to pass to subprocess.
77
    :type cmd: str
78
    """
79 4
    with open(os.devnull, 'wb') as dnull:
80 4
        output = subprocess.call(cmd, stdout=dnull, stderr=subprocess.STDOUT, shell=True)
81 4
    return output
82
83
84 4
def sz_verify(filepath, szexe=None):
85
    """
86
    Verify that a .7z file is valid and working.
87
88
    :param filepath: Filename.
89
    :type filepath: str
90
91
    :param szexe: Path to 7z executable.
92
    :type szexe: str
93
    """
94 4
    filepath = os.path.abspath(filepath)
95 4
    cmd = '{0} t "{1}"'.format(szexe, filepath)
96 4
    excode = sz_subprocess(cmd)
97 4
    return excode == 0
98
99
100 4
def generic_tarfile_verify(filepath, method):
101
    """
102
    Verify that a tar/tgz/tbz/txz file is valid and working.
103
104
    :param filepath: Filename.
105
    :type filepath: str
106
107
    :param method: Tarfile read method.
108
    :type method: str
109
    """
110 4
    if smart_is_tarfile(filepath):
111 4
        with tarfile.open(filepath, method) as thefile:
112 4
            mems = thefile.getmembers()
113 4
        return False if not mems else True
114
    else:
115 4
        return False
116
117
118 4
def generic_tarfile_compress(archivename, filename, method, strength=5):
119
    """
120
    Pack a file into an uncompressed/gzip/bzip2/LZMA tarfile.
121
122
    :param archivename: Archive name.
123
    :type archivename: str
124
125
    :param filename: Name of file to pack into archive.
126
    :type filename: str
127
128
    :param method: Tarfile compress method.
129
    :type method: str
130
131
    :param strength: Compression strength. 5 is normal, 9 is ultra.
132
    :type strength: int
133
    """
134 4
    nocomp = ["w:", "w:xz"]  # methods w/o compression: tar, tar.xz
135 4
    generic_compresslevel(archivename, filename, method, strength) if method not in nocomp else generic_nocompresslevel(archivename, filename, method)
0 ignored issues
show
Unused Code Bug introduced by
The expression generic_compresslevel(ar...name, filename, method) does not seem to have sideeffects and its result is not used.

If a expression has no sideeffects (any lasting effect after it has been called) and its return value is not used, this usually means that this code can be removed or that an assignment is missing.

Loading history...
136
137
138 4
def generic_compresslevel(archivename, filename, method, strength=5):
139
    """
140
    Pack a file into a gzip/bzip2 tarfile.
141
142
    :param archivename: Archive name.
143
    :type archivename: str
144
145
    :param filename: Name of file to pack into archive.
146
    :type filename: str
147
148
    :param method: Tarfile compress method.
149
    :type method: str
150
151
    :param strength: Compression strength. 5 is normal, 9 is ultra.
152
    :type strength: int
153
    """
154 4
    with tarfile.open(archivename, method, compresslevel=strength) as afile:
155 4
        afile.add(filename, filter=None, arcname=os.path.basename(filename))
156
157
158 4
def generic_nocompresslevel(archivename, filename, method):
159
    """
160
    Pack a file into an uncompressed/LZMA tarfile.
161
162
    :param archivename: Archive name.
163
    :type archivename: str
164
165
    :param filename: Name of file to pack into archive.
166
    :type filename: str
167
168
    :param method: Tarfile compress method.
169
    :type method: str
170
    """
171 4
    with tarfile.open(archivename, method) as afile:
172 4
        afile.add(filename, filter=None, arcname=os.path.basename(filename))
173
174
175 4
@decorators.timer
176
def tar_compress(filepath, filename):
177
    """
178
    Pack a file into an uncompressed tarfile.
179
180
    :param filepath: Basename of file, no extension.
181
    :type filepath: str
182
183
    :param filename: Name of file to pack.
184
    :type filename: str
185
    """
186 4
    generic_tarfile_compress("{0}.tar".format(filepath), filename, "w:")
187
188
189 4
def tar_verify(filepath):
190
    """
191
    Verify that a tar file is valid and working.
192
193
    :param filepath: Filename.
194
    :type filepath: str
195
    """
196 4
    return generic_tarfile_verify(filepath, "r:")
197
198
199 4
@decorators.timer
200 4
def tgz_compress(filepath, filename, strength=5):
201
    """
202
    Pack a file into a gzip tarfile.
203
204
    :param filepath: Basename of file, no extension.
205
    :type filepath: str
206
207
    :param filename: Name of file to pack.
208
    :type filename: str
209
210
    :param strength: Compression strength. 5 is normal, 9 is ultra.
211
    :type strength: int
212
    """
213 4
    generic_tarfile_compress("{0}.tar.gz".format(filepath), filename, "w:gz", strength)
214
215
216 4
def tgz_verify(filepath):
217
    """
218
    Verify that a tar.gz file is valid and working.
219
220
    :param filepath: Filename.
221
    :type filepath: str
222
    """
223 4
    return generic_tarfile_verify(filepath, "r:gz")
224
225
226 4
@decorators.timer
227 4
def tbz_compress(filepath, filename, strength=5):
228
    """
229
    Pack a file into a bzip2 tarfile.
230
231
    :param filepath: Basename of file, no extension.
232
    :type filepath: str
233
234
    :param filename: Name of file to pack.
235
    :type filename: str
236
237
    :param strength: Compression strength. 5 is normal, 9 is ultra.
238
    :type strength: int
239
    """
240 4
    generic_tarfile_compress("{0}.tar.bz2".format(filepath), filename, "w:bz2", strength)
241
242
243 4
def tbz_verify(filepath):
244
    """
245
    Verify that a tar.bz2 file is valid and working.
246
247
    :param filepath: Filename.
248
    :type filepath: str
249
    """
250 4
    return generic_tarfile_verify(filepath, "r:bz2")
251
252
253 4
@decorators.timer
254
def txz_compress(filepath, filename):
255
    """
256
    Pack a file into a LZMA tarfile.
257
258
    :param filepath: Basename of file, no extension.
259
    :type filepath: str
260
261
    :param filename: Name of file to pack.
262
    :type filename: str
263
    """
264 4
    if not utilities.new_enough(3):
265
        pass
266
    else:
267 4
        generic_tarfile_compress("{0}.tar.xz".format(filepath), filename, "w:xz")
268
269
270 4
def txz_verify(filepath):
271
    """
272
    Verify that a tar.xz file is valid and working.
273
274
    :param filepath: Filename.
275
    :type filepath: str
276
    """
277 4
    if not utilities.new_enough(3):
278
        return None
279
    else:
280 4
        return generic_tarfile_verify(filepath, "r:xz")
281
282
283 4
@decorators.timer
284
def zip_compress(filepath, filename):
285
    """
286
    Pack a file into a DEFLATE zipfile.
287
288
    :param filepath: Basename of file, no extension.
289
    :type filepath: str
290
291
    :param filename: Name of file to pack.
292
    :type filename: str
293
    """
294 4
    with zipfile.ZipFile("{0}.zip".format(filepath), 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zfile:
295 4
        zfile.write(filename, arcname=os.path.basename(filename))
296
297
298 4
def zip_verify(filepath):
299
    """
300
    Verify that a .zip file is valid and working.
301
302
    :param filepath: Filename.
303
    :type filepath: str
304
    """
305 4
    if zipfile.is_zipfile(filepath):
306 4
        try:
307 4
            with zipfile.ZipFile(filepath, "r") as zfile:
308 4
                brokens = zfile.testzip()
309
        except zipfile.BadZipFile:
310
            brokens = filepath
311 4
        return brokens != filepath
312
    else:
313 4
        return False
314
315
316 4
def filter_method(method, szexe=None):
317
    """
318
    Make sure methods are OK.
319
320
    :param method: Compression method to use.
321
    :type method: str
322
323
    :param szexe: Path to 7z executable, if needed.
324
    :type szexe: str
325
    """
326 4
    if not utilities.new_enough(3) and method == "txz":
327
        method = "zip"  # fallback
328 4
    if method == "7z" and szexe is None:
329 4
        ifexists = utilities.prep_seven_zip()  # see if 7z exists
330 4
        if not ifexists:
331 4
            method = "zip"  # fallback
332
        else:
333 4
            szexe = utilities.get_seven_zip(False)
334 4
    return method
335
336
337 4
def calculate_strength():
338
    """
339
    Determine zip/gzip/bzip2 strength by OS bit setting.
340
    """
341 4
    strength = 9 if utilities.is_amd64() else 5
342 4
    return strength
343
344
345 4
def filtercomp(files, criterion, critargs, boolfilt=True):
346
    """
347
    :param files: Files to work on.
348
    :type files: list(str)
349
350
    :param criterion: Function to use for evaluation.
351
    :type criterion: func
352
353
    :param critargs: Arguments for function, other than file.
354
    :type critargs: list
355
356
    :param boolfilt: True if comparing criterion, False if comparing not criterion.
357
    :type boolfilt: bool
358
    """
359 4
    if boolfilt:
360 4
        fx2 = [file for file in files if criterion(file, *critargs)]
361
    else:
362 4
        fx2 = [file for file in files if not criterion(file, *critargs)]
363 4
    return fx2
364
365
366 4
def compressfilter(filepath, selective=False):
367
    """
368
    Filter directory listing of working directory.
369
370
    :param filepath: Working directory. Required.
371
    :type filepath: str
372
373
    :param selective: Only compress autoloaders. Default is false.
374
    :type selective: bool/str
375
    """
376 4
    arx = bbconstants.ARCS
377 4
    pfx = bbconstants.PREFIXES
378 4
    files = [file for file in os.listdir(filepath) if not os.path.isdir(file)]
379 4
    if selective is None:
380 4
        filt2 = os.listdir(filepath)
381 4
    elif selective == "arcsonly":
382 4
        filt2 = filtercomp(files, utilities.prepends, ("", arx))
383 4
    elif selective:
384 4
        filt0 = filtercomp(files, utilities.prepends, (pfx, ""))
385 4
        filt1 = filtercomp(filt0, utilities.prepends, ("", arx), False)  # pop archives
386 4
        filt2 = filtercomp(filt1, utilities.prepends, ("", ".exe"))  # include exes
387
    else:
388 4
        filt2 = filtercomp(files, utilities.prepends, ("", arx), False)  # pop archives
389 4
    filt3 = [os.path.join(filepath, file) for file in filt2]
390 4
    return filt3
391
392
393 4
def prep_compress_function(method="7z", szexe=None, errors=False):
394
    """
395
    Prepare compression function and partial arguments.
396
397
    :param method: Compression type. Default is "7z".
398
    :type method: str
399
400
    :param szexe: Path to 7z executable, if needed.
401
    :type szexe: str
402
403
    :param errors: Print completion status message. Default is false.
404
    :type errors: bool
405
    """
406 4
    methods = {"7z": sz_compress, "tgz": tgz_compress, "txz": txz_compress, "tbz": tbz_compress,
407
               "tar": tar_compress, "zip": zip_compress}
408 4
    args = [szexe] if method == "7z" else []
409 4
    if method in ("7z", "tbz", "tgz"):
410 4
        args.append(calculate_strength())
411 4
    if method == "7z":
412 4
        args.append(errors)
413 4
    return methods[method], args
414
415
416 4
def compress(filepath, method="7z", szexe=None, selective=False, errors=False):
417
    """
418
    Compress all autoloader files in a given folder, with a given method.
419
420
    :param filepath: Working directory. Required.
421
    :type filepath: str
422
423
    :param method: Compression type. Default is "7z".
424
    :type method: str
425
426
    :param szexe: Path to 7z executable, if needed.
427
    :type szexe: str
428
429
    :param selective: Only compress autoloaders. Default is false.
430
    :type selective: bool
431
432
    :param errors: Print completion status message. Default is false.
433
    :type errors: bool
434
    """
435 4
    method = filter_method(method, szexe)
436 4
    files = compressfilter(filepath, selective)
437 4
    for file in files:
438 4
        fname = os.path.basename(file)
439 4
        filename = os.path.splitext(fname)[0]
440 4
        fileloc = os.path.join(filepath, filename)
441 4
        print("COMPRESSING: {0}".format(fname))
442 4
        compfunc, extargs = prep_compress_function(method, szexe, errors)
443 4
        compfunc(fileloc, file, *extargs)
444 4
    return True
445
446
447 4
def tarzip_verifier(file):
448
    """
449
    Assign .tar.xxx, .tar and .zip verifiers.
450
451
    :param file: Filename.
452
    :type file: str
453
    """
454 4
    maps = {".tar.gz": tgz_verify, ".tar.xz": txz_verify,
455
            ".tar.bz2": tbz_verify, ".tar": tar_verify,
456
            ".zip": zip_verify, ".bar": zip_verify}
457 4
    for key, value in maps.items():
458 4
        if file.endswith(key):
459
            return value(file)
460
461
462 4
def decide_verifier(file, szexe=None):
463
    """
464
    Decide which verifier function to use.
465
466
    :param file: Filename.
467
    :type file: str
468
469
    :param szexe: Path to 7z executable, if needed.
470
    :type szexe: str
471
    """
472 4
    print("VERIFYING: {0}".format(file))
473 4
    if file.endswith(".7z") and szexe is not None:
474
        verif = sz_verify(os.path.abspath(file), szexe)
475
    else:
476 4
        verif = tarzip_verifier(file)
477 4
    decide_verifier_printer(file, verif)
478 4
    return verif
479
480
481 4
def decide_verifier_printer(file, verif):
482
    """
483
    Print status of verifier function.
484
485
    :param file: Filename.
486
    :type file: str
487
488
    :param verif: If the file is OK or not.
489
    :type verif: bool
490
    """
491 4
    if not verif:
492 4
        print("{0} IS BROKEN!".format(os.path.basename(file)))
493
    else:
494
        print("{0} OK".format(os.path.basename(file)))
495
496
497 4
def verify(filepath, method="7z", szexe=None, selective=False):
498
    """
499
    Verify specific archive files in a given folder.
500
501
    :param filepath: Working directory. Required.
502
    :type filepath: str
503
504
    :param method: Compression type. Default is "7z". Defined in source.
505
    :type method: str
506
507
    :param szexe: Path to 7z executable, if needed.
508
    :type szexe: str
509
510
    :param selective: Only verify autoloaders. Default is false.
511
    :type selective: bool
512
    """
513 4
    method = filter_method(method, szexe)
514 4
    files = compressfilter(filepath, selective)
515 4
    for file in files:
516 4
        decide_verifier(file, szexe)
517
518
519 4
def compress_config_loader(homepath=None):
520
    """
521
    Read a ConfigParser file to get compression preferences.
522
523
    :param homepath: Folder containing ini file. Default is user directory.
524
    :type homepath: str
525
    """
526 4
    compini = iniconfig.generic_loader('compression', homepath)
527 4
    method = compini.get('method', fallback="7z")
528 4
    if not utilities.new_enough(3) and method == "txz":
529
        method = "zip"  # for 3.2 compatibility
530 4
    return method
531
532
533 4
def compress_config_writer(method=None, homepath=None):
534
    """
535
    Write a ConfigParser file to store compression preferences.
536
537
    :param method: Method to use.
538
    :type method: str
539
540
    :param homepath: Folder containing ini file. Default is user directory.
541
    :type homepath: str
542
    """
543 4
    method = compress_config_loader() if method is None else method
544 4
    results = {"method": method}
545
    iniconfig.generic_writer("compression", results, homepath)
546