Completed
Push — master ( d1933a...024ef5 )
by John
03:03
created

compress_config_loader()   A

Complexity

Conditions 3

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 3
c 2
b 0
f 0
dl 0
loc 12
ccs 6
cts 6
cp 1
crap 3
rs 9.4285
1
#!/usr/bin/env python3
2 4
"""This module is used to operate with archives."""
3
4 4
import os  # filesystem read
5 4
import subprocess  # invocation of 7z, cap
6 4
import zipfile  # zip compression
7 4
import tarfile  # txz/tbz/tgz/tar compression
8 4
from bbarchivist import utilities  # platform determination
9 4
from bbarchivist import bbconstants  # premade stuff
10 4
from bbarchivist import decorators  # timer
11 4
from bbarchivist import iniconfig  # config parsing
12
13 4
__author__ = "Thurask"
14 4
__license__ = "WTFPL v2"
15 4
__copyright__ = "Copyright 2015-2016 Thurask"
16
17
18 4
def smart_is_tarfile(filepath):
    """
    Wrap :func:`tarfile.is_tarfile` so filesystem errors count as "not a tarfile".

    :param filepath: Filename.
    :type filepath: str
    """
    try:
        return tarfile.is_tarfile(filepath)
    except (OSError, IOError):
        # A file that cannot be opened or read is reported as a non-archive
        # instead of propagating the error to the caller.
        return False
31
32
33 4
@decorators.timer
def sz_compress(filepath, filename, szexe=None, strength=5, errors=False):
    """
    Pack a file into a LZMA2 7z file.

    The archive is written to ``<filepath>.7z``. The 7z process output is
    discarded; only its exit status is inspected.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param szexe: Path to 7z executable.
    :type szexe: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    szcodes = {
        0: "NO ERRORS",
        1: "COMPLETED WITH WARNINGS",
        2: "FATAL ERROR",
        7: "COMMAND LINE ERROR",
        8: "OUT OF MEMORY ERROR",
        255: "USER STOPPED PROCESS"
    }
    strength = str(strength)
    rawname = os.path.dirname(filepath)
    thr = str(utilities.get_core_count())
    # The file to pack is looked up next to the output archive.
    fold = os.path.join(rawname, filename)
    # NOTE(review): the command is a shell string, so a path containing a
    # double quote would break the quoting. Left as-is because the exact form
    # szexe takes (bare command vs. pre-quoted path) is not visible here.
    cmd = '{0} a -mx{1} -m0=lzma2 -mmt{2} "{3}.7z" "{4}"'.format(szexe, strength,
                                                                 thr, filepath, fold)
    with open(os.devnull, 'wb') as dnull:
        excode = subprocess.call(cmd, stdout=dnull, stderr=subprocess.STDOUT, shell=True)
    if errors:
        # The shell or 7z may return a status that is not in the table
        # (e.g. command not found); report it instead of raising KeyError.
        print(szcodes.get(excode, "UNKNOWN EXIT CODE: {0}".format(excode)))
71
72
73 4
def sz_verify(filepath, szexe=None):
    """
    Run the 7z executable's ``t`` command on an archive and report success.

    Returns True only when the process exits with status 0. All process
    output is discarded.

    :param filepath: Filename.
    :type filepath: str

    :param szexe: Path to 7z executable.
    :type szexe: str
    """
    command = '{0} t "{1}"'.format(szexe, os.path.abspath(filepath))
    with open(os.devnull, 'wb') as sink:
        status = subprocess.call(command, stdout=sink, stderr=subprocess.STDOUT, shell=True)
    return status == 0
88
89
90 4
@decorators.timer
def tar_compress(filepath, filename):
    """
    Write a single file into an uncompressed tar archive at ``<filepath>.tar``.

    The member is stored under its basename only, without directory parts.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    archive_name = "{0}.tar".format(filepath)
    member_name = os.path.basename(filename)
    with tarfile.open(archive_name, 'w:') as archive:
        archive.add(filename, arcname=member_name, filter=None)
103
104
105 4
def tar_verify(filepath):
    """
    Verify that a tar file is valid and working.

    Returns True when the file is recognised as a tarfile, opens as an
    uncompressed tar and lists at least one member; False otherwise.

    :param filepath: Filename.
    :type filepath: str
    """
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, "r:") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, EOFError, OSError):
        # A file can pass the initial sniff and still fail to open in this
        # mode or while its members are read; for a verifier that means
        # "broken", not a crash.
        return False
    return bool(mems)
