bbarchivist.barutils   F
last analyzed

Complexity

Total Complexity 68

Size/Duplication

Total Lines 516
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 178
dl 0
loc 516
rs 2.96
c 0
b 0
f 0
ccs 177
cts 177
cp 1
wmc 68

28 Functions

Rating   Name   Duplication   Size   Complexity  
A persistent_move() 0 17 3
A atomic_move() 0 14 2
A indiv_folder_remove() 0 15 3
A persistent_remove() 0 14 4
A make_folder() 0 13 2
A remove_empty_folders() 0 9 2
A move_loaders() 0 24 1
A bar_tester() 0 13 3
A move_loader_pairs() 0 18 2
A remove_empty_folder() 0 21 4
A extract_bars() 0 13 3
A create_blitz() 0 19 4
A extract_individual_bar() 0 15 2
A move_bars() 0 19 3
A get_sha512_from_manifest() 0 15 3
A replace_bar_pair() 0 15 1
A remove_unpacked_loaders() 0 14 1
A replace_bars_bulk() 0 12 2
A make_dirs() 0 18 1
A dirsizer() 0 17 2
A make_dirpairs() 0 20 1
A extract_signed_file() 0 16 3
A retrieve_sha512() 0 15 2
A get_sha512_manifest() 0 16 4
A loader_sorter() 0 15 1
A verify_sha512() 0 21 4
A remove_signed_files() 0 12 4
A move_loaders_prep() 0 13 1

How to fix   Complexity   

Complexity

Complex classes like bbarchivist.barutils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2 5
"""This module is used to operate with bar files."""
3
4 5
import base64  # encoding for hashes
5 5
import hashlib  # get hashes
6 5
import os  # filesystem read
7 5
import shutil  # folder operations
8 5
import zipfile  # zip extract, zip compression
9
10 5
from bbarchivist import bbconstants  # premade stuff
11 5
from bbarchivist import exceptions  # exception handling
12 5
from bbarchivist import utilities  # platform determination
13
14 5
__author__ = "Thurask"
15 5
__license__ = "WTFPL v2"
16 5
__copyright__ = "2015-2019 Thurask"
17
18
19 5
def extract_bars(filepath):
    """
    Unpack the .signed payloads of the bar files found in a folder.

    Every directory entry is handed to extract_individual_bar together with
    the folder itself. A RuntimeError or OSError raised while listing or
    extracting is forwarded to exceptions.handle_exception with the
    message "EXTRACTION FAILURE".

    :param filepath: Path to bar file directory.
    :type filepath: str
    """
    try:
        entries = os.listdir(filepath)
        for entry in entries:
            extract_individual_bar(entry, filepath)
    except (RuntimeError, OSError) as exc:
        exceptions.handle_exception(exc, "EXTRACTION FAILURE", None)
32
33
34 5
def extract_individual_bar(file, filepath):
    """
    Open one bar file and extract the signed files it contains.

    Names that do not end in ".bar" are ignored. The archive is opened
    through a context manager so its file handle is released even when
    extraction raises.

    :param file: Bar file to extract.
    :type file: str

    :param filepath: Path to bar file directory.
    :type filepath: str
    """
    if not file.endswith(".bar"):
        return
    print("EXTRACTING: {0}".format(file))
    # The with-block closes the archive; the original left the handle open.
    with zipfile.ZipFile(os.path.join(filepath, file), 'r') as zfile:
        extract_signed_file(zfile, zfile.namelist(), filepath)
49
50
51 5
def extract_signed_file(zfile, names, filepath):
    """
    Extract every ".signed" member of an already opened bar archive.

    :param zfile: Open (!!!) ZipFile instance.
    :type zfile: zipfile.ZipFile

    :param names: List of bar file contents.
    :type names: list(str)

    :param filepath: Directory the signed files are extracted into.
    :type filepath: str
    """
    signed_members = (member for member in names if str(member).endswith(".signed"))
    for member in signed_members:
        zfile.extract(member, filepath)
