Completed
Push — master ( 733f1c...18912f )
by John
01:11
created

filtercomp()   B

Complexity

Conditions 6

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 19
rs 8
1
#!/usr/bin/env python3
2
"""This module is used to operate with archives."""
3
4
import os  # filesystem read
5
import subprocess  # invocation of 7z, cap
6
import zipfile  # zip compression
7
import tarfile  # txz/tbz/tgz/tar compression
8
import configparser  # config parsing, duh
9
from bbarchivist import utilities  # platform determination
10
from bbarchivist import bbconstants  # premade stuff
11
from bbarchivist import decorators  # timer
12
13
__author__ = "Thurask"
14
__license__ = "WTFPL v2"
15
__copyright__ = "Copyright 2015-2016 Thurask"
16
17
18
def smart_is_tarfile(filepath):
    """
    Wrap :func:`tarfile.is_tarfile` so that I/O errors count as "not a tarfile".

    :param filepath: Filename.
    :type filepath: str
    """
    try:
        return tarfile.is_tarfile(filepath)
    except (OSError, IOError):
        # The path could not be opened/read, so it cannot be a usable tarfile.
        return False
31
32
33
@decorators.timer
def sz_compress(filepath, filename, szexe=None, strength=5, errors=False):
    """
    Pack a file into a LZMA2 7z file.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param szexe: Path to 7z executable.
    :type szexe: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    szcodes = {
        0: "NO ERRORS",
        1: "COMPLETED WITH WARNINGS",
        2: "FATAL ERROR",
        7: "COMMAND LINE ERROR",
        8: "OUT OF MEMORY ERROR",
        255: "USER STOPPED PROCESS"
    }
    strength = str(strength)
    rawname = os.path.dirname(filepath)
    thr = str(utilities.get_core_count())
    fold = os.path.join(rawname, filename)
    # NOTE(review): the command is a single string executed through the shell
    # (shell=True); paths are only wrapped in double quotes, so callers must not
    # pass untrusted file names here.
    cmd = '{0} a -mx{1} -m0=lzma2 -mmt{2} "{3}.7z" "{4}"'.format(szexe, strength,
                                                                 thr, filepath, fold)
    # 7z output is discarded; only the exit code is inspected.
    with open(os.devnull, 'wb') as dnull:
        excode = subprocess.call(cmd, stdout=dnull, stderr=subprocess.STDOUT, shell=True)
    if errors:
        # Exit codes outside the table (e.g. the shell's "command not found")
        # must not raise KeyError while merely reporting status.
        print(szcodes.get(excode, "UNKNOWN ERROR (EXIT CODE {0})".format(excode)))
71
72
73
def sz_verify(filepath, szexe=None):
    """
    Run the 7z "t" (test) command on an archive and report whether it exited with 0.

    :param filepath: Filename.
    :type filepath: str

    :param szexe: Path to 7z executable.
    :type szexe: str
    """
    target = os.path.abspath(filepath)
    command = '{0} t "{1}"'.format(szexe, target)
    # All 7z output is thrown away; the exit status is the only result used.
    with open(os.devnull, 'wb') as sink:
        status = subprocess.call(command, stdout=sink, stderr=subprocess.STDOUT, shell=True)
    return status == 0
88
89
90
@decorators.timer
def tar_compress(filepath, filename):
    """
    Write a file into a plain (uncompressed) tar archive named ``<filepath>.tar``.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    archive_name = "{0}.tar".format(filepath)
    with tarfile.open(archive_name, 'w:') as archive:
        archive.add(filename, filter=None)
103
104
105
def tar_verify(filepath):
    """
    Verify that a tar file is valid and working.

    The archive is opened as an uncompressed tar and its member list is read.
    Any failure to open or read it (missing file, wrong format, compressed
    tar, truncated data) is reported as ``False`` instead of raising.

    :param filepath: Filename.
    :type filepath: str

    :return: True if the archive opens and lists at least one member.
    :rtype: bool
    """
    try:
        # Opening with the explicit "r:" mode is itself the format check: a
        # compressed or malformed file raises tarfile.ReadError here.
        with tarfile.open(filepath, "r:") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, OSError, EOFError):
        return False
    # An archive with no members is treated as invalid.
    return bool(mems)