118
119
120 4
@decorators.timer
def tgz_compress(filepath, filename, strength=5):
    """
    Write a single file into a gzip-compressed tar archive at ``<filepath>.tar.gz``.

    The member is stored under its basename only, without directory parts.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archive_name = "{0}.tar.gz".format(filepath)
    member_name = os.path.basename(filename)
    with tarfile.open(archive_name, 'w:gz', compresslevel=strength) as archive:
        archive.add(filename, arcname=member_name, filter=None)
136
137
138 4
def tgz_verify(filepath):
    """
    Verify that a tar.gz file is valid and working.

    Returns True when the file is recognised as a tarfile, opens as a gzip
    tar and lists at least one member; False otherwise.

    :param filepath: Filename.
    :type filepath: str
    """
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, "r:gz") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, EOFError, OSError):
        # A damaged or truncated archive can pass the initial sniff and still
        # fail while its members are read; report it as broken instead of
        # letting the exception escape the verifier.
        return False
    return bool(mems)
151
152
153 4
@decorators.timer
def tbz_compress(filepath, filename, strength=5):
    """
    Write a single file into a bzip2-compressed tar archive at ``<filepath>.tar.bz2``.

    The member is stored under its basename only, without directory parts.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archive_name = "{0}.tar.bz2".format(filepath)
    member_name = os.path.basename(filename)
    with tarfile.open(archive_name, 'w:bz2', compresslevel=strength) as archive:
        archive.add(filename, arcname=member_name, filter=None)
169
170
171 4
def tbz_verify(filepath):
    """
    Verify that a tar.bz2 file is valid and working.

    Returns True when the file is recognised as a tarfile, opens as a bzip2
    tar and lists at least one member; False otherwise.

    :param filepath: Filename.
    :type filepath: str
    """
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, "r:bz2") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, EOFError, OSError):
        # A damaged or truncated archive can pass the initial sniff and still
        # fail while its members are read; report it as broken instead of
        # letting the exception escape the verifier.
        return False
    return bool(mems)
