Completed
Push — master ( 2df547...733f1c )
by John
01:24
created

compressfilter()   D

Complexity

Conditions 13

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 13
dl 0
loc 21
rs 4.2483
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like compressfilter() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2
"""This module is used to operate with archives."""
3
4
import os  # filesystem read
5
import subprocess  # invocation of 7z, cap
6
import zipfile  # zip compression
7
import tarfile  # txz/tbz/tgz/tar compression
8
import configparser  # config parsing, duh
9
from bbarchivist import utilities  # platform determination
10
from bbarchivist import bbconstants  # premade stuff
11
from bbarchivist import decorators  # timer
12
13
__author__ = "Thurask"
14
__license__ = "WTFPL v2"
15
__copyright__ = "Copyright 2015-2016 Thurask"
16
17
18
def smart_is_tarfile(filepath):
    """
    Check for a tar archive via :func:`tarfile.is_tarfile`, without raising.

    :param filepath: Filename.
    :type filepath: str
    """
    try:
        return tarfile.is_tarfile(filepath)
    except (OSError, IOError):
        # A path that cannot be opened or read is reported as "not a tarfile".
        return False
31
32
33
@decorators.timer
def sz_compress(filepath, filename, szexe=None, strength=5, errors=False):
    """
    Pack a file into a LZMA2 7z file.

    The archive is written to ``<filepath>.7z``; the input handed to 7z is
    ``filename`` joined onto the directory part of ``filepath``.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param szexe: Path to 7z executable.
    :type szexe: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    szcodes = {
        0: "NO ERRORS",
        1: "COMPLETED WITH WARNINGS",
        2: "FATAL ERROR",
        7: "COMMAND LINE ERROR",
        8: "OUT OF MEMORY ERROR",
        255: "USER STOPPED PROCESS"
    }
    strength = str(strength)
    rawname = os.path.dirname(filepath)
    thr = str(utilities.get_core_count())
    fold = os.path.join(rawname, filename)
    cmd = '{0} a -mx{1} -m0=lzma2 -mmt{2} "{3}.7z" "{4}"'.format(szexe, strength,
                                                                 thr, filepath, fold)
    with open(os.devnull, 'wb') as dnull:
        excode = subprocess.call(cmd, stdout=dnull, stderr=subprocess.STDOUT, shell=True)
    if errors:
        # The shell can hand back exit codes that are not in the table above
        # (e.g. when the executable cannot be started), so never index blindly.
        print(szcodes.get(excode, "UNKNOWN EXIT CODE: {0}".format(excode)))
71
72
73
def sz_verify(filepath, szexe=None):
    """
    Run the 7z "t" (test) command on an archive and report whether it passed.

    :param filepath: Filename.
    :type filepath: str

    :param szexe: Path to 7z executable.
    :type szexe: str
    """
    target = os.path.abspath(filepath)
    command = '{0} t "{1}"'.format(szexe, target)
    # All output from the tool is discarded; only the exit status matters.
    with open(os.devnull, 'wb') as sink:
        status = subprocess.call(command, stdout=sink, stderr=subprocess.STDOUT, shell=True)
    return status == 0
88
89
90
@decorators.timer
def tar_compress(filepath, filename):
    """
    Write ``filename`` into a new, uncompressed ``<filepath>.tar`` archive.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    archive = "{0}.tar".format(filepath)
    with tarfile.open(name=archive, mode='w:') as tarball:
        tarball.add(name=filename, filter=None)
103
104
105
def tar_verify(filepath):
    """
    Verify that a tar file is valid and working.

    Returns True only when the file opens as an uncompressed tar and lists at
    least one member. Anything that is not a tarfile, cannot be read as an
    uncompressed tar, or is empty yields False instead of raising.

    :param filepath: Filename.
    :type filepath: str
    """
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, "r:") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, OSError, EOFError):
        # is_tarfile() also accepts compressed tars, which "r:" refuses to
        # open; a damaged archive can fail while its members are listed.
        return False
    return bool(mems)
