Completed
Push — master ( cc1262...7611ec )
by John
04:48
created

remove_empty_folder()   B

Complexity

Conditions 6

Size

Total Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 22
ccs 10
cts 10
cp 1
crap 6
rs 7.7857
c 0
b 0
f 0
1
#!/usr/bin/env python3
2 4
"""This module is used to operate with bar files."""
3
4 5
import os  # filesystem read
5 5
import zipfile  # zip extract, zip compresssion
6 5
import shutil  # folder operations
7 5
import base64  # encoding for hashes
8 5
import hashlib   # get hashes
9 5
from bbarchivist import utilities  # platform determination
10 5
from bbarchivist import exceptions  # exception handling
11 5
from bbarchivist import bbconstants  # premade stuff
12
13 5
__author__ = "Thurask"
14 5
__license__ = "WTFPL v2"
15 5
__copyright__ = "2015-2017 Thurask"
16
17
18 5
def extract_bars(filepath):
19
    """
20
    Extract .signed files from .bar files.
21
    Use system zlib.
22
23
    :param filepath: Path to bar file directory.
24
    :type filepath: str
25
    """
26 5
    try:
27 5
        for file in os.listdir(filepath):
28 5
            extract_individual_bar(file, filepath)
29 5
    except (RuntimeError, OSError) as exc:
30 5
        exceptions.handle_exception(exc, "EXTRACTION FAILURE", None)
31
32
33 5
def extract_individual_bar(file, filepath):
34
    """
35
    Generate bar file contents and extract signed files.
36
37
    :param file: Bar file to extract.
38
    :type file: str
39
40
    :param filepath: Path to bar file directory.
41
    :type filepath: str
42
    """
43 5
    if file.endswith(".bar"):
44 5
        print("EXTRACTING: {0}".format(file))
45 5
        zfile = zipfile.ZipFile(os.path.join(filepath, file), 'r')
46 5
        names = zfile.namelist()
47 5
        extract_signed_file(zfile, names, filepath)
48
49
50 5
def extract_signed_file(zfile, names, filepath):
51
    """
52
    Extract signed file from a provided bar.
53
54
    :param zfile: Open (!!!) ZipFile instance.
55
    :type zfile: zipfile.ZipFile
56
57
    :param names: List of bar file contents.
58
    :type names: list(str)
59
60
    :param filepath: Path to bar file directory.
61
    :type filepath: str
62
    """
63 5
    for name in names:
64 5
        if str(name).endswith(".signed"):
65 5
            zfile.extract(name, filepath)
66
67
68 5
def get_sha512_manifest(zfile):
69
    """
70
    Get MANIFEST.MF from a bar file.
71
72
    :param zfile: Open (!!!) ZipFile instance.
73
    :type zfile: zipfile.ZipFile
74
    """
75 5
    names = zfile.namelist()
76 5
    manifest = None
77 5
    for name in names:
78 5
        if name.endswith("MANIFEST.MF"):
79 5
            manifest = name
80 5
            break
81 5
    if manifest is None:
82 5
        raise SystemExit
83 5
    return manifest
84
85
86 5
def get_sha512_from_manifest(manf):
87
    """
88
    Retrieve asset name and hash from MANIFEST.MF file.
89
90
    :param manf: Content of MANIFEST.MF file, in bytes.
91
    :type manf: list(bytes)
92
    """
93 5
    alist = []
94 5
    for idx, line in enumerate(manf):
95 5
        if line.endswith(b"signed"):
96 5
            alist.append(manf[idx])
97 5
            alist.append(manf[idx + 1])
98 5
    assetname = alist[0].split(b": ")[1]
99 5
    assethash = alist[1].split(b": ")[1]
100 5
    return assetname, assethash
101
102
103 5
def retrieve_sha512(filename):
104
    """
105
    Get the premade, Base64 encoded SHA512 hash of a signed file in a bar.
106
107
    :param filename: Bar file to check.
108
    :type filename: str
109
    """
110 5
    try:
111 5
        zfile = zipfile.ZipFile(filename, 'r')
112 5
        manifest = get_sha512_manifest(zfile)
113 5
        manf = zfile.read(manifest).splitlines()
114 5
        assetname, assethash = get_sha512_from_manifest(manf)
