Completed
Branch master (654bda)
by John
01:42 queued 28s
created

compress_config_loader()   A

Complexity

Conditions 3

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 3
c 2
b 0
f 0
dl 0
loc 12
ccs 6
cts 6
cp 1
crap 3
rs 9.4285
1
#!/usr/bin/env python3
2 4
"""This module is used to operate with archives."""
3
4 4
import os  # filesystem read
5 4
import subprocess  # invocation of 7z, cap
6 4
import zipfile  # zip compression
7 4
import tarfile  # txz/tbz/tgz/tar compression
8 4
from bbarchivist import utilities  # platform determination
9 4
from bbarchivist import bbconstants  # premade stuff
10 4
from bbarchivist import decorators  # timer
11 4
from bbarchivist import iniconfig  # config parsing
12
13 4
__author__ = "Thurask"
14 4
__license__ = "WTFPL v2"
15 4
__copyright__ = "Copyright 2015-2016 Thurask"
16
17
18 4
def smart_is_tarfile(filepath):
    """
    :func:`tarfile.is_tarfile` that reports I/O failures as "not a tarfile".

    :param filepath: Filename.
    :type filepath: str
    """
    try:
        return tarfile.is_tarfile(filepath)
    except (OSError, IOError):
        # A path that cannot be opened or read is treated as a non-tarfile.
        return False
31
32
33 4
@decorators.timer
def sz_compress(filepath, filename, szexe=None, strength=5, errors=False):
    """
    Pack a file into a LZMA2 7z file.

    The archive is written to ``<filepath>.7z``. The file to pack is looked
    up as ``filename`` joined onto the directory part of ``filepath``.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param szexe: Path to 7z executable.
    :type szexe: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    szcodes = {
        0: "NO ERRORS",
        1: "COMPLETED WITH WARNINGS",
        2: "FATAL ERROR",
        7: "COMMAND LINE ERROR",
        8: "OUT OF MEMORY ERROR",
        255: "USER STOPPED PROCESS"
    }
    strength = str(strength)
    rawname = os.path.dirname(filepath)
    thr = str(utilities.get_core_count())
    fold = os.path.join(rawname, filename)
    cmd = '{0} a -mx{1} -m0=lzma2 -mmt{2} "{3}.7z" "{4}"'.format(szexe, strength,
                                                                 thr, filepath, fold)
    excode = sz_subprocess(cmd)
    if errors:
        # The shell can return codes outside the table above (for example
        # when the executable cannot be started); report those instead of
        # failing with a KeyError.
        print(szcodes.get(excode, "UNKNOWN ERROR ({0})".format(excode)))
70
71
72 4
def sz_subprocess(cmd):
    """
    Run a 7-Zip command line through the shell, discarding its output.

    Both stdout and stderr are sent to the null device; the process exit
    code is returned.

    :param cmd: Command to pass to subprocess.
    :type cmd: str
    """
    with open(os.devnull, 'wb') as sink:
        return subprocess.call(cmd, shell=True, stdout=sink, stderr=subprocess.STDOUT)
82
83
84 4
def sz_verify(filepath, szexe=None):
    """
    Verify that a .7z file is valid and working.

    Runs the 7z ``t`` command on the absolute path of the archive and
    reports whether it exited with code 0.

    :param filepath: Filename.
    :type filepath: str

    :param szexe: Path to 7z executable.
    :type szexe: str
    """
    target = os.path.abspath(filepath)
    return sz_subprocess('{0} t "{1}"'.format(szexe, target)) == 0
98
99
100 4
def generic_tarfile_verify(filepath, method):
    """
    Verify that a tar/tgz/tbz/txz file is valid and working.

    The archive is considered valid when it is recognised as a tarfile,
    opens with the given read method and lists at least one member.
    Archives that cannot be opened or read with that method yield False
    rather than raising.

    :param filepath: Filename.
    :type filepath: str

    :param method: Tarfile read method.
    :type method: str
    """
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, method) as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, EOFError, OSError, IOError):
        # Wrong compression for `method`, or a truncated/damaged archive.
        return False
    return bool(mems)
116
117
118 4
def generic_tarfile_compress(archivename, filename, method, strength=5):
    """
    Pack a file into an uncompressed/gzip/bzip2/LZMA tarfile.

    Dispatches to the helper that matches the write method: plain tar and
    tar.xz are written without a compression level, everything else with one.

    :param archivename: Archive name.
    :type archivename: str

    :param filename: Name of file to pack into archive.
    :type filename: str

    :param method: Tarfile compress method.
    :type method: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    # Write modes that are opened without a compresslevel argument.
    if method in ("w:", "w:xz"):
        generic_nocompresslevel(archivename, filename, method)
        return
    generic_compresslevel(archivename, filename, method, strength)