118
119
120
@decorators.timer
def tgz_compress(filepath, filename, strength=5):
    """
    Write ``filename`` into a new gzip-compressed ``<filepath>.tar.gz`` archive.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archive = "{0}.tar.gz".format(filepath)
    with tarfile.open(name=archive, mode='w:gz', compresslevel=strength) as tarball:
        tarball.add(name=filename, filter=None)
136
137
138
def tgz_verify(filepath):
    """
    Verify that a tar.gz file is valid and working.

    Returns True only when the file opens as a gzip tar and lists at least
    one member. Anything that is not a tarfile, cannot be read as a gzip tar,
    or is empty yields False instead of raising.

    :param filepath: Filename.
    :type filepath: str
    """
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, "r:gz") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, OSError, EOFError):
        # is_tarfile() accepts any tar flavour, which "r:gz" may refuse to
        # open; a truncated stream can fail while its members are listed.
        return False
    return bool(mems)
151
152
153
@decorators.timer
def tbz_compress(filepath, filename, strength=5):
    """
    Write ``filename`` into a new bzip2-compressed ``<filepath>.tar.bz2`` archive.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str

    :param strength: Compression strength. 5 is normal, 9 is ultra.
    :type strength: int
    """
    archive = "{0}.tar.bz2".format(filepath)
    with tarfile.open(name=archive, mode='w:bz2', compresslevel=strength) as tarball:
        tarball.add(name=filename, filter=None)
169
170
171
def tbz_verify(filepath):
    """
    Verify that a tar.bz2 file is valid and working.

    Returns True only when the file opens as a bzip2 tar and lists at least
    one member. Anything that is not a tarfile, cannot be read as a bzip2
    tar, or is empty yields False instead of raising.

    :param filepath: Filename.
    :type filepath: str
    """
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, "r:bz2") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, OSError, EOFError):
        # is_tarfile() accepts any tar flavour, which "r:bz2" may refuse to
        # open; a truncated stream can fail while its members are listed.
        return False
    return bool(mems)
184
185
186
@decorators.timer
def txz_compress(filepath, filename):
    """
    Write ``filename`` into a new LZMA-compressed ``<filepath>.tar.xz`` archive.

    Nothing is written when ``utilities.new_enough(3)`` is false.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    if utilities.new_enough(3):
        archive = "{0}.tar.xz".format(filepath)
        with tarfile.open(name=archive, mode='w:xz') as tarball:
            tarball.add(name=filename, filter=None)
202
203
204
def txz_verify(filepath):
    """
    Verify that a tar.xz file is valid and working.

    Returns None when ``utilities.new_enough(3)`` is false. Otherwise returns
    True only when the file opens as an LZMA tar and lists at least one
    member; anything that is not a tarfile, cannot be read as an xz tar, or
    is empty yields False instead of raising.

    :param filepath: Filename.
    :type filepath: str
    """
    if not utilities.new_enough(3):
        return None
    if not smart_is_tarfile(filepath):
        return False
    try:
        with tarfile.open(filepath, "r:xz") as thefile:
            mems = thefile.getmembers()
    except (tarfile.TarError, OSError, EOFError):
        # is_tarfile() accepts any tar flavour, which "r:xz" may refuse to
        # open; a truncated stream can fail while its members are listed.
        return False
    return bool(mems)
220
221
222
@decorators.timer
def zip_compress(filepath, filename):
    """
    Write ``filename`` into a new DEFLATE-compressed ``<filepath>.zip`` archive.

    :param filepath: Basename of file, no extension.
    :type filepath: str

    :param filename: Name of file to pack.
    :type filename: str
    """
    archive = "{0}.zip".format(filepath)
    with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zipped:
        zipped.write(filename)