67
68
69 5
def get_sha512_manifest(zfile):
    """
    Find the archive member name of MANIFEST.MF inside a bar file.

    Returns the first member whose name ends with "MANIFEST.MF".
    Raises SystemExit when the archive has no such member.

    :param zfile: Open (!!!) ZipFile instance.
    :type zfile: zipfile.ZipFile
    """
    for member in zfile.namelist():
        if member.endswith("MANIFEST.MF"):
            return member
    # No manifest at all: abort, exactly as callers of this module expect.
    raise SystemExit
85
86
87 5
def get_sha512_from_manifest(manf):
    """
    Retrieve asset name and hash from MANIFEST.MF file.

    The first line ending in b"signed" is taken as the asset-name line and
    the line directly after it as the hash line. From each, the text after
    the first b": " separator is returned.

    Raises IndexError when no line ends in b"signed", when the matching line
    is the last one, or when either line has no b": " separator.

    :param manf: Content of MANIFEST.MF file, in bytes.
    :type manf: list(bytes)
    """
    for idx, line in enumerate(manf):
        if line.endswith(b"signed"):
            # Only the first asset entry is ever used, so stop here instead
            # of tripping over later entries (e.g. a trailing match with no
            # line after it).
            name_line = line
            hash_line = manf[idx + 1]
            break
    else:
        raise IndexError("no .signed asset entry found in manifest")
    # maxsplit=1 keeps values intact should they contain ": " themselves.
    assetname = name_line.split(b": ", 1)[1]
    assethash = hash_line.split(b": ", 1)[1]
    return assetname, assethash
102
103
104 5
def retrieve_sha512(filename):
    """
    Get the premade, Base64 encoded SHA512 hash of a signed file in a bar.

    Returns a tuple (asset name, asset hash), both as bytes, read from the
    bar's MANIFEST.MF. RuntimeError, OSError and zipfile.BadZipFile are
    forwarded to exceptions.handle_exception with "EXTRACTION FAILURE";
    if that handler returns, this function returns None.

    :param filename: Bar file to check.
    :type filename: str
    """
    try:
        # Read everything needed inside the with-block so the archive handle
        # is closed again; the original never closed it.
        with zipfile.ZipFile(filename, 'r') as zfile:
            manifest = get_sha512_manifest(zfile)
            manf = zfile.read(manifest).splitlines()
        assetname, assethash = get_sha512_from_manifest(manf)
        return assetname, assethash  # (b"blabla.signed", b"somehash")
    except (RuntimeError, OSError, zipfile.BadZipFile) as exc:
        exceptions.handle_exception(exc, "EXTRACTION FAILURE", None)
119
120
121 5
def verify_sha512(filename, inithash):
    """
    Check a file's SHA512 digest against a previously recorded value.

    The file is hashed in 16 MiB chunks. The raw digest is Base64 encoded
    with "-" and "_" as the two extra alphabet characters, and the "="
    padding is removed before comparing. Returns True on a match.

    :param filename: Signed file to check.
    :type filename: str

    :param inithash: Original SHA512 hash, as bytestring.
    :type inithash: bytes
    """
    chunk_size = 16 * 1024 * 1024
    hasher = hashlib.sha512()
    with open(filename, 'rb') as handle:
        # read() returns b"" at end of file, which ends the iteration.
        for block in iter(lambda: handle.read(chunk_size), b""):
            hasher.update(block)
    # Encode the raw digest bytes, not the hexadecimal string form.
    encoded = base64.urlsafe_b64encode(hasher.digest())
    return encoded.rstrip(b"=") == inithash
142
143
144 5
def bar_tester(filepath):
    """
    Test a bar file for errors with zipfile.

    Returns the result of ZipFile.testzip() (None when every member checks
    out, otherwise the name of the first bad member). When the file cannot
    be read as a zip archive at all, the path itself is returned.

    :param filepath: Path to bar file.
    :type filepath: str
    """
    try:
        with zipfile.ZipFile(filepath, "r") as archive:
            return archive.testzip()
    except zipfile.BadZipFile:
        # Not a readable archive: report the whole file as broken.
        return filepath