139
140
141 4
def generic_compresslevel(archivename, filename, method, strength=5):
    """
    Pack a file into a gzip/bzip2 tarfile.

    The file is stored under its base name only, without directories.

    :param archivename: Archive name.
    :type archivename: str

    :param filename: Name of file to pack into archive.
    :type filename: str

    :param method: Tarfile compress method.
    :type method: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    with tarfile.open(archivename, method, compresslevel=strength) as archive:
        member = os.path.basename(filename)
        archive.add(filename, arcname=member, filter=None)
159
160
161 4
def generic_nocompresslevel(archivename, filename, method):
    """
    Pack a file into an uncompressed/LZMA tarfile.

    The file is stored under its base name only, without directories.

    :param archivename: Archive name.
    :type archivename: str

    :param filename: Name of file to pack into archive.
    :type filename: str

    :param method: Tarfile compress method.
    :type method: str
    """
    with tarfile.open(archivename, method) as archive:
        member = os.path.basename(filename)
        archive.add(filename, arcname=member, filter=None)
176
177
178 4
@decorators.timer
def tar_compress(filepath, filename):
    """
    Pack a file into an uncompressed tarfile named ``<filepath>.tar``.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    archive = "{0}.tar".format(filepath)
    generic_tarfile_compress(archive, filename, "w:")
190
191
192 4
def tar_verify(filepath):
    """
    Verify that a tar file is valid and working.

    :param filepath: Filename.
    :type filepath: str
    """
    read_mode = "r:"  # uncompressed tar
    return generic_tarfile_verify(filepath, read_mode)
200
201
202 4
@decorators.timer
def tgz_compress(filepath, filename, strength=5):
    """
    Pack a file into a gzip tarfile named ``<filepath>.tar.gz``.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archive = "{0}.tar.gz".format(filepath)
    generic_tarfile_compress(archive, filename, "w:gz", strength)
217
218
219 4
def tgz_verify(filepath):
    """
    Verify that a tar.gz file is valid and working.

    :param filepath: Filename.
    :type filepath: str
    """
    read_mode = "r:gz"  # gzip-compressed tar
    return generic_tarfile_verify(filepath, read_mode)
227
228
229 4
@decorators.timer
def tbz_compress(filepath, filename, strength=5):
    """
    Pack a file into a bzip2 tarfile named ``<filepath>.tar.bz2``.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archive = "{0}.tar.bz2".format(filepath)
    generic_tarfile_compress(archive, filename, "w:bz2", strength)
244
245
246 4
def tbz_verify(filepath):
    """
    Verify that a tar.bz2 file is valid and working.

    :param filepath: Filename.
    :type filepath: str
    """
    read_mode = "r:bz2"  # bzip2-compressed tar
    return generic_tarfile_verify(filepath, read_mode)
