Completed
Push — master ( 12103c...e6bd97 )
by John
03:17
created

filtercomp()   A

Complexity

Conditions 2

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 19
ccs 5
cts 5
cp 1
crap 2
rs 9.4285
1
#!/usr/bin/env python3
2 4
"""This module is used to operate with archives."""
3
4 4
import os  # filesystem read
5 4
import subprocess  # invocation of 7z, cap
6 4
import zipfile  # zip compression
7 4
import tarfile  # txz/tbz/tgz/tar compression
8 4
from bbarchivist import barutils  # zip tester
9 4
from bbarchivist import utilities  # platform determination
10 4
from bbarchivist import bbconstants  # premade stuff
11 4
from bbarchivist import decorators  # timer
12 4
from bbarchivist import iniconfig  # config parsing
13
14 4
__author__ = "Thurask"
15 4
__license__ = "WTFPL v2"
16 4
__copyright__ = "Copyright 2015-2016 Thurask"
17
18
19 4
def smart_is_tarfile(filepath):
    """
    Wrap :func:`tarfile.is_tarfile` so filesystem errors count as "not a tarfile".

    :param filepath: Filename.
    :type filepath: str
    """
    try:
        return tarfile.is_tarfile(filepath)
    except (OSError, IOError):
        # Unreadable or missing file: treat it as not being a tar archive.
        return False
32
33
34 4
@decorators.timer
def sz_compress(filepath, filename, szexe=None, strength=5, errors=False):
    """
    Pack a file into a LZMA2 7z file.

    :param filepath: Path of the archive to create, without the ".7z" extension.
    :type filepath: str

    :param filename: Name of file to pack; joined onto the directory of filepath.
    :type filename: str

    :param szexe: Path to 7z executable.
    :type szexe: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    szcodes = {
        0: "NO ERRORS",
        1: "COMPLETED WITH WARNINGS",
        2: "FATAL ERROR",
        7: "COMMAND LINE ERROR",
        8: "OUT OF MEMORY ERROR",
        255: "USER STOPPED PROCESS"
    }
    strength = str(strength)
    rawname = os.path.dirname(filepath)
    thr = str(utilities.get_core_count())
    fold = os.path.join(rawname, filename)
    cmd = '{0} a -mx{1} -m0=lzma2 -mmt{2} "{3}.7z" "{4}"'.format(szexe, strength,
                                                                 thr, filepath, fold)
    excode = sz_subprocess(cmd)
    if errors:
        # The shell may return a status that is not in the table above
        # (e.g. when the executable cannot be started); report it instead
        # of failing with a KeyError.
        print(szcodes.get(excode, "UNKNOWN ERROR (EXIT CODE {0})".format(excode)))
71
72
73 4
def sz_subprocess(cmd):
    """
    Run a 7-Zip command line through the shell, discarding its output.

    Returns the exit status of the command.

    :param cmd: Command to pass to subprocess.
    :type cmd: str
    """
    sink = open(os.devnull, 'wb')
    try:
        # stderr is folded into stdout, and both go to the null device.
        return subprocess.call(cmd, stdout=sink, stderr=subprocess.STDOUT, shell=True)
    finally:
        sink.close()
83
84
85 4
def sz_verify(filepath, szexe=None):
    """
    Test a .7z file with the 7z executable; True when the exit status is 0.

    :param filepath: Filename.
    :type filepath: str

    :param szexe: Path to 7z executable.
    :type szexe: str
    """
    target = os.path.abspath(filepath)
    status = sz_subprocess('{0} t "{1}"'.format(szexe, target))
    return status == 0
99
100
101 4
def generic_tarfile_verify(filepath, method):
    """
    Verify that a tar/tgz/tbz/txz file is valid and working.

    Returns True only if the file is recognised as a tarfile, opens with the
    given read method, and lists at least one member. Any failure to open or
    read the archive is reported as False rather than raised.

    :param filepath: Filename.
    :type filepath: str

    :param method: Tarfile read method.
    :type method: str
    """
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, method) as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, OSError, EOFError):
        # Wrong compression for this read method, or a damaged archive.
        return False
    return bool(mems)
117
118
119 4
def generic_tarfile_compress(archivename, filename, method, strength=5):
    """
    Pack a file into an uncompressed/gzip/bzip2/LZMA tarfile.

    Dispatches to the helper matching the write method: "w:" and "w:xz" are
    written without a compression level, everything else with one.

    :param archivename: Archive name.
    :type archivename: str

    :param filename: Name of file to pack into archive.
    :type filename: str

    :param method: Tarfile compress method.
    :type method: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    levelless = ["w:", "w:xz"]  # methods opened without a compresslevel: tar, tar.xz
    if method not in levelless:
        generic_compresslevel(archivename, filename, method, strength)
    else:
        generic_nocompresslevel(archivename, filename, method)