157
158
159 5
def remove_empty_folder(curdir, subdirs, files):
    """
    Remove a folder if it's empty, retrying while removal fails.

    indiv_folder_remove is called repeatedly for as long as it raises
    OSError. The loop ends when a call succeeds or raises
    NotImplementedError.

    :param curdir: Target folder.
    :type curdir: str

    :param subdirs: Subdirectories inside target folder.
    :type subdirs: list(str)

    :param files: Files inside target folder.
    :type files: list(str)
    """
    finished = False
    while not finished:
        try:
            indiv_folder_remove(curdir, subdirs, files)
        except OSError:
            # Removal failed; go around again.
            pass
        except NotImplementedError:
            finished = True
        else:
            finished = True
180
181
182 5
def indiv_folder_remove(curdir, subdirs, files):
    """
    Remove a folder when both of its content listings are empty.

    Nothing happens when either list has entries; otherwise os.rmdir is
    called on the folder and any error it raises propagates.

    :param curdir: Target folder.
    :type curdir: str

    :param subdirs: Subdirectories inside target folder.
    :type subdirs: list(str)

    :param files: Files inside target folder.
    :type files: list(str)
    """
    if subdirs or files:
        return
    os.rmdir(curdir)
197
198
199 5
def remove_empty_folders(a_folder):
    """
    Walk a folder with os.walk() and remove the empty folders found.

    Each (directory, subdirectories, files) triple produced by the walk is
    passed on to remove_empty_folder.

    :param a_folder: Target folder.
    :type a_folder: str
    """
    for walk_entry in os.walk(a_folder):
        remove_empty_folder(*walk_entry)
208
209
210 5
def persistent_remove(afile):
    """
    Remove a file, and if it doesn't want to remove, keep at it.

    Removal is retried for as long as os.remove raises OSError. A missing
    file is treated as already removed: retrying a FileNotFoundError could
    never succeed and would loop forever.

    :param afile: Path to file you want terminated with extreme prejudice.
    :type afile: str
    """
    while True:
        try:
            os.remove(afile)
        except FileNotFoundError:
            # Already gone; must be caught before the broader OSError.
            break
        except OSError:
            continue
        else:
            break
224
225
226 5
def remove_signed_files(a_folder):
    """
    Delete the ".signed" files that sit directly inside a folder.

    Each removal is announced on stdout and performed by persistent_remove.

    :param a_folder: Target folder.
    :type a_folder: str
    """
    for entry in os.listdir(a_folder):
        candidate = os.path.abspath(os.path.join(a_folder, entry))
        if not candidate.endswith(".signed"):
            continue
        if not os.path.exists(candidate):
            continue
        print("REMOVING: {0}".format(os.path.basename(candidate)))
        persistent_remove(candidate)
238
239
240 5
def remove_unpacked_loaders(osdir, raddir, radios):
    """
    Remove uncompressed loader folders.

    Delegates to utilities.cond_do with shutil.rmtree, the two folders and
    the radios flag as condition.
    NOTE(review): how cond_do applies the condition to the two folders is
    defined in utilities, not here -- confirm there.

    :param osdir: OS loader folder.
    :type osdir: str

    :param raddir: Radio loader folder.
    :type raddir: str

    :param radios: If we made radios this run.
    :type radios: bool
    """
    loader_dirs = [osdir, raddir]
    utilities.cond_do(shutil.rmtree, loader_dirs, condition=radios)