254
255
256 4
@decorators.timer
def txz_compress(filepath, filename):
    """
    Pack a file into a LZMA tarfile named ``<filepath>.tar.xz``.

    Does nothing when ``utilities.new_enough(3)`` is false.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    if utilities.new_enough(3):
        archive = "{0}.tar.xz".format(filepath)
        generic_tarfile_compress(archive, filename, "w:xz")
271
272
273 4
def txz_verify(filepath):
    """
    Verify that a tar.xz file is valid and working.

    Returns None (rather than a bool) when ``utilities.new_enough(3)`` is
    false.

    :param filepath: Filename.
    :type filepath: str
    """
    if utilities.new_enough(3):
        return generic_tarfile_verify(filepath, "r:xz")
    return None
284
285
286 4
@decorators.timer
def zip_compress(filepath, filename):
    """
    Pack a file into a DEFLATE zipfile named ``<filepath>.zip``.

    The file is stored under its base name only; Zip64 extensions are
    allowed.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    target = "{0}.zip".format(filepath)
    with zipfile.ZipFile(target, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as archive:
        archive.write(filename, arcname=os.path.basename(filename))
299
300
301 4
def zip_verify(filepath):
    """
    Verify that a .zip file is valid and working.

    The file must be recognised as a zip archive, open without error, and
    every member must pass the CRC/header check done by
    :meth:`zipfile.ZipFile.testzip`.

    :param filepath: Filename.
    :type filepath: str
    """
    if not zipfile.is_zipfile(filepath):
        return False
    try:
        with zipfile.ZipFile(filepath, "r") as zfile:
            # testzip() returns the name of the first bad member, or None
            # when every member is intact.
            firstbad = zfile.testzip()
    except zipfile.BadZipFile:
        return False
    return firstbad is None
317
318
319 4
def filter_method(method, szexe=None):
    """
    Make sure methods are OK, falling back to "zip" where they are not.

    :param method: Compression method to use.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str
    """
    too_old = not utilities.new_enough(3)
    if too_old and method == "txz":
        method = "zip"  # fallback
    if method == "7z" and szexe is None:
        if utilities.prep_seven_zip():  # see if 7z exists
            # NOTE(review): the path looked up here stays local; only the
            # method name is returned to the caller.
            szexe = utilities.get_seven_zip(False)
        else:
            method = "zip"  # fallback
    return method
338
339
340 4
def calculate_strength():
    """
    Determine zip/gzip/bzip2 strength by OS bit setting.

    Returns 9 when ``utilities.is_amd64()`` is true, otherwise 5.
    """
    if utilities.is_amd64():
        return 9
    return 5
346
347
348 4
def filtercomp(files, criterion, critargs, boolfilt=True):
    """
    Filter a list of files by a criterion function.

    Each file is passed to ``criterion`` as the first argument, followed by
    ``critargs``. Files are kept when the truthiness of the result matches
    ``boolfilt``.

    :param files: Files to work on.
    :type files: list(str)

    :param criterion: Function to use for evaluation.
    :type criterion: func

    :param critargs: Arguments for function, other than file.
    :type critargs: list

    :param boolfilt: True if comparing criterion, False if comparing not criterion.
    :type boolfilt: bool
    """
    wanted = bool(boolfilt)
    return [item for item in files if bool(criterion(item, *critargs)) is wanted]
367
368
369 4
def compressfilter(filepath, selective=False):
    """
    Filter directory listing of working directory.

    Returns the selected entries joined onto ``filepath``. Selection modes:

    * ``None``: every entry in the directory, unfiltered.
    * ``"arcsonly"``: files matching ``bbconstants.ARCS``.
    * other truthy value: files matching ``bbconstants.PREFIXES``, minus
      archives, restricted to ``.exe``.
    * falsy value: all files except those matching ``bbconstants.ARCS``.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool/str
    """
    arx = bbconstants.ARCS
    pfx = bbconstants.PREFIXES
    # Test each entry relative to filepath; a bare name would be resolved
    # against the current working directory instead.
    files = [entry for entry in os.listdir(filepath)
             if not os.path.isdir(os.path.join(filepath, entry))]
    if selective is None:
        filt2 = os.listdir(filepath)
    elif selective == "arcsonly":
        filt2 = filtercomp(files, utilities.prepends, ("", arx))
    elif selective:
        filt0 = filtercomp(files, utilities.prepends, (pfx, ""))
        filt1 = filtercomp(filt0, utilities.prepends, ("", arx), False)  # pop archives
        filt2 = filtercomp(filt1, utilities.prepends, ("", ".exe"))  # include exes
    else:
        filt2 = filtercomp(files, utilities.prepends, ("", arx), False)  # pop archives
    return [os.path.join(filepath, entry) for entry in filt2]
394
395
396 4
def prep_compress_function(method="7z", szexe=None, errors=False):
    """
    Prepare compression function and partial arguments.

    Returns a ``(function, extra_args)`` pair; the extra arguments are meant
    to follow the two positional path arguments of the compress function.

    :param method: Compression type. Default is "7z".
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    dispatch = {
        "7z": sz_compress,
        "tgz": tgz_compress,
        "txz": txz_compress,
        "tbz": tbz_compress,
        "tar": tar_compress,
        "zip": zip_compress,
    }
    if method == "7z":
        # 7z takes executable path, strength and the status-message flag.
        extras = [szexe, calculate_strength(), errors]
    elif method in ("tbz", "tgz"):
        extras = [calculate_strength()]
    else:
        extras = []
    return dispatch[method], extras
417
418
419 4
def compress(filepath, method="7z", szexe=None, selective=False, errors=False):
    """
    Compress all autoloader files in a given folder, with a given method.

    Each selected file is packed into an archive in ``filepath`` named after
    the file with its extension removed. Always returns True.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z".
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    method = filter_method(method, szexe)
    for source in compressfilter(filepath, selective):
        shortname = os.path.basename(source)
        stem, _ = os.path.splitext(shortname)
        destination = os.path.join(filepath, stem)
        print("COMPRESSING: {0}".format(shortname))
        packer, extras = prep_compress_function(method, szexe, errors)
        packer(destination, source, *extras)
    return True
448
449
450 4
def tarzip_verifier(file):
    """
    Assign .tar.xxx, .tar and .zip verifiers.

    Picks the verifier by filename suffix and returns its result; returns
    None when no suffix matches.

    :param file: Filename.
    :type file: str
    """
    verifiers = (
        (".tar.gz", tgz_verify),
        (".tar.xz", txz_verify),
        (".tar.bz2", tbz_verify),
        (".tar", tar_verify),
        (".zip", zip_verify),
        (".bar", zip_verify),
    )
    for suffix, checker in verifiers:
        if file.endswith(suffix):
            return checker(file)
    return None
463
464
465 4
def decide_verifier(file, szexe=None):
    """
    Decide which verifier function to use.

    Verifies the file, prints the outcome and returns the result. .7z files
    go to the 7z verifier only when an executable path is given; everything
    else goes to the tar/zip verifiers.

    :param file: Filename.
    :type file: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str
    """
    print("VERIFYING: {0}".format(file))
    if not (file.endswith(".7z") and szexe is not None):
        outcome = tarzip_verifier(file)
    else:
        outcome = sz_verify(os.path.abspath(file), szexe)
    decide_verifier_printer(file, outcome)
    return outcome
482
483
484 4
def decide_verifier_printer(file, verif):
    """
    Print status of verifier function.

    Only the base name of the file is shown.

    :param file: Filename.
    :type file: str

    :param verif: If the file is OK or not.
    :type verif: bool
    """
    template = "{0} OK" if verif else "{0} IS BROKEN!"
    print(template.format(os.path.basename(file)))
498
499
500 4
def verify(filepath, method="7z", szexe=None, selective=False):
    """
    Verify specific archive files in a given folder.

    Each selected file is verified and its status printed; nothing is
    returned.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z". Defined in source.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only verify autoloaders. Default is false.
    :type selective: bool
    """
    # The filtered method name is not used below; verifiers are chosen per
    # file by extension.
    filter_method(method, szexe)
    for candidate in compressfilter(filepath, selective):
        decide_verifier(candidate, szexe)
520
521
522 4
def compress_config_loader(homepath=None):
    """
    Read a ConfigParser file to get compression preferences.

    Returns the stored method, defaulting to "7z". "txz" is replaced by
    "zip" when ``utilities.new_enough(3)`` is false.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    section = iniconfig.generic_loader('compression', homepath)
    chosen = section.get('method', fallback="7z")
    needs_fallback = (not utilities.new_enough(3)) and chosen == "txz"
    return "zip" if needs_fallback else chosen  # for 3.2 compatibility
534
535
536 4
def compress_config_writer(method=None, homepath=None):
    """
    Write a ConfigParser file to store compression preferences.

    When no method is given, the currently stored preference is read back
    from the same ``homepath`` and written again.

    :param method: Method to use.
    :type method: str

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    if method is None:
        # Read from the same location that is about to be written.
        method = compress_config_loader(homepath)
    results = {"method": method}
    iniconfig.generic_writer("compression", results, homepath)
549