235
236
237
def zip_verify(filepath):
    """
    Verify that a .zip file is valid and working.

    Returns True only when the file is a zip archive and every member passes
    the CRC/header check done by :meth:`zipfile.ZipFile.testzip`.

    :param filepath: Filename.
    :type filepath: str
    """
    if not zipfile.is_zipfile(filepath):
        return False
    try:
        with zipfile.ZipFile(filepath, "r") as zfile:
            # testzip() returns the name of the first corrupt member, or None
            # when every member is intact.
            first_bad = zfile.testzip()
    except zipfile.BadZipFile:
        return False
    return first_bad is None
253
254
255
def filter_method(method, szexe=None):
    """
    Return a usable compression method, falling back to "zip" when needed.

    :param method: Compression method to use.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str
    """
    xz_unavailable = not utilities.new_enough(3)
    if xz_unavailable and method == "txz":
        method = "zip"  # fallback
    if method == "7z" and szexe is None:
        if utilities.prep_seven_zip():  # see if 7z exists
            # NOTE(review): this only rebinds the local name; the resolved
            # path is not returned to the caller.
            szexe = utilities.get_seven_zip(False)
        else:
            method = "zip"  # fallback
    return method
274
275
276
def calculate_strength():
    """
    Pick the zip/gzip/bzip2 strength: 9 when utilities.is_amd64() is true, else 5.
    """
    if utilities.is_amd64():
        return 9
    return 5
282
283
284
def compressfilter(filepath, selective=False):
    """
    Filter directory listing of working directory.

    Returns the entries of ``filepath`` that should be compressed, each
    joined onto ``filepath``. Directories and files matching
    ``bbconstants.ARCS`` are always dropped.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool
    """
    arx = bbconstants.ARCS
    pfx = bbconstants.PREFIXES
    # os.listdir() yields bare names, so the directory test must be made on
    # the joined path; testing the bare name would look in the current
    # directory instead of filepath.
    names = [name for name in os.listdir(filepath)
             if not os.path.isdir(os.path.join(filepath, name))]
    # presumably utilities.prepends(name, prefix, suffix) is a combined
    # startswith/endswith test -- confirm against utilities.
    if selective:
        wanted = [name for name in names
                  if utilities.prepends(name, pfx, "")
                  and not utilities.prepends(name, "", arx)
                  and utilities.prepends(name, "", ".exe")]
    else:
        wanted = [name for name in names if not utilities.prepends(name, "", arx)]
    return [os.path.join(filepath, name) for name in wanted]
305
306
307
def compress(filepath, method="7z", szexe=None, selective=False, errors=False):
    """
    Compress all autoloader files in a given folder, with a given method.

    Always returns True once every selected file has been handed to the
    matching ``*_compress`` function.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z".
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool

    :param errors: Print completion status message. Default is false.
    :type errors: bool
    """
    method = filter_method(method, szexe)
    if method == "7z" and szexe is None:
        # filter_method() keeps "7z" without an explicit executable only when
        # utilities.prep_seven_zip() succeeded, but it cannot hand the
        # resolved path back, so look it up here before calling sz_compress().
        szexe = utilities.get_seven_zip(False)
    files = compressfilter(filepath, selective)
    for file in files:
        filename = os.path.splitext(os.path.basename(file))[0]
        fileloc = os.path.join(filepath, filename)
        print("COMPRESSING: {0}.exe".format(filename))
        if method == "7z":
            sz_compress(fileloc, file, szexe, calculate_strength(), errors)
        elif method == "tgz":
            tgz_compress(fileloc, file, calculate_strength())
        elif method == "txz":
            txz_compress(fileloc, file)
        elif method == "tbz":
            tbz_compress(fileloc, file, calculate_strength())
        elif method == "tar":
            tar_compress(fileloc, file)
        elif method == "zip":
            zip_compress(fileloc, file)
    return True