115 5
        return assetname, assethash  # (b"blabla.signed", b"somehash")
116 5
    except (RuntimeError, OSError, zipfile.BadZipFile) as exc:
117 5
        exceptions.handle_exception(exc, "EXTRACTION FAILURE", None)
118
119
120 5
def verify_sha512(filename, inithash):
121
    """
122
    Compare the original hash value with the current.
123
124
    :param filename: Signed file to check.
125
    :type filename: str
126
127
    :param inithash: Original SHA512 hash, as bytestring.
128
    :type inithash: bytes
129
    """
130 5
    sha512 = hashlib.sha512()
131 5
    with open(filename, 'rb') as file:
132 5
        while True:
133 5
            data = file.read(16 * 1024 * 1024)
134 5
            if not data:
135 5
                break
136 5
            sha512.update(data)
137 5
    rawdigest = sha512.digest()  # must be bytestring, not hexadecimalized str
138 5
    b64h = base64.b64encode(rawdigest, altchars=b"-_")  # replace some chars
139 5
    b64h = b64h.strip(b"==")  # remove padding
140 5
    return b64h == inithash
141
142
143 5
def bar_tester(filepath):
144
    """
145
    Use zipfile in order to test a bar for errors.
146
147
    :param filepath: Path to bar file.
148
    :type filepath: str
149
    """
150 5
    try:
151 5
        with zipfile.ZipFile(filepath, "r") as zfile:
152 5
            brokens = zfile.testzip()
153 5
    except zipfile.BadZipFile:
154 5
        brokens = filepath
155 5
    return brokens
156
157
158 5
def remove_empty_folder(curdir, subdirs, files):
159
    """
160
    Remove a folder if it's empty.
161
162
    :param curdir: Target folder.
163
    :type curdir: str
164
165
    :param subdirs: Subdirectories inside target folder.
166
    :type subdirs: list(str)
167
168
    :param files: Files inside target folder.
169
    :type files: list(str)
170
    """
171 5
    while True:
172 5
        try:
173 5
            if not subdirs and not files:
174 5
                os.rmdir(curdir)
175 5
        except OSError:
176 5
            continue
177 5
        except NotImplementedError:
178 5
            break
179 5
        break
180
181
182 5
def remove_empty_folders(a_folder):
183
    """
184
    Remove empty folders in a given folder using os.walk().
185
186
    :param a_folder: Target folder.
187
    :type a_folder: str
188
    """
189 5
    for curdir, subdirs, files in os.walk(a_folder):
190 5
        remove_empty_folder(curdir, subdirs, files)
191
192
193 5
def persistent_remove(afile):
194
    """
195
    Remove a file, and if it doesn't want to remove, keep at it.
196
197
    :param afile: Path to file you want terminated with extreme prejudice.
198
    :type afile: str
199
    """
200 5
    while True:
201 5
        try:
202 5
            os.remove(afile)
203 5
        except PermissionError:
204 5
            continue
205
        else:
206 5
            break
207
208
209 5
def remove_signed_files(a_folder):
210
    """
211
    Remove signed files from a given folder.
212
213
    :param a_folder: Target folder.
214
    :type a_folder: str
215
    """
216 5
    files = [os.path.abspath(os.path.join(a_folder, file)) for file in os.listdir(a_folder)]
217 5
    for afile in files:
218 5
        if afile.endswith(".signed") and os.path.exists(afile):
219 5
            print("REMOVING: {0}".format(os.path.basename(afile)))
220 5
            persistent_remove(afile)
221
222
223 5
def remove_unpacked_loaders(osdir, raddir, radios):
224
    """
225
    Remove uncompressed loader folders.
226
227
    :param osdir: OS loader folder.
228
    :type osdir: str
229
230
    :param raddir: Radio loader folder.
231
    :type raddir: str
232
233
    :param radios: If we made radios this run.
234
    :type radios: bool
235
    """
236 5
    utilities.cond_do(shutil.rmtree, [osdir, raddir], condition=radios)
237
238
239 5
def create_blitz(a_folder, swver):
240
    """
241
    Create a blitz file: a zipped archive of all app/core/radio bars.
242
243
    :param a_folder: Target folder.
244
    :type a_folder: str
245
246
    :param swver: Software version to title the blitz.
247
    :type swver: str
248
    """