118
119
120
@decorators.timer
def tgz_compress(filepath, filename, strength=5):
    """
    Write a file into a gzip-compressed tar archive named ``<filepath>.tar.gz``.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archive_name = "{0}.tar.gz".format(filepath)
    with tarfile.open(archive_name, 'w:gz', compresslevel=strength) as archive:
        archive.add(filename, filter=None)
136
137
138
def tgz_verify(filepath):
    """
    Verify that a tar.gz file is valid and working.

    The archive is opened as a gzip-compressed tar and its member list is
    read. Any failure to open or read it (missing file, not gzip, truncated
    data) is reported as ``False`` instead of raising.

    :param filepath: Filename.
    :type filepath: str

    :return: True if the archive opens and lists at least one member.
    :rtype: bool
    """
    try:
        # The explicit "r:gz" mode rejects anything that is not a gzip tar by
        # raising tarfile.ReadError.
        with tarfile.open(filepath, "r:gz") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, OSError, EOFError):
        return False
    # An archive with no members is treated as invalid.
    return bool(mems)
151
152
153
@decorators.timer
def tbz_compress(filepath, filename, strength=5):
    """
    Write a file into a bzip2-compressed tar archive named ``<filepath>.tar.bz2``.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archive_name = "{0}.tar.bz2".format(filepath)
    with tarfile.open(archive_name, 'w:bz2', compresslevel=strength) as archive:
        archive.add(filename, filter=None)
169
170
171
def tbz_verify(filepath):
    """
    Verify that a tar.bz2 file is valid and working.

    The archive is opened as a bzip2-compressed tar and its member list is
    read. Any failure to open or read it (missing file, not bzip2, truncated
    data) is reported as ``False`` instead of raising.

    :param filepath: Filename.
    :type filepath: str

    :return: True if the archive opens and lists at least one member.
    :rtype: bool
    """
    try:
        # The explicit "r:bz2" mode rejects anything that is not a bzip2 tar
        # by raising tarfile.ReadError.
        with tarfile.open(filepath, "r:bz2") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, OSError, EOFError):
        return False
    # An archive with no members is treated as invalid.
    return bool(mems)