140
141
142 4
def generic_compresslevel(archivename, filename, method, strength=5):
    """
    Write a tarfile using a write method that accepts a compression level.

    The file is stored under its basename only.

    :param archivename: Archive name.
    :type archivename: str

    :param filename: Name of file to pack into archive.
    :type filename: str

    :param method: Tarfile compress method.
    :type method: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    with tarfile.open(archivename, method, compresslevel=strength) as tarball:
        member = os.path.basename(filename)
        tarball.add(filename, arcname=member, filter=None)
160
161
162 4
def generic_nocompresslevel(archivename, filename, method):
    """
    Write a tarfile using a write method opened without a compression level.

    The file is stored under its basename only.

    :param archivename: Archive name.
    :type archivename: str

    :param filename: Name of file to pack into archive.
    :type filename: str

    :param method: Tarfile compress method.
    :type method: str
    """
    with tarfile.open(archivename, method) as tarball:
        member = os.path.basename(filename)
        tarball.add(filename, arcname=member, filter=None)
177
178
179 4
@decorators.timer
def tar_compress(filepath, filename):
    """
    Create "<filepath>.tar" containing the given file, uncompressed.

    :param filepath: Archive path without the ".tar" extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    archivename = "{0}.tar".format(filepath)
    generic_tarfile_compress(archivename, filename, "w:")
191
192
193 4
def tar_verify(filepath):
    """
    Check an uncompressed tar file via the generic tarfile verifier.

    :param filepath: Filename.
    :type filepath: str
    """
    read_mode = "r:"
    return generic_tarfile_verify(filepath, read_mode)
201
202
203 4
@decorators.timer
def tgz_compress(filepath, filename, strength=5):
    """
    Create "<filepath>.tar.gz" containing the given file.

    :param filepath: Archive path without the ".tar.gz" extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archivename = "{0}.tar.gz".format(filepath)
    generic_tarfile_compress(archivename, filename, "w:gz", strength)
218
219
220 4
def tgz_verify(filepath):
    """
    Check a tar.gz file via the generic tarfile verifier.

    :param filepath: Filename.
    :type filepath: str
    """
    read_mode = "r:gz"
    return generic_tarfile_verify(filepath, read_mode)
228
229
230 4
@decorators.timer
def tbz_compress(filepath, filename, strength=5):
    """
    Create "<filepath>.tar.bz2" containing the given file.

    :param filepath: Archive path without the ".tar.bz2" extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archivename = "{0}.tar.bz2".format(filepath)
    generic_tarfile_compress(archivename, filename, "w:bz2", strength)
245
246
247 4
def tbz_verify(filepath):
    """
    Check a tar.bz2 file via the generic tarfile verifier.

    :param filepath: Filename.
    :type filepath: str
    """
    read_mode = "r:bz2"
    return generic_tarfile_verify(filepath, read_mode)
255
256
257 4
@decorators.timer
def txz_compress(filepath, filename):
    """
    Create "<filepath>.tar.xz" containing the given file.

    Does nothing when ``utilities.new_enough(3)`` is false
    (presumably an interpreter without LZMA support -- TODO confirm).

    :param filepath: Archive path without the ".tar.xz" extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    if utilities.new_enough(3):
        archivename = "{0}.tar.xz".format(filepath)
        generic_tarfile_compress(archivename, filename, "w:xz")
272
273
274 4
def txz_verify(filepath):
    """
    Check a tar.xz file via the generic tarfile verifier.

    Returns None without checking when ``utilities.new_enough(3)`` is false.

    :param filepath: Filename.
    :type filepath: str
    """
    if utilities.new_enough(3):
        return generic_tarfile_verify(filepath, "r:xz")
    return None
285
286
287 4
@decorators.timer
def zip_compress(filepath, filename):
    """
    Create "<filepath>.zip" (DEFLATE, Zip64 allowed) containing the given file.

    The file is stored under its basename only.

    :param filepath: Archive path without the ".zip" extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    archivename = "{0}.zip".format(filepath)
    archive = zipfile.ZipFile(archivename, 'w', zipfile.ZIP_DEFLATED, allowZip64=True)
    with archive:
        archive.write(filename, arcname=os.path.basename(filename))
300
301
302 4
def zip_verify(filepath):
    """
    Verify that a .zip file is valid and working.

    A file that is not a zipfile fails immediately. Otherwise the file
    passes unless ``barutils.bar_tester`` hands back the same path
    (presumably its way of flagging a broken archive -- TODO confirm).

    :param filepath: Filename.
    :type filepath: str
    """
    if not zipfile.is_zipfile(filepath):
        return False
    tested = barutils.bar_tester(filepath)
    return tested != filepath