249 5
    fname = "Blitz-{0}.zip".format(swver)
250 5
    with zipfile.ZipFile(fname, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zfile:
251 5
        for root, dirs, files in os.walk(a_folder):
252 5
            del dirs
253 5
            for file in files:
254 5
                print("ZIPPING: {0}".format(utilities.stripper(file)))
255 5
                abs_filename = os.path.join(root, file)
256 5
                abs_arcname = os.path.basename(abs_filename)
257 5
                zfile.write(abs_filename, abs_arcname)
258
259
260 5
def move_loaders(ldir,
261
                 exedir_os, exedir_rad,
262
                 zipdir_os, zipdir_rad):
263
    """
264
    Move autoloaders to zipped and loaders directories in localdir.
265
266
    :param ldir: Local directory, containing files you wish to move.
267
    :type ldir: str
268
269
    :param exedir_os: Large autoloader .exe destination.
270
    :type exedir_os: str
271
272
    :param exedir_rad: Small autoloader .exe destination.
273
    :type exedir_rad: str
274
275
    :param zipdir_os: Large autoloader archive destination.
276
    :type zipdir_os: str
277
278
    :param zipdir_rad: Small autoloader archive destination.
279
    :type zipdir_rad: str
280
    """
281 5
    arx = bbconstants.ARCS
282 5
    pfx = bbconstants.PREFIXES
283 5
    loaders = [os.path.join(ldir, file) for file in os.listdir(ldir) if utilities.prepends(file, pfx, ".exe")]
284 5
    move_loader_pairs(loaders, exedir_os, exedir_rad)
285 5
    zippeds = [os.path.join(ldir, file) for file in os.listdir(ldir) if utilities.prepends(file, pfx, arx)]
286 5
    move_loader_pairs(zippeds, zipdir_os, zipdir_rad)
287
288
289 5
def move_loader_pairs(files, dir_os, dir_rad):
290
    """
291
    Move autoloaders to zipped/loaders directories.
292
293
    :param files: List of autoloader files.
294
    :type files: list(str)
295
296
    :param dir_os: Large autoloader destination.
297
    :type dir_os: str
298
299
    :param dir_rad: Small autoloader destination.
300
    :type dir_rad: str
301
    """
302 5
    for file in files:
303 5
        print("MOVING: {0}".format(os.path.basename(file)))
304 5
        dest_os = os.path.join(dir_os, os.path.basename(file))
305 5
        dest_rad = os.path.join(dir_rad, os.path.basename(file))
306 5
        loader_sorter(file, dest_os, dest_rad)
307
308
309 5
def dirsizer(file, osdir, raddir, maxsize=90 * 1000 * 1000):
310
    """
311
    Return output directory based in input filesize.
312
313
    :param file: The file to sort. Absolute paths, please.
314
    :type file: str
315
316
    :param osdir: Large file destination.
317
    :type osdir: str
318
319
    :param raddir: Small file destination.
320
    :type raddir: str
321
322
    :param maxsize: Return osdir if filesize > maxsize else raddir. Default is 90MB.
323
    :type maxsize: int
324
    """
325 5
    return osdir if os.path.getsize(file) > maxsize else raddir
326
327
328 5
def loader_sorter(file, osdir, raddir):
329
    """
330
    Sort loaders based on size.
331
332
    :param file: The file to sort. Absolute paths, please.
333
    :type file: str
334
335
    :param osdir: Large file destination.
336
    :type osdir: str
337
338
    :param raddir: Small file destination.
339
    :type raddir: str
340
    """
341 5
    outdir = dirsizer(file, osdir, raddir)
342 5
    persistent_move(file, outdir)
343
344
345 5
def move_bars(localdir, osdir, radiodir):
346
    """
347
    Move bar files to subfolders of a given folder.
348
349
    :param localdir: Directory to use.
350
    :type localdir: str
351
352
    :param osdir: OS file directory (large bars).
353
    :type osdir: str
354
355
    :param radiodir: Radio file directory (small bars).
356
    :type radiodir: str
357
    """
358 5
    for files in os.listdir(localdir):
359 5
        if files.endswith(".bar"):
360 5
            print("MOVING: {0}".format(files))
361 5
            herefile = os.path.join(localdir, files)
362 5
            outdir = dirsizer(herefile, osdir, radiodir)
363 5
            atomic_move(herefile, outdir)
364
365
366 5
def persistent_move(infile, outdir):
367
    """
368
    Move file to given folder, removing file if it exists in folder.
369
370
    :param infile: Path to file to move.
371
    :type infile: str
372
373
    :param outdir: Directory to move to.
374
    :type outdir: str
375
    """
376 5
    while True:
377 5
        try:
378 5
            shutil.move(infile, outdir)
379 5
        except shutil.Error:
380 5
            os.remove(infile)
381 5
            continue
382 5
        break
383
384
385 5
def atomic_move(infile, outdir):
386
    """
387
    Move file to given folder, removing if things break.
388
389
    :param infile: Path to file to move.
390
    :type infile: str
391
392
    :param outdir: Directory to move to.
393
    :type outdir: str
394
    """
395 5
    try:
396 5
        shutil.move(infile, outdir)
397 5
    except shutil.Error:
398 5
        os.remove(os.path.join(outdir, infile))
399
400
401 5
def replace_bar_pair(localdir, osfile, radfile):
402
    """
403
    Move pair of OS and radio bars to a given folder.
404
405
    :param localdir: Final bar directory.
406
    :type localdir: str
407
408
    :param osfile: Path to OS file.
409
    :type osfile: str
410
411
    :param radfile: Path to radio file.
412
    :type radfile: str
413
    """
414 5
    shutil.move(osfile, localdir)
415 5
    shutil.move(radfile, localdir)
416
417
418 5
def replace_bars_bulk(localdir, barfiles):
419
    """
420
    Move set of OS and radio bars to a given folder.
421
422
    :param localdir: Final bar directory.
423
    :type localdir: str
424
425
    :param barfiles: List of OS/radio file paths.
426
    :type barfiles: list(str)
427
    """
428 5
    for barfile in barfiles:
429 5
        shutil.move(barfile, os.path.abspath(localdir))
430
431
432 5
def make_folder(localdir, root):
433
    """
434
    Make a folder if it doesn't exist.
435
436
    :param localdir: Top level folder.
437
    :type localdir: str
438
439
    :param root: Folder to create.
440
    :type root: str
441
    """
442 5
    if not os.path.exists(os.path.join(localdir, root)):
443 5
        os.makedirs(os.path.join(localdir, root), exist_ok=True)
444 5
    return os.path.join(localdir, root)
445
446
447 5
def make_dirpairs(localdir, root, osversion, radioversion):
448
    """
449
    Create a pair of directories, with OS/radio versions for names.
450
451
    :param localdir: Top level folder.
452
    :type localdir: str
453
454
    :param root: Name for folder containing OS/radio pairs.
455
    :type root: str
456
457
    :param osversion: OS version.
458
    :type osversion: str
459
460
    :param radioversion: Radio version.
461
    :type radioversion: str
462
    """
463 5
    rootdir = make_folder(localdir, root)
464 5
    osdir = make_folder(rootdir, osversion)
465 5
    radiodir = make_folder(rootdir, radioversion)
466 5
    return osdir, radiodir
467
468
469 5
def make_dirs(localdir, osversion, radioversion):
470
    """
471
    Create the directory tree needed for archivist/lazyloader.
472
473
    :param localdir: Root folder.
474
    :type localdir: str
475
476
    :param osversion: OS version.
477
    :type osversion: str
478
479
    :param radioversion: Radio version.
480
    :type radioversion: str
481
    """
482 5
    os.makedirs(localdir, exist_ok=True)
483 5
    bardir_os, bardir_radio = make_dirpairs(localdir, "bars", osversion, radioversion)
484 5
    loaderdir_os, loaderdir_radio = make_dirpairs(localdir, "loaders", osversion, radioversion)
485 5
    zipdir_os, zipdir_radio = make_dirpairs(localdir, "zipped", osversion, radioversion)
486
    return (bardir_os, bardir_radio, loaderdir_os, loaderdir_radio, zipdir_os, zipdir_radio)
487