254
255
256 5
def create_blitz(a_folder, swver):
    """
    Create a blitz file: a zipped archive of all app/core/radio bars.

    The archive is named "Blitz-<swver>.zip" and is created relative to
    the current working directory. Every file found while walking the
    target folder is stored under its bare file name, i.e. without any
    directory part.

    :param a_folder: Target folder.
    :type a_folder: str

    :param swver: Software version to title the blitz.
    :type swver: str
    """
    archive_name = "Blitz-{0}.zip".format(swver)
    with zipfile.ZipFile(archive_name, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as blitz:
        for folder, _subdirs, members in os.walk(a_folder):
            for member in members:
                print("ZIPPING: {0}".format(utilities.stripper(member)))
                source = os.path.join(folder, member)
                # Flatten the tree: store under the file name only.
                blitz.write(source, os.path.basename(source))
275
276
277 5
def move_loaders_prep(ldir, suf):
    """
    Prepare a list of filenames for moving loaders.

    Returns the entries of the directory, joined onto the directory path,
    for which utilities.prepends(name, bbconstants.PREFIXES, suf) is true.
    NOTE(review): presumably prepends tests the name's prefix and suffix --
    confirm in utilities.

    :param ldir: Directory whose entries are examined.
    :type ldir: str

    :param suf: Suffix(es) to check.
    :type suf: str or list or tuple
    """
    prefixes = bbconstants.PREFIXES
    matches = []
    for entry in os.listdir(ldir):
        if utilities.prepends(entry, prefixes, suf):
            matches.append(os.path.join(ldir, entry))
    return matches
290
291
292 5
def move_loaders(ldir, exedir_os, exedir_rad, zipdir_os, zipdir_rad):
    """
    Move autoloaders to zipped and loaders directories in localdir.

    Two passes are made over the local directory: first the ".exe" files,
    then the files matching the bbconstants.ARCS suffixes. Each pass gets
    its own pair of destinations.

    :param ldir: Local directory, containing files you wish to move.
    :type ldir: str

    :param exedir_os: Large autoloader .exe destination.
    :type exedir_os: str

    :param exedir_rad: Small autoloader .exe destination.
    :type exedir_rad: str

    :param zipdir_os: Large autoloader archive destination.
    :type zipdir_os: str

    :param zipdir_rad: Small autoloader archive destination.
    :type zipdir_rad: str
    """
    # (suffixes, large destination, small destination), processed in order.
    passes = (
        (".exe", exedir_os, exedir_rad),
        (bbconstants.ARCS, zipdir_os, zipdir_rad),
    )
    for suffixes, dest_os, dest_rad in passes:
        found = move_loaders_prep(ldir, suffixes)
        move_loader_pairs(found, dest_os, dest_rad)
316
317
318 5
def move_loader_pairs(files, dir_os, dir_rad):
    """
    Move autoloaders to zipped/loaders directories.

    For each file, both candidate destination paths (directory plus the
    file's base name) are built and handed to loader_sorter, which picks
    one of them.

    :param files: List of autoloader files.
    :type files: list(str)

    :param dir_os: Large autoloader destination.
    :type dir_os: str

    :param dir_rad: Small autoloader destination.
    :type dir_rad: str
    """
    for loader in files:
        leaf = os.path.basename(loader)
        print("MOVING: {0}".format(leaf))
        loader_sorter(loader, os.path.join(dir_os, leaf), os.path.join(dir_rad, leaf))
336
337
338 5
def dirsizer(file, osdir, raddir, maxsize=90 * 1000 * 1000):
    """
    Choose an output directory from the size of the input file.

    :param file: The file to sort. Absolute paths, please.
    :type file: str

    :param osdir: Large file destination.
    :type osdir: str

    :param raddir: Small file destination.
    :type raddir: str

    :param maxsize: Return osdir if filesize > maxsize else raddir. Default is 90MB.
    :type maxsize: int
    """
    # Strictly larger than the threshold counts as "large".
    if os.path.getsize(file) > maxsize:
        return osdir
    return raddir
355
356
357 5
def loader_sorter(file, osdir, raddir):
    """
    Move a loader to the destination chosen by its size.

    dirsizer selects between the two destinations using its default size
    threshold; persistent_move then performs the move.

    :param file: The file to sort. Absolute paths, please.
    :type file: str

    :param osdir: Large file destination.
    :type osdir: str

    :param raddir: Small file destination.
    :type raddir: str
    """
    persistent_move(file, dirsizer(file, osdir, raddir))
372
373
374 5
def move_bars(localdir, osdir, radiodir):
    """
    Move bar files to subfolders of a given folder.

    Every ".bar" entry of the directory is announced on stdout, sized with
    dirsizer and moved with atomic_move.

    :param localdir: Directory to use.
    :type localdir: str

    :param osdir: OS file directory (large bars).
    :type osdir: str

    :param radiodir: Radio file directory (small bars).
    :type radiodir: str
    """
    barnames = [entry for entry in os.listdir(localdir) if entry.endswith(".bar")]
    for barname in barnames:
        print("MOVING: {0}".format(barname))
        source = os.path.join(localdir, barname)
        atomic_move(source, dirsizer(source, osdir, radiodir))
393
394
395 5
def persistent_move(infile, outdir):
    """
    Move file to given folder, removing file if it exists in folder.

    When shutil.move refuses because the folder already holds a file with
    the same name, that existing file is deleted and the move is retried.
    A shutil.Error that is not such a name clash is re-raised.

    :param infile: Path to file to move.
    :type infile: str

    :param outdir: Directory to move to.
    :type outdir: str
    """
    while True:
        try:
            shutil.move(infile, outdir)
        except shutil.Error:
            # Clear the file blocking the move, not the source: deleting
            # infile (as before) lost the data and made the retry fail.
            clash = os.path.join(outdir, os.path.basename(infile))
            if not os.path.isfile(clash):
                # Nothing we can clear; retrying would only loop.
                raise
            os.remove(clash)
            continue
        break
412
413
414 5
def atomic_move(infile, outdir):
    """
    Move file to given folder, removing if things break.

    If shutil.move raises shutil.Error, the path os.path.join(outdir,
    infile) is removed and the move is not retried.
    NOTE(review): os.path.join discards outdir when infile is an absolute
    path, so in that case the file removed is infile itself (the source).
    With a relative infile that has directory parts, the joined path is
    outdir/<those parts>/<name>. Confirm which file was meant.

    :param infile: Path to file to move.
    :type infile: str

    :param outdir: Directory to move to.
    :type outdir: str
    """
    try:
        shutil.move(infile, outdir)
    except shutil.Error:
        leftover = os.path.join(outdir, infile)
        os.remove(leftover)
428
429
430 5
def replace_bar_pair(localdir, osfile, radfile):
    """
    Move an OS bar and a radio bar, in that order, to a given folder.

    :param localdir: Final bar directory.
    :type localdir: str

    :param osfile: Path to OS file.
    :type osfile: str

    :param radfile: Path to radio file.
    :type radfile: str
    """
    for barfile in (osfile, radfile):
        shutil.move(barfile, localdir)
445
446
447 5
def replace_bars_bulk(localdir, barfiles):
    """
    Move a set of OS and radio bars to a given folder.

    The folder is resolved to an absolute path for every move.

    :param localdir: Final bar directory.
    :type localdir: str

    :param barfiles: List of OS/radio file paths.
    :type barfiles: list(str)
    """
    for source in barfiles:
        destination = os.path.abspath(localdir)
        shutil.move(source, destination)
459
460
461 5
def make_folder(localdir, root):
    """
    Make a folder if it doesn't exist and return its path.

    The path is the top level folder joined with the folder name. Missing
    intermediate directories are created as well.

    :param localdir: Top level folder.
    :type localdir: str

    :param root: Folder to create.
    :type root: str
    """
    target = os.path.join(localdir, root)
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)
    return target