184
185
186
@decorators.timer
def txz_compress(filepath, filename):
    """
    Write a file into an LZMA-compressed tar archive named ``<filepath>.tar.xz``.

    Nothing is written when ``utilities.new_enough(3)`` is false.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    if utilities.new_enough(3):
        archive_name = "{0}.tar.xz".format(filepath)
        with tarfile.open(archive_name, 'w:xz') as archive:
            archive.add(filename, filter=None)
202
203
204
def txz_verify(filepath):
    """
    Verify that a tar.xz file is valid and working.

    :param filepath: Filename.
    :type filepath: str

    :return: None when ``utilities.new_enough(3)`` is false (no check is
        attempted); otherwise True if the archive opens as an xz tar and
        lists at least one member, False on any open/read failure.
    :rtype: bool or None
    """
    if not utilities.new_enough(3):
        return None
    try:
        # The explicit "r:xz" mode rejects anything that is not an xz tar by
        # raising tarfile.ReadError; report that as "invalid", not a crash.
        with tarfile.open(filepath, "r:xz") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, OSError, EOFError):
        return False
    # An archive with no members is treated as invalid.
    return bool(mems)
220
221
222
@decorators.timer
def zip_compress(filepath, filename):
    """
    Write a file into a DEFLATE-compressed zip archive named ``<filepath>.zip``.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    archive_name = "{0}.zip".format(filepath)
    with zipfile.ZipFile(archive_name, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as archive:
        archive.write(filename)
235
236
237
def zip_verify(filepath):
    """
    Verify that a .zip file is valid and working.

    :param filepath: Filename.
    :type filepath: str

    :return: True only if the file is a zip archive and every member passes
        its CRC/header check.
    :rtype: bool
    """
    if not zipfile.is_zipfile(filepath):
        return False
    try:
        with zipfile.ZipFile(filepath, "r") as zfile:
            # testzip() returns the name of the first corrupt member, or None
            # when every member is intact.
            first_bad = zfile.testzip()
    except zipfile.BadZipFile:
        return False
    return first_bad is None
253
254
255
def filter_method(method, szexe=None):
    """
    Return a usable compression method, falling back to "zip" when the requested one is not.

    :param method: Compression method to use.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str
    """
    too_old = not utilities.new_enough(3)
    if too_old and method == "txz":
        method = "zip"  # fallback
    needs_lookup = method == "7z" and szexe is None
    if needs_lookup:
        if utilities.prep_seven_zip():  # see if 7z exists
            # NOTE: the path is only bound to the local name; it is not
            # returned, so callers do not receive it from this function.
            szexe = utilities.get_seven_zip(False)
        else:
            method = "zip"  # fallback
    return method
274
275
276
def calculate_strength():
    """
    Pick the zip/gzip/bzip2 strength: 9 when ``utilities.is_amd64()`` is true, else 5.
    """
    if utilities.is_amd64():
        return 9
    return 5
282
283
284
def filtercomp(files, criterion, critargs, boolfilt=True):
    """
    Keep the files for which a criterion function does (or does not) hold.

    :param files: Files to work on.
    :type files: list(str)

    :param criterion: Function to use for evaluation.
    :type criterion: func

    :param critargs: Arguments for function, other than file.
    :type critargs: list

    :param boolfilt: True if comparing criterion, False if comparing not criterion.
    :type boolfilt: bool
    """
    # Normalise both sides to bool so one comparison covers keep/reject modes.
    want_match = bool(boolfilt)
    return [
        candidate for candidate in files
        if bool(criterion(candidate, *critargs)) is want_match
    ]
303
304
305
def compressfilter(filepath, selective=False):
    """
    Filter directory listing of working directory.

    Directories and files already ending in an archive extension
    (``bbconstants.ARCS``) are dropped. With ``selective`` set, only ``.exe``
    files starting with one of ``bbconstants.PREFIXES`` are kept.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool

    :return: Matching files, each joined onto ``filepath``.
    :rtype: list(str)
    """
    arx = bbconstants.ARCS
    pfx = bbconstants.PREFIXES
    # os.listdir() yields bare names, so they must be joined onto filepath
    # before the directory test; otherwise the test runs against the current
    # working directory instead.
    files = [file for file in os.listdir(filepath)
             if not os.path.isdir(os.path.join(filepath, file))]
    if selective:
        filt0 = filtercomp(files, utilities.prepends, (pfx, ""))
        filt1 = filtercomp(filt0, utilities.prepends, ("", arx), False)
        filt2 = filtercomp(filt1, utilities.prepends, ("", ".exe"))
    else:
        filt2 = filtercomp(files, utilities.prepends, ("", arx), False)
    filt3 = [os.path.join(filepath, file) for file in filt2]
    return filt3
326
327
328
def compress(filepath, method="7z", szexe=None, selective=False, errors=False):
    """
    Compress all autoloader files in a given folder, with a given method.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z".
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool

    :param errors: Print completion status message. Default is false.
    :type errors: bool

    :return: Always True.
    :rtype: bool
    """
    method = filter_method(method, szexe)
    if method == "7z" and szexe is None:
        # filter_method() only leaves "7z" in place without an explicit path
        # when utilities.prep_seven_zip() succeeded, but it does not hand the
        # path back; resolve it here so sz_compress never receives None.
        szexe = utilities.get_seven_zip(False)
    files = compressfilter(filepath, selective)
    # Strength does not depend on the file, so compute it once.
    strength = calculate_strength()
    for file in files:
        filename = os.path.splitext(os.path.basename(file))[0]
        fileloc = os.path.join(filepath, filename)
        print("COMPRESSING: {0}.exe".format(filename))
        if method == "7z":
            sz_compress(fileloc, file, szexe, strength, errors)
        elif method == "tgz":
            tgz_compress(fileloc, file, strength)
        elif method == "txz":
            txz_compress(fileloc, file)
        elif method == "tbz":
            tbz_compress(fileloc, file, strength)
        elif method == "tar":
            tar_compress(fileloc, file)
        elif method == "zip":
            zip_compress(fileloc, file)
    return True
366
367
368
def verify(thepath, method="7z", szexe=None, selective=False):
    """
    Verify specific archive files in a given folder.

    Every matching archive is checked; a message is printed for each broken
    one.

    :param thepath: Working directory. Required.
    :type thepath: str

    :param method: Compression type. Default is "7z". Defined in source.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only verify autoloaders. Default is false.
    :type selective: bool

    :return: None if no archive was checked, otherwise True only if every
        checked archive passed verification.
    :rtype: bool or None
    """
    method = filter_method(method, szexe)
    if method == "7z" and szexe is None:
        # filter_method() keeps "7z" without an explicit path only when
        # utilities.prep_seven_zip() succeeded; fetch the path it found.
        szexe = utilities.get_seven_zip(False)
    pathlist = [os.path.join(thepath, file) for file in os.listdir(thepath)]
    files = [file for file in pathlist if not os.path.isdir(file)]
    outcome = None
    for file in files:
        filt = file.endswith(bbconstants.ARCS)
        if selective:
            # Prefixes describe file names, so test the basename rather than
            # the path joined with the working directory.
            filt = filt and os.path.basename(file).startswith(bbconstants.PREFIXES)
        if not filt:
            continue
        if file.endswith(".7z") and szexe is None:
            # No 7z executable available: this archive cannot be checked.
            continue
        print("VERIFYING: {0}".format(file))
        if file.endswith(".7z"):
            verif = sz_verify(os.path.abspath(file), szexe)
        elif file.endswith(".tar.gz"):
            verif = tgz_verify(file)
        elif file.endswith(".tar.xz"):
            verif = txz_verify(file)
        elif file.endswith(".tar.bz2"):
            verif = tbz_verify(file)
        elif file.endswith(".tar"):
            verif = tar_verify(file)
        elif file.endswith(".zip"):
            verif = zip_verify(file)
        else:
            # Archive suffix with no verifier in this module.
            continue
        if not verif:
            print("{0} IS BROKEN!".format((file)))
        # Keep going so that every archive is checked, not just the first.
        outcome = bool(verif) if outcome is None else (outcome and bool(verif))
    return outcome
408
409
410
def compress_suite(filepath, method="7z", szexe=None, selective=False):
    """
    Run compression, then verification, over the same folder with the same options.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z". Defined in source.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool
    """
    shared_args = (filepath, method, szexe, selective)
    # Same positional arguments go to both stages, compression first.
    for stage in (compress, verify):
        stage(*shared_args)
428
429
430
def compress_config_loader(homepath=None):
    """
    Load the preferred compression method from ``bbarchivist.ini``.

    The ini file is created empty if it does not exist; "7z" is returned when
    no method is stored.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    if homepath is None:
        homepath = os.path.expanduser("~")
    conffile = os.path.join(homepath, "bbarchivist.ini")
    if not os.path.exists(conffile):
        # Touch the file so that it exists for later reads and writes.
        with open(conffile, 'w'):
            pass
    parser = configparser.ConfigParser()
    parser.read(conffile)
    if not parser.has_section('compression'):
        parser['compression'] = {}
    chosen = parser['compression'].get('method', fallback="7z")
    if not utilities.new_enough(3) and chosen == "txz":
        chosen = "zip"  # for 3.2 compatibility
    return chosen
451
452
453
def compress_config_writer(method=None, homepath=None):
    """
    Write a ConfigParser file to store compression preferences.

    Other sections already present in the ini file are preserved.

    :param method: Method to use. If None, the currently stored (or default)
        method for the same ``homepath`` is written back.
    :type method: str

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    if method is None:
        # Read the default from the same folder that is about to be written,
        # not unconditionally from the user directory.
        method = compress_config_loader(homepath)
    config = configparser.ConfigParser()
    if homepath is None:
        homepath = os.path.expanduser("~")
    conffile = os.path.join(homepath, "bbarchivist.ini")
    if not os.path.exists(conffile):
        # Touch the file so that config.read() has something to open.
        with open(conffile, 'w'):
            pass
    config.read(conffile)
    if not config.has_section('compression'):
        config['compression'] = {}
    config['compression']['method'] = method
    with open(conffile, "w") as configfile:
        config.write(configfile)
477