314
315
316 4
def filter_method(method, szexe=None):
    """
    Return a usable compression method, falling back to "zip" where needed.

    "txz" is replaced by "zip" when ``utilities.new_enough(3)`` is false;
    the result is then passed through the 7-Zip availability check.

    :param method: Compression method to use.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str
    """
    too_old = not utilities.new_enough(3)
    if too_old and method == "txz":
        method = "zip"  # fallback
    return filter_method_nosz(method, szexe)
330
331
332 4
def filter_method_nosz(method, szexe=None):
    """
    Fall back to "zip" when "7z" is requested but no 7-Zip can be found.

    Only applies when no executable path was supplied; otherwise the
    method is returned unchanged.

    :param method: Compression method to use.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str
    """
    if method != "7z" or szexe is not None:
        return method
    if not utilities.prep_seven_zip():  # see if 7z exists
        return "zip"  # fallback
    # NOTE(review): the looked-up executable is not handed back to the
    # caller; only the method name is returned from here.
    utilities.get_seven_zip(False)
    return method
349
350
351 4
def calculate_strength():
    """
    Pick the zip/gzip/bzip2 strength: 9 if ``utilities.is_amd64()``, else 5.
    """
    if utilities.is_amd64():
        return 9
    return 5
357
358
359 4
def filter_with_boolfilt(files, criterion, critargs):
    """
    Keep the entries for which the criterion is truthy, in original order.

    :param files: Files to work on.
    :type files: list(str)

    :param criterion: Function to use for evaluation.
    :type criterion: func

    :param critargs: Arguments for function, other than file.
    :type critargs: list
    """
    kept = []
    for entry in files:
        if criterion(entry, *critargs):
            kept.append(entry)
    return kept
373
374
375 4
def filter_without_boolfilt(files, criterion, critargs):
    """
    Keep the entries for which the criterion is falsy, in original order.

    :param files: Files to work on.
    :type files: list(str)

    :param criterion: Function to use for evaluation.
    :type criterion: func

    :param critargs: Arguments for function, other than file.
    :type critargs: list
    """
    kept = []
    for entry in files:
        if criterion(entry, *critargs):
            continue
        kept.append(entry)
    return kept
389
390
391 4
def filtercomp(files, criterion, critargs, boolfilt=True):
    """
    Filter files by a criterion, keeping either the matches or the non-matches.

    :param files: Files to work on.
    :type files: list(str)

    :param criterion: Function to use for evaluation.
    :type criterion: func

    :param critargs: Arguments for function, other than file.
    :type critargs: list

    :param boolfilt: True if comparing criterion, False if comparing not criterion.
    :type boolfilt: bool
    """
    wanted = bool(boolfilt)
    # An entry is kept when the truthiness of its criterion equals the requested one.
    return [entry for entry in files if bool(criterion(entry, *critargs)) == wanted]
410
411
412 4
def compressfilter_select(filepath, files, selective=False):
    """
    Choose which entries of a directory listing to work on.

    * ``None``: a fresh, unfiltered listing of filepath.
    * ``"arcsonly"``: entries accepted by ``utilities.prepends`` for the archive
      extensions in ``bbconstants.ARCS``.
    * any other truthy value: entries accepted for ``bbconstants.PREFIXES``,
      minus those accepted for the archive extensions, restricted to ".exe".
    * falsy: everything except entries accepted for the archive extensions.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param files: List of files in filepath.
    :type files: list(str)

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool/str
    """
    archive_exts = bbconstants.ARCS
    loader_prefixes = bbconstants.PREFIXES
    if selective is None:
        return os.listdir(filepath)
    if selective == "arcsonly":
        return filtercomp(files, utilities.prepends, ("", archive_exts))
    if not selective:
        return filtercomp(files, utilities.prepends, ("", archive_exts), False)  # pop archives
    prefixed = filtercomp(files, utilities.prepends, (loader_prefixes, ""))
    unarchived = filtercomp(prefixed, utilities.prepends, ("", archive_exts), False)  # pop archives
    return filtercomp(unarchived, utilities.prepends, ("", ".exe"))  # include exes
436
437
438 4
def compressfilter(filepath, selective=False):
    """
    Filter directory listing of working directory.

    Subdirectories are dropped from the listing, the remainder is narrowed
    according to ``selective``, and the survivors are returned joined onto
    filepath.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool/str
    """
    # os.listdir yields bare names, so each one must be joined onto filepath
    # before asking whether it is a directory; otherwise the check is made
    # relative to the current working directory.
    files = [name for name in os.listdir(filepath)
             if not os.path.isdir(os.path.join(filepath, name))]
    filt2 = compressfilter_select(filepath, files, selective)
    return [os.path.join(filepath, name) for name in filt2]