184
185
186 4
@decorators.timer
def txz_compress(filepath, filename):
    """
    Write a single file into an LZMA-compressed tar archive at ``<filepath>.tar.xz``.

    Nothing is written when ``utilities.new_enough(3)`` is false. The member
    is stored under its basename only, without directory parts.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    if utilities.new_enough(3):
        archive_name = "{0}.tar.xz".format(filepath)
        member_name = os.path.basename(filename)
        with tarfile.open(archive_name, 'w:xz') as archive:
            archive.add(filename, arcname=member_name, filter=None)
202
203
204 4
def txz_verify(filepath):
    """
    Verify that a tar.xz file is valid and working.

    Returns None when ``utilities.new_enough(3)`` is false (no check is
    attempted). Otherwise returns True when the file is recognised as a
    tarfile, opens as an xz tar and lists at least one member, else False.

    :param filepath: Filename.
    :type filepath: str
    """
    if not utilities.new_enough(3):
        return None
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, "r:xz") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, EOFError, OSError):
        # A damaged or truncated archive can pass the initial sniff and still
        # fail while its members are read; report it as broken instead of
        # letting the exception escape the verifier.
        return False
    return bool(mems)
220
221
222 4
@decorators.timer
def zip_compress(filepath, filename):
    """
    Write a single file into a DEFLATE zip archive at ``<filepath>.zip``.

    Zip64 extensions are enabled. The member is stored under its basename
    only, without directory parts.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    archive_name = "{0}.zip".format(filepath)
    member_name = os.path.basename(filename)
    with zipfile.ZipFile(archive_name, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as archive:
        archive.write(filename, arcname=member_name)
235
236
237 4
def zip_verify(filepath):
    """
    Verify that a .zip file is valid and working.

    Returns True only when the file is recognised as a zipfile, can be
    opened, and every member passes :meth:`zipfile.ZipFile.testzip`.

    :param filepath: Filename.
    :type filepath: str
    """
    if not zipfile.is_zipfile(filepath):
        return False
    try:
        with zipfile.ZipFile(filepath, "r") as zfile:
            # testzip() returns the name of the first bad member, or None
            # when every member checks out.
            brokens = zfile.testzip()
    except zipfile.BadZipFile:
        return False
    # Comparing against filepath (as before) could never flag a bad member,
    # because testzip() yields a member name, not the archive path.
    return brokens is None
253
254
255 4
def filter_method(method, szexe=None):
    """
    Return a compression method that can actually be used, falling back to "zip".

    "txz" becomes "zip" when ``utilities.new_enough(3)`` is false. "7z" with
    no executable supplied becomes "zip" when ``utilities.prep_seven_zip()``
    reports nothing usable. Every other input is returned unchanged.

    :param method: Compression method to use.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str
    """
    too_old = not utilities.new_enough(3)
    if too_old and method == "txz":
        method = "zip"  # fallback
    if method != "7z" or szexe is not None:
        return method
    if utilities.prep_seven_zip():  # see if 7z exists
        # Only rebinds the local name; the caller's szexe is not updated.
        szexe = utilities.get_seven_zip(False)
        return method
    return "zip"  # fallback
274
275
276 4
def calculate_strength():
    """
    Pick the compression strength: 9 when ``utilities.is_amd64()`` is true, else 5.
    """
    if utilities.is_amd64():
        return 9
    return 5
282
283
284 4
def filtercomp(files, criterion, critargs, boolfilt=True):
    """
    Filter a list of files by a criterion function, optionally inverted.

    Each file is passed to ``criterion(file, *critargs)``. Files are kept,
    in their original order, when the result's truthiness matches boolfilt.

    :param files: Files to work on.
    :type files: list(str)

    :param criterion: Function to use for evaluation.
    :type criterion: func

    :param critargs: Arguments for function, other than file.
    :type critargs: list

    :param boolfilt: True if comparing criterion, False if comparing not criterion.
    :type boolfilt: bool
    """
    kept = []
    for candidate in files:
        matched = criterion(candidate, *critargs)
        wanted = matched if boolfilt else not matched
        if wanted:
            kept.append(candidate)
    return kept
303
304
305 4
def compressfilter(filepath, selective=False):
    """
    Filter directory listing of working directory.

    Returns the selected entries joined onto filepath. The selection depends
    on selective:

    * ``None``: every directory entry, unfiltered (subdirectories included).
    * ``"arcsonly"``: files accepted by ``utilities.prepends(file, "", ARCS)``.
    * other truthy value: files accepted for ``PREFIXES``, minus those
      accepted for ``ARCS``, then only those accepted for ``".exe"``.
    * falsy value: files not accepted for ``ARCS``.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool/str
    """
    arx = bbconstants.ARCS
    pfx = bbconstants.PREFIXES
    entries = os.listdir(filepath)
    # os.listdir() yields bare names, so they must be joined onto filepath
    # before isdir(); otherwise the test runs against the current directory
    # and subdirectories slip through whenever cwd != filepath.
    files = [file for file in entries if not os.path.isdir(os.path.join(filepath, file))]
    if selective is None:
        filt2 = entries
    elif selective == "arcsonly":
        filt2 = filtercomp(files, utilities.prepends, ("", arx))
    elif selective:
        filt0 = filtercomp(files, utilities.prepends, (pfx, ""))
        filt1 = filtercomp(filt0, utilities.prepends, ("", arx), False)  # pop archives
        filt2 = filtercomp(filt1, utilities.prepends, ("", ".exe"))  # include exes
    else:
        filt2 = filtercomp(files, utilities.prepends, ("", arx), False)  # pop archives
    filt3 = [os.path.join(filepath, file) for file in filt2]
    return filt3
330
331
332 4
def prep_compress_function(method="7z", szexe=None, errors=False):
    """
    Look up the compression function for a method and build its extra arguments.

    Returns a ``(function, args)`` pair. ``args`` holds the positional
    arguments that follow filepath and filename: the executable, strength and
    errors flag for "7z"; the strength alone for "tbz" and "tgz"; nothing for
    the rest. An unknown method raises KeyError.

    :param method: Compression type. Default is "7z".
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    if method == "7z":
        extras = [szexe, calculate_strength(), errors]
    elif method in ("tbz", "tgz"):
        extras = [calculate_strength()]
    else:
        extras = []
    dispatch = {
        "7z": sz_compress,
        "tgz": tgz_compress,
        "txz": txz_compress,
        "tbz": tbz_compress,
        "tar": tar_compress,
        "zip": zip_compress,
    }
    return dispatch[method], extras
353
354
355 4
def compress(filepath, method="7z", szexe=None, selective=False, errors=False):
    """
    Compress all autoloader files in a given folder, with a given method.

    The method is first passed through :func:`filter_method`, which may fall
    back to "zip". Each selected file is packed into an archive named after
    the file (extension stripped) inside filepath. Always returns True.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z".
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    method = filter_method(method, szexe)
    if method == "7z" and szexe is None:
        # filter_method() keeps "7z" without an explicit executable only when
        # it found one, but its lookup stays local to that function. Repeat
        # it here so the 7z command line is not built from None.
        szexe = utilities.get_seven_zip(False)
    files = compressfilter(filepath, selective)
    for file in files:
        fname = os.path.basename(file)
        filename = os.path.splitext(fname)[0]
        fileloc = os.path.join(filepath, filename)  # archive path, no extension
        print("COMPRESSING: {0}".format(fname))
        compfunc, extargs = prep_compress_function(method, szexe, errors)
        compfunc(fileloc, file, *extargs)
    return True
384
385
386 4
def tarzip_verifier(file):
    """
    Run the verifier that matches a file's extension and return its result.

    Handles .tar.gz, .tar.xz, .tar.bz2, .tar, .zip and .bar (checked as a
    zip). Returns None when the filename ends with none of these.

    :param file: Filename.
    :type file: str
    """
    verifiers = (
        (".tar.gz", tgz_verify),
        (".tar.xz", txz_verify),
        (".tar.bz2", tbz_verify),
        (".tar", tar_verify),
        (".zip", zip_verify),
        (".bar", zip_verify),
    )
    for suffix, checker in verifiers:
        if file.endswith(suffix):
            return checker(file)
    return None
399
400
401 4
def decide_verifier(file, szexe=None):
    """
    Verify one archive with the appropriate verifier, print and return the result.

    .7z files go to :func:`sz_verify` only when a 7z executable is supplied;
    everything else (including .7z without an executable) goes to
    :func:`tarzip_verifier`.

    :param file: Filename.
    :type file: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str
    """
    print("VERIFYING: {0}".format(file))
    use_sevenzip = file.endswith(".7z") and szexe is not None
    outcome = sz_verify(os.path.abspath(file), szexe) if use_sevenzip else tarzip_verifier(file)
    decide_verifier_printer(file, outcome)
    return outcome
418
419
420 4
def decide_verifier_printer(file, verif):
    """
    Print whether a verified file is OK or broken, using its basename.

    :param file: Filename.
    :type file: str

    :param verif: If the file is OK or not.
    :type verif: bool
    """
    template = "{0} OK" if verif else "{0} IS BROKEN!"
    print(template.format(os.path.basename(file)))
434
435
436 4
def verify(filepath, method="7z", szexe=None, selective=False):
    """
    Verify specific archive files in a given folder.

    Every file selected by :func:`compressfilter` is passed to
    :func:`decide_verifier`, which prints the outcome. Returns None.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z". Defined in source.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only verify autoloaders. Default is false.
    :type selective: bool
    """
    method = filter_method(method, szexe)
    if method == "7z" and szexe is None:
        # filter_method() keeps "7z" without an explicit executable only when
        # it found one, but its lookup stays local to that function. Without
        # repeating it, .7z files would skip sz_verify and be reported broken.
        szexe = utilities.get_seven_zip(False)
    files = compressfilter(filepath, selective)
    for file in files:
        decide_verifier(file, szexe)
456
457
458 4
def compress_config_loader(homepath=None):
    """
    Load the preferred compression method from the 'compression' ini section.

    Falls back to "7z" when no method is stored, and replaces "txz" with
    "zip" when ``utilities.new_enough(3)`` is false.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    settings = iniconfig.generic_loader('compression', homepath)
    chosen = settings.get('method', fallback="7z")
    too_old = not utilities.new_enough(3)
    if too_old and chosen == "txz":
        return "zip"  # for 3.2 compatibility
    return chosen
470
471
472 4
def compress_config_writer(method=None, homepath=None):
    """
    Write a ConfigParser file to store compression preferences.

    When no method is given, the currently stored method is read back from
    the same homepath and rewritten.

    :param method: Method to use.
    :type method: str

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    if method is None:
        # Read from the same location that is about to be written; the
        # previous call ignored homepath and always used the default one.
        method = compress_config_loader(homepath)
    results = {"method": method}
    iniconfig.generic_writer("compression", results, homepath)
485