345
346
347
def verify(thepath, method="7z", szexe=None, selective=False):
    """
    Verify specific archive files in a given folder.

    Every file in ``thepath`` whose name ends with ``bbconstants.ARCS`` is
    checked with the verifier matching its extension. Returns True when all
    checked archives passed, False when at least one failed, and None when
    no archive was checked.

    :param thepath: Working directory. Required.
    :type thepath: str

    :param method: Compression type. Default is "7z". Defined in source.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only verify autoloaders. Default is false.
    :type selective: bool
    """
    method = filter_method(method, szexe)
    if method == "7z" and szexe is None:
        # filter_method() cannot hand the resolved executable back; it keeps
        # "7z" here only when utilities.prep_seven_zip() succeeded.
        szexe = utilities.get_seven_zip(False)
    # Checked in this order so ".tar" never shadows the compressed variants.
    checkers = (
        (".tar.gz", tgz_verify),
        (".tar.xz", txz_verify),
        (".tar.bz2", tbz_verify),
        (".tar", tar_verify),
        (".zip", zip_verify),
    )
    all_ok = None
    for name in os.listdir(thepath):
        file = os.path.join(thepath, name)
        if os.path.isdir(file) or not name.endswith(bbconstants.ARCS):
            continue
        # The prefix test has to use the bare name; the joined path starts
        # with the directory, not with an autoloader prefix.
        if selective and not name.startswith(bbconstants.PREFIXES):
            continue
        if name.endswith(".7z"):
            if szexe is None:
                continue  # no executable available to test this archive
            print("VERIFYING: {0}".format(file))
            verif = sz_verify(os.path.abspath(file), szexe)
        else:
            checker = next((func for ext, func in checkers if name.endswith(ext)), None)
            if checker is None:
                continue  # archive type without a verifier in this module
            print("VERIFYING: {0}".format(file))
            verif = checker(file)
        if not verif:
            print("{0} IS BROKEN!".format((file)))
        # Keep going after a failure so every broken archive gets reported.
        all_ok = bool(verif) if all_ok is None else (all_ok and bool(verif))
    return all_ok
387
388
389
def compress_suite(filepath, method="7z", szexe=None, selective=False):
    """
    Run compression, then verification, on the same folder with the same options.

    :param filepath: Working directory. Required.
    :type filepath: str

    :param method: Compression type. Default is "7z". Defined in source.
    :type method: str

    :param szexe: Path to 7z executable, if needed.
    :type szexe: str

    :param selective: Only compress autoloaders. Default is false.
    :type selective: bool
    """
    compress(filepath, method=method, szexe=szexe, selective=selective)
    verify(filepath, method=method, szexe=szexe, selective=selective)
407
408
409
def compress_config_loader(homepath=None):
    """
    Load the preferred compression method from ``bbarchivist.ini``.

    The ini file is created empty if it does not exist yet. The method
    defaults to "7z" when none is stored.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    parser = configparser.ConfigParser()
    if homepath is None:
        homepath = os.path.expanduser("~")
    inipath = os.path.join(homepath, "bbarchivist.ini")
    if not os.path.exists(inipath):
        with open(inipath, 'w'):
            pass  # just create the empty file
    parser.read(inipath)
    if not parser.has_section('compression'):
        parser.add_section('compression')
    method = parser.get('compression', 'method', fallback="7z")
    if not utilities.new_enough(3) and method == "txz":
        method = "zip"  # for 3.2 compatibility
    return method
430
431
432
def compress_config_writer(method=None, homepath=None):
    """
    Write a ConfigParser file to store compression preferences.

    Sets ``method`` under the ``compression`` section of ``bbarchivist.ini``
    in ``homepath``, keeping any other settings already in the file.

    :param method: Method to use. Default is the method currently stored.
    :type method: str

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    if method is None:
        # Read the current value from the same folder that is written to;
        # the loader falls back to the user directory when homepath is None.
        method = compress_config_loader(homepath)
    if homepath is None:
        homepath = os.path.expanduser("~")
    conffile = os.path.join(homepath, "bbarchivist.ini")
    config = configparser.ConfigParser()
    # read() silently skips a missing file and the file is (re)written
    # below, so there is no need to create an empty one first.
    config.read(conffile)
    if not config.has_section('compression'):
        config['compression'] = {}
    config['compression']['method'] = method
    with open(conffile, "w") as configfile:
        config.write(configfile)
456