453
454
455 4
def prep_compress_function(method="7z", szexe=None, errors=False):
    """
    Look up the compression function for a method and build its extra arguments.

    Returns a ``(function, args)`` pair; ``args`` follow the two leading
    positional arguments every compressor takes. "7z" receives the
    executable, strength and error flag; "tbz"/"tgz" receive the strength;
    the remaining methods receive nothing extra.

    :param method: Compression type. Default is "7z".
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    methods = {"7z": sz_compress, "tgz": tgz_compress, "txz": txz_compress, "tbz": tbz_compress,
               "tar": tar_compress, "zip": zip_compress}
    uses_sz = method == "7z"
    args = []
    if uses_sz:
        args.append(szexe)
    if uses_sz or method in ("tbz", "tgz"):
        args.append(calculate_strength())
    if uses_sz:
        args.append(errors)
    return methods[method], args
476
477
478 4
def compress(filepath, method="7z", szexe=None, selective=False, errors=False):
    """
    Compress all autoloader files in a given folder, with a given method.

    Always returns True.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z".
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool/str

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    method = filter_method(method, szexe)
    if method == "7z" and szexe is None:
        # filter_method only leaves "7z" in place without a path when 7-Zip
        # was found; fetch that path so the command line gets a real executable.
        szexe = utilities.get_seven_zip(False)
    files = compressfilter(filepath, selective)
    # The function and its extra arguments do not depend on the file.
    compfunc, extargs = prep_compress_function(method, szexe, errors)
    for file in files:
        fname = os.path.basename(file)
        filename = os.path.splitext(fname)[0]
        fileloc = os.path.join(filepath, filename)
        print("COMPRESSING: {0}".format(fname))
        compfunc(fileloc, file, *extargs)
    return True
507
508
509 4
def tarzip_verifier(file):
    """
    Run the verifier matching a file's .tar.xxx, .tar, .zip or .bar suffix.

    Returns the verifier's result, or None when no suffix matches.

    :param file: Filename.
    :type file: str
    """
    # No suffix here is the tail of another, so at most one entry can match.
    verifiers = (
        (".tar.gz", tgz_verify),
        (".tar.xz", txz_verify),
        (".tar.bz2", tbz_verify),
        (".tar", tar_verify),
        (".zip", zip_verify),
        (".bar", zip_verify),
    )
    for suffix, checker in verifiers:
        if file.endswith(suffix):
            return checker(file)
    return None
522
523
524 4
def decide_verifier(file, szexe=None):
    """
    Verify one archive with the appropriate verifier and print the outcome.

    .7z files go to the 7-Zip verifier when an executable path is given;
    everything else goes to the suffix-based verifier. Returns the
    verifier's result.

    :param file: Filename.
    :type file: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str
    """
    print("VERIFYING: {0}".format(file))
    use_sz = file.endswith(".7z") and szexe is not None
    verif = sz_verify(os.path.abspath(file), szexe) if use_sz else tarzip_verifier(file)
    decide_verifier_printer(file, verif)
    return verif
541
542
543 4
def decide_verifier_printer(file, verif):
    """
    Print whether a file passed verification, using its basename.

    :param file: Filename.
    :type file: str

    :param verif: If the file is OK or not.
    :type verif: bool
    """
    template = "{0} OK" if verif else "{0} IS BROKEN!"
    print(template.format(os.path.basename(file)))
557
558
559 4
def verify(filepath, method="7z", szexe=None, selective=False):
    """
    Verify specific archive files in a given folder.

    Each selected file is checked and its status printed; nothing is returned.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z". Defined in source.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only verify autoloaders. Default is false.
    :type selective: bool/str
    """
    # The filtered method is computed but not consulted below.
    method = filter_method(method, szexe)
    for archive in compressfilter(filepath, selective):
        decide_verifier(archive, szexe)
579
580
581 4
def compress_config_loader(homepath=None):
    """
    Load the preferred compression method from the 'compression' config section.

    Defaults to "7z" when no method is stored; "txz" is swapped for "zip"
    when ``utilities.new_enough(3)`` is false.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    section = iniconfig.generic_loader('compression', homepath)
    chosen = section.get('method', fallback="7z")
    too_old = not utilities.new_enough(3)
    if too_old and chosen == "txz":
        return "zip"  # for 3.2 compatibility
    return chosen
593
594
595 4
def compress_config_writer(method=None, homepath=None):
    """
    Write a ConfigParser file to store compression preferences.

    When no method is given, the currently stored one is re-read from the
    same location that is about to be written.

    :param method: Method to use.
    :type method: str

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    if method is None:
        # Read from the same homepath we write to, not the default location.
        method = compress_config_loader(homepath)
    results = {"method": method}
    iniconfig.generic_writer("compression", results, homepath)
608