474
475
476 5
def make_dirpairs(localdir, root, osversion, radioversion):
    """
    Create a pair of directories, with OS/radio versions for names.

    The containing folder is created first, then the OS folder, then the
    radio folder. Returns the tuple (OS folder path, radio folder path).

    :param localdir: Top level folder.
    :type localdir: str

    :param root: Name for folder containing OS/radio pairs.
    :type root: str

    :param osversion: OS version.
    :type osversion: str

    :param radioversion: Radio version.
    :type radioversion: str
    """
    parent = make_folder(localdir, root)
    return make_folder(parent, osversion), make_folder(parent, radioversion)
496
497
498 5
def make_dirs(localdir, osversion, radioversion):
    """
    Create the directory tree needed for archivist/lazyloader.

    Under the root folder, the categories "bars", "loaders" and "zipped"
    each get an OS-version folder and a radio-version folder. Returns a
    6-tuple of paths: (bars OS, bars radio, loaders OS, loaders radio,
    zipped OS, zipped radio).

    :param localdir: Root folder.
    :type localdir: str

    :param osversion: OS version.
    :type osversion: str

    :param radioversion: Radio version.
    :type radioversion: str
    """
    os.makedirs(localdir, exist_ok=True)
    tree = []
    # Category order fixes the order of the returned paths.
    for category in ("bars", "loaders", "zipped"):
        dir_os, dir_radio = make_dirpairs(localdir, category, osversion, radioversion)
        tree.append(dir_os)
        tree.append(dir_radio)
    return tuple(tree)
516