Completed
Push — master ( 031bf5...ff3ed3 )
by John
03:05
created

persistent_remove()   A

Complexity

Conditions 4

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 4.3731

Importance

Changes 0
Metric Value
cc 4
c 0
b 0
f 0
dl 0
loc 14
ccs 5
cts 7
cp 0.7143
crap 4.3731
rs 9.2
1
#!/usr/bin/env python3
2 4
"""This module is used to operate with bar files."""
3
4 4
import os  # filesystem read
5 4
import zipfile  # zip extract, zip compresssion
6 4
import shutil  # folder operations
7 4
import base64  # encoding for hashes
8 4
import hashlib   # get hashes
9 4
from bbarchivist import utilities  # platform determination
10 4
from bbarchivist import exceptions  # exception handling
11 4
from bbarchivist import bbconstants  # premade stuff
12
13 4
__author__ = "Thurask"
14 4
__license__ = "WTFPL v2"
15 4
__copyright__ = "Copyright 2015-2016 Thurask"
16
17
18 4
def extract_bars(filepath):
19
    """
20
    Extract .signed files from .bar files.
21
    Use system zlib.
22
23
    :param filepath: Path to bar file directory.
24
    :type filepath: str
25
    """
26 4
    try:
27 4
        for file in os.listdir(filepath):
28 4
            if file.endswith(".bar"):
29 4
                print("EXTRACTING: {0}".format(file))
30 4
                zfile = zipfile.ZipFile(os.path.join(filepath, file), 'r')
31 4
                names = zfile.namelist()
32 4
                for name in names:
33 4
                    if str(name).endswith(".signed"):
34 4
                        zfile.extract(name, filepath)
35 4
    except (RuntimeError, OSError) as exc:
36 4
        exceptions.handle_exception(exc, "EXTRACTION FAILURE", None)
37
38
39 4
def retrieve_sha512(filename):
40
    """
41
    Get the premade, Base64 encoded SHA512 hash of a signed file in a bar.
42
43
    :param filename: Bar file to check.
44
    :type filename: str
45
    """
46 4
    try:
47 4
        zfile = zipfile.ZipFile(filename, 'r')
48 4
        names = zfile.namelist()
49 4
        manifest = None
50 4
        for name in names:
51 4
            if name.endswith("MANIFEST.MF"):
52 4
                manifest = name
53 4
                break
54 4
        if manifest is None:
55 4
            raise SystemExit
56 4
        manf = zfile.read(manifest).splitlines()
57 4
        alist = []
58 4
        for idx, line in enumerate(manf):
59 4
            if line.endswith(b"signed"):
60 4
                alist.append(manf[idx])
61 4
                alist.append(manf[idx + 1])
62 4
        assetname = alist[0].split(b": ")[1]
63 4
        assethash = alist[1].split(b": ")[1]
64 4
        return assetname, assethash  # (b"blabla.signed", b"somehash")
65 4
    except (RuntimeError, OSError, zipfile.BadZipFile) as exc:
66 4
        exceptions.handle_exception(exc, "EXTRACTION FAILURE", None)
67
68
69 4
def verify_sha512(filename, inithash):
70
    """
71
    Compare the original hash value with the current.
72
73
    :param filename: Signed file to check.
74
    :type filename: str
75
76
    :param inithash: Original SHA512 hash, as bytestring.
77
    :type inithash: bytes
78
    """
79 4
    sha512 = hashlib.sha512()
80 4
    with open(filename, 'rb') as file:
81 4
        while True:
82 4
            data = file.read(16 * 1024 * 1024)
83 4
            if not data:
84 4
                break
85 4
            sha512.update(data)
86 4
    rawdigest = sha512.digest()  # must be bytestring, not hexadecimalized str
87 4
    b64h = base64.b64encode(rawdigest, altchars=b"-_")  # replace some chars
88 4
    b64h = b64h.strip(b"==")  # remove padding
89 4
    return b64h == inithash
90
91
92 4
def bar_tester(filepath):
93
    """
94
    Use zipfile in order to test a bar for errors.
95
96
    :param filepath: Path to bar file.
97
    :type filepath: str
98
    """
99 4
    try:
100 4
        with zipfile.ZipFile(filepath, "r") as zfile:
101 4
            brokens = zfile.testzip()
102 4
    except zipfile.BadZipFile:
103 4
        brokens = filepath
104 4
    return brokens
105
106
107 4
def remove_empty_folders(a_folder):
108
    """
109
    Remove empty folders in a given folder using os.walk().
110
111
    :param a_folder: Target folder.
112
    :type a_folder: str
113
    """
114 4
    for curdir, subdirs, files in os.walk(a_folder):
115 4
        while True:
116 4
            try:
117 4
                if not subdirs and not files:
118 4
                    os.rmdir(curdir)
119 4
            except OSError:
120
                continue
121 4
            except NotImplementedError:
122 4
                break
123 4
            break
124
125
126 4
def persistent_remove(afile):
127
    """
128
    Remove a file, and if it doesn't want to remove, keep at it.
129
130
    :param afile: Path to file you want terminated with extreme prejudice.
131
    :type afile: str
132
    """
133 4
    while True:
134 4
        try:
135 4
            os.remove(afile)
136
        except PermissionError:
137
            continue
138
        else:
139 4
            break
140
141
142 4
def remove_signed_files(a_folder):
143
    """
144
    Remove signed files from a given folder.
145
146
    :param a_folder: Target folder.
147
    :type a_folder: str
148
    """
149 4
    files = [os.path.abspath(os.path.join(a_folder, file)) for file in os.listdir(a_folder)]
150 4
    for afile in files:
151 4
        if afile.endswith(".signed") and os.path.exists(afile):
152 4
            print("REMOVING: {0}".format(os.path.basename(afile)))
153 4
            persistent_remove(afile)
154
155
156 4
def remove_unpacked_loaders(osdir, raddir, radios):
157
    """
158
    Remove uncompressed loader folders.
159
160
    :param osdir: OS loader folder.
161
    :type osdir: str
162
163
    :param raddir: Radio loader folder.
164
    :type raddir: str
165
166
    :param radios: If we made radios this run.
167
    :type radios: bool
168
    """
169 4
    shutil.rmtree(osdir)
170 4
    if radios:
171 4
        shutil.rmtree(raddir)
172
173
174 4
def create_blitz(a_folder, swver):
175
    """
176
    Create a blitz file: a zipped archive of all app/core/radio bars.
177
178
    :param a_folder: Target folder.
179
    :type a_folder: str
180
181
    :param swver: Software version to title the blitz.
182
    :type swver: str
183
    """
184 4
    fname = "Blitz-{0}.zip".format(swver)
185 4
    with zipfile.ZipFile(fname, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as zfile:
186 4
        for root, dirs, files in os.walk(a_folder):
187 4
            del dirs
188 4
            for file in files:
189 4
                print("ZIPPING: {0}".format(utilities.stripper(file)))
190 4
                abs_filename = os.path.join(root, file)
191 4
                abs_arcname = os.path.basename(abs_filename)
192 4
                zfile.write(abs_filename, abs_arcname)
193
194
195 4
def move_loaders(ldir,
196
                 exedir_os, exedir_rad,
197
                 zipdir_os, zipdir_rad):
198
    """
199
    Move autoloaders to zipped and loaders directories in localdir.
200
201
    :param ldir: Local directory, containing files you wish to move.
202
    :type ldir: str
203
204
    :param exedir_os: Large autoloader .exe destination.
205
    :type exedir_os: str
206
207
    :param exedir_rad: Small autoloader .exe destination.
208
    :type exedir_rad: str
209
210
    :param zipdir_os: Large autoloader archive destination.
211
    :type zipdir_os: str
212
213
    :param zipdir_rad: Small autoloader archive destination.
214
    :type zipdir_rad: str
215
    """
216 4
    arx = bbconstants.ARCS
217 4
    pfx = bbconstants.PREFIXES
218 4
    loaders = [os.path.join(ldir, file) for file in os.listdir(ldir) if utilities.prepends(file, pfx, ".exe")]
219 4
    for file in loaders:
220 4
        print("MOVING: {0}".format(os.path.basename(file)))
221 4
        exedest_os = os.path.join(exedir_os, os.path.basename(file))
222 4
        exedest_rad = os.path.join(exedir_rad, os.path.basename(file))
223 4
        loader_sorter(file, exedest_os, exedest_rad)
224 4
    zippeds = [os.path.join(ldir, file) for file in os.listdir(ldir) if utilities.prepends(file, pfx, arx)]
225 4
    for file in zippeds:
226 4
        print("MOVING: {0}".format(os.path.basename(file)))
227 4
        zipdest_os = os.path.join(zipdir_os, os.path.basename(file))
228 4
        zipdest_rad = os.path.join(zipdir_rad, os.path.basename(file))
229 4
        loader_sorter(file, zipdest_os, zipdest_rad)
230
231
232 4
def loader_sorter(file, osdir, raddir):
233
    """
234
    Sort loaders based on size.
235
236
    :param file: The file to sort. Absolute paths, please.
237
    :type file: str
238
239
    :param osdir: Large file destination.
240
    :type osdir: str
241
242
    :param raddir: Small file destination.
243
    :type raddir: str
244
    """
245 4
    if os.path.getsize(file) > 90000000:
246 4
        while True:
247 4
            try:
248 4
                shutil.move(file, osdir)
249
            except shutil.Error:
250
                os.remove(file)
251
                continue
252 4
            break
253
    else:
254 4
        while True:
255 4
            try:
256 4
                shutil.move(file, raddir)
257
            except shutil.Error:
258
                os.remove(file)
259
                continue
260 4
            break
261
262
263 4
def move_bars(localdir, osdir, radiodir):
264
    """
265
    Move bar files to subfolders of a given folder.
266
267
    :param localdir: Directory to use.
268
    :type localdir: str
269
270
    :param osdir: OS file directory (large bars).
271
    :type osdir: str
272
273
    :param radiodir: Radio file directory (small bars).
274
    :type radiodir: str
275
    """
276 4
    for files in os.listdir(localdir):
277 4
        if files.endswith(".bar"):
278 4
            print("MOVING: {0}".format(files))
279 4
            bardest_os = os.path.join(osdir, files)
280 4
            bardest_radio = os.path.join(radiodir, files)
281
            # even the fattest radio is less than 90MB
282 4
            if os.path.getsize(os.path.join(localdir, files)) > 90000000:
283 4
                try:
284 4
                    shutil.move(os.path.join(localdir, files), osdir)
285
                except shutil.Error:
286
                    os.remove(bardest_os)
287
            else:
288 4
                try:
289 4
                    shutil.move(os.path.join(localdir, files), radiodir)
290
                except shutil.Error:
291
                    os.remove(bardest_radio)
292
293
294 4
def replace_bar_pair(localdir, osfile, radfile):
295
    """
296
    Move pair of OS and radio bars to a given folder.
297
298
    :param localdir: Final bar directory.
299
    :type localdir: str
300
301
    :param osfile: Path to OS file.
302
    :type osfile: str
303
304
    :param radfile: Path to radio file.
305
    :type radfile: str
306
    """
307 4
    shutil.move(osfile, localdir)
308 4
    shutil.move(radfile, localdir)
309
310
311 4
def replace_bars_bulk(localdir, barfiles):
312
    """
313
    Move set of OS and radio bars to a given folder.
314
315
    :param localdir: Final bar directory.
316
    :type localdir: str
317
318
    :param barfiles: List of OS/radio file paths.
319
    :type barfiles: list(str)
320
    """
321 4
    for barfile in barfiles:
322 4
        shutil.move(barfile, os.path.abspath(localdir))
323
324
325 4
def make_folder(localdir, root):
326
    """
327
    Make a folder if it doesn't exist.
328
329
    :param localdir: Top level folder.
330
    :type localdir: str
331
332
    :param root: Folder to create.
333
    :type root: str
334
    """
335 4
    if not os.path.exists(os.path.join(localdir, root)):
336 4
        os.makedirs(os.path.join(localdir, root), exist_ok=True)
337 4
    return os.path.join(localdir, root)
338
339
340 4
def make_dirpairs(localdir, root, osversion, radioversion):
341
    """
342
    Create a pair of directories, with OS/radio versions for names.
343
344
    :param localdir: Top level folder.
345
    :type localdir: str
346
347
    :param root: Name for folder containing OS/radio pairs.
348
    :type root: str
349
350
    :param osversion: OS version.
351
    :type osversion: str
352
353
    :param radioversion: Radio version.
354
    :type radioversion: str
355
    """
356 4
    rootdir = make_folder(localdir, root)
357 4
    osdir = make_folder(rootdir, osversion)
358 4
    radiodir = make_folder(rootdir, radioversion)
359 4
    return osdir, radiodir
360
361
362 4
def make_dirs(localdir, osversion, radioversion):
363
    """
364
    Create the directory tree needed for archivist/lazyloader.
365
366
    :param localdir: Root folder.
367
    :type localdir: str
368
369
    :param osversion: OS version.
370
    :type osversion: str
371
372
    :param radioversion: Radio version.
373
    :type radioversion: str
374
    """
375 4
    os.makedirs(localdir, exist_ok=True)
376 4
    bardir_os, bardir_radio = make_dirpairs(localdir, "bars", osversion, radioversion)
377 4
    loaderdir_os, loaderdir_radio = make_dirpairs(localdir, "loaders", osversion, radioversion)
378 4
    zipdir_os, zipdir_radio = make_dirpairs(localdir, "zipped", osversion, radioversion)
379
    return (bardir_os, bardir_radio, loaderdir_os, loaderdir_radio, zipdir_os, zipdir_radio)
380