Completed
Push — master ( 12684e...031bf5 )
by John
02:38
created

replace_bars_bulk()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 12
rs 9.4285
c 0
b 0
f 0
ccs 3
cts 3
cp 1
crap 2
1
#!/usr/bin/env python3
2 4
"""This module is used to operate with bar files."""
3
4 4
import os  # filesystem read
5 4
import zipfile  # zip extract, zip compression
6 4
import shutil  # folder operations
7 4
import base64  # encoding for hashes
8 4
import hashlib   # get hashes
9 4
from bbarchivist import utilities  # platform determination
10 4
from bbarchivist import exceptions  # exception handling
11 4
from bbarchivist import bbconstants  # premade stuff
12
13 4
__author__ = "Thurask"
14 4
__license__ = "WTFPL v2"
15 4
__copyright__ = "Copyright 2015-2016 Thurask"
16
17
18 4
def extract_bars(filepath):
    """
    Extract .signed files from every .bar file in a directory.
    Use system zlib.

    Each archive is opened in a ``with`` block, so its handle is closed
    before the next archive is processed (and before callers try to move
    or delete the bar file).

    :param filepath: Path to bar file directory.
    :type filepath: str
    """
    try:
        for barname in os.listdir(filepath):
            if not barname.endswith(".bar"):
                continue
            print("EXTRACTING: {0}".format(barname))
            with zipfile.ZipFile(os.path.join(filepath, barname), 'r') as zfile:
                for member in zfile.namelist():
                    # Only the signed payloads are wanted; skip manifests etc.
                    if str(member).endswith(".signed"):
                        zfile.extract(member, filepath)
    except (RuntimeError, OSError) as exc:
        # Failures are delegated to the package-wide exception handler.
        exceptions.handle_exception(exc, "EXTRACTION FAILURE", None)
37
38
39 4
def retrieve_sha512(filename):
    """
    Get the premade, Base64 encoded SHA512 hash of a signed file in a bar.

    The first archive member whose name ends in "MANIFEST.MF" is read. The
    first manifest line ending in ``signed`` and the line right after it are
    taken as the asset name line and its hash line; the text after ``": "``
    on each is returned as a ``(name, hash)`` tuple of bytes.

    Raises SystemExit if the archive has no manifest, and IndexError if the
    manifest has no signed entry or no line following it. RuntimeError,
    OSError and zipfile.BadZipFile are passed to exceptions.handle_exception.

    :param filename: Bar file to check.
    :type filename: str
    """
    try:
        # Read everything needed inside the with block so the archive handle
        # is always closed, even on the error paths.
        with zipfile.ZipFile(filename, 'r') as zfile:
            manifest = next(
                (name for name in zfile.namelist() if name.endswith("MANIFEST.MF")),
                None
            )
            if manifest is None:
                raise SystemExit
            manf = zfile.read(manifest).splitlines()
        alist = []
        for idx, line in enumerate(manf):
            if line.endswith(b"signed"):
                # Name line plus the hash line that follows it.
                alist = manf[idx:idx + 2]
                break
        assetname = alist[0].split(b": ")[1]
        assethash = alist[1].split(b": ")[1]
        return assetname, assethash  # (b"blabla.signed", b"somehash")
    except (RuntimeError, OSError, zipfile.BadZipFile) as exc:
        exceptions.handle_exception(exc, "EXTRACTION FAILURE", None)
67
68
69 4
def verify_sha512(filename, inithash):
    """
    Check a file's SHA512 digest against a previously known value.

    The file is hashed in 16 MiB chunks; the raw digest is Base64 encoded
    with "-" and "_" as the alternate characters and stripped of "="
    padding before being compared with the supplied hash.

    :param filename: Signed file to check.
    :type filename: str

    :param inithash: Original SHA512 hash, as bytestring.
    :type inithash: bytes
    """
    chunk_size = 16 * 1024 * 1024
    hasher = hashlib.sha512()
    with open(filename, 'rb') as handle:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            hasher.update(chunk)
    # Raw digest bytes, not the hex string, are what gets encoded.
    encoded = base64.urlsafe_b64encode(hasher.digest())
    return encoded.strip(b"=") == inithash
90
91
92 4
def bar_tester(filepath):
    """
    Run zipfile's integrity test on a bar.

    Returns whatever ZipFile.testzip() returns (the first bad member name,
    or None if all members are fine). If the file is not a valid zip at
    all, the path itself is returned.

    :param filepath: Path to bar file.
    :type filepath: str
    """
    try:
        with zipfile.ZipFile(filepath, "r") as archive:
            return archive.testzip()
    except zipfile.BadZipFile:
        # The whole file is unusable, so report the file rather than a member.
        return filepath
105
106
107 4
def remove_empty_folders(a_folder):
    """
    Remove empty folders in a given folder using os.walk().

    A folder is removed when os.walk() reports neither subfolders nor files
    for it. Removal is best-effort: an OSError is retried a bounded number
    of times with a short pause, after which the folder is left in place,
    so a persistent error can no longer spin forever.

    :param a_folder: Target folder.
    :type a_folder: str
    """
    import time  # local import: only needed to pause between retries

    max_attempts = 5
    for curdir, subdirs, files in os.walk(a_folder):
        if subdirs or files:
            continue
        for attempt in range(max_attempts):
            try:
                os.rmdir(curdir)
            except NotImplementedError:
                # Same as before: give up on this folder immediately.
                break
            except FileNotFoundError:
                # Already gone; nothing left to remove.
                break
            except OSError:
                # Possibly a transient lock; wait briefly and retry.
                if attempt + 1 < max_attempts:
                    time.sleep(0.1)
            else:
                break
124
125
126 4
def remove_signed_files(a_folder):
    """
    Remove signed files from a given folder.

    Only direct children of the folder whose names end in ".signed" are
    removed. A PermissionError is retried a bounded number of times with a
    short pause; if the file still cannot be removed the PermissionError is
    re-raised, so a permanently locked file can no longer spin forever.

    :param a_folder: Target folder.
    :type a_folder: str
    """
    import time  # local import: only needed to pause between retries

    max_attempts = 10
    candidates = [
        os.path.abspath(os.path.join(a_folder, name))
        for name in os.listdir(a_folder)
    ]
    for target in candidates:
        if not (target.endswith(".signed") and os.path.exists(target)):
            continue
        print("REMOVING: {0}".format(os.path.basename(target)))
        for attempt in range(1, max_attempts + 1):
            try:
                os.remove(target)
            except PermissionError:
                if attempt == max_attempts:
                    raise
                # Possibly a transient lock; wait briefly and retry.
                time.sleep(0.1)
            else:
                break
145
146
147 4
def remove_unpacked_loaders(osdir, raddir, radios):
    """
    Delete the uncompressed loader folder trees.

    The OS folder is always removed; the radio folder only when radios is
    truthy.

    :param osdir: OS loader folder.
    :type osdir: str

    :param raddir: Radio loader folder.
    :type raddir: str

    :param radios: If we made radios this run.
    :type radios: bool
    """
    doomed = [osdir, raddir] if radios else [osdir]
    for folder in doomed:
        shutil.rmtree(folder)
163
164
165 4
def create_blitz(a_folder, swver):
    """
    Create a blitz file: a zipped archive of all app/core/radio bars.

    Every file found under the folder (recursively) is stored at the top
    level of "Blitz-<swver>.zip", which is written relative to the current
    working directory.

    :param a_folder: Target folder.
    :type a_folder: str

    :param swver: Software version to title the blitz.
    :type swver: str
    """
    archive_name = "Blitz-{0}.zip".format(swver)
    with zipfile.ZipFile(archive_name, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as blitz:
        for root, _, filenames in os.walk(a_folder):
            for filename in filenames:
                print("ZIPPING: {0}".format(utilities.stripper(filename)))
                source = os.path.join(root, filename)
                # Store by bare file name so the archive has no folder structure.
                blitz.write(source, os.path.basename(source))
184
185
186 4
def move_loaders(ldir,
                 exedir_os, exedir_rad,
                 zipdir_os, zipdir_rad):
    """
    Move autoloaders to zipped and loaders directories in localdir.

    Files are selected with utilities.prepends() against
    bbconstants.PREFIXES: first those matched with ".exe", then those
    matched with bbconstants.ARCS. Each match is handed to loader_sorter()
    together with its two candidate destination paths.

    :param ldir: Local directory, containing files you wish to move.
    :type ldir: str

    :param exedir_os: Large autoloader .exe destination.
    :type exedir_os: str

    :param exedir_rad: Small autoloader .exe destination.
    :type exedir_rad: str

    :param zipdir_os: Large autoloader archive destination.
    :type zipdir_os: str

    :param zipdir_rad: Small autoloader archive destination.
    :type zipdir_rad: str
    """
    archive_exts = bbconstants.ARCS
    prefixes = bbconstants.PREFIXES
    # Executables first, then archives; the folder is re-listed for each batch.
    batches = (
        (".exe", exedir_os, exedir_rad),
        (archive_exts, zipdir_os, zipdir_rad),
    )
    for suffixes, big_dir, small_dir in batches:
        matches = [
            os.path.join(ldir, entry)
            for entry in os.listdir(ldir)
            if utilities.prepends(entry, prefixes, suffixes)
        ]
        for path in matches:
            name = os.path.basename(path)
            print("MOVING: {0}".format(name))
            big_dest = os.path.join(big_dir, name)
            small_dest = os.path.join(small_dir, name)
            loader_sorter(path, big_dest, small_dest)
221
222
223 4
def loader_sorter(file, osdir, raddir):
    """
    Sort loaders based on size.

    Files larger than 90,000,000 bytes are moved to osdir, everything else
    to raddir. If shutil.move() raises shutil.Error (for instance because
    the destination folder already holds a file of that name), the source
    file is deleted and the existing destination is kept. The move is not
    retried afterwards, because the source no longer exists.

    :param file: The file to sort. Absolute paths, please.
    :type file: str

    :param osdir: Large file destination.
    :type osdir: str

    :param raddir: Small file destination.
    :type raddir: str
    """
    destination = osdir if os.path.getsize(file) > 90000000 else raddir
    try:
        shutil.move(file, destination)
    except shutil.Error:
        # Destination is already populated; discard the duplicate source.
        os.remove(file)
252
253
254 4
def move_bars(localdir, osdir, radiodir):
    """
    Move bar files to subfolders of a given folder.

    Bars larger than 90,000,000 bytes go to osdir, everything else to
    radiodir. If shutil.move() raises shutil.Error (for instance because
    the destination folder already holds a bar of that name), the stale
    destination copy is removed and the move is retried once, so the bar
    from localdir replaces it instead of being left behind.

    :param localdir: Directory to use.
    :type localdir: str

    :param osdir: OS file directory (large bars).
    :type osdir: str

    :param radiodir: Radio file directory (small bars).
    :type radiodir: str
    """
    for barname in os.listdir(localdir):
        if not barname.endswith(".bar"):
            continue
        print("MOVING: {0}".format(barname))
        source = os.path.join(localdir, barname)
        # even the fattest radio is less than 90MB
        destdir = osdir if os.path.getsize(source) > 90000000 else radiodir
        try:
            shutil.move(source, destdir)
        except shutil.Error:
            # Drop the old copy, then finish the move that was interrupted.
            os.remove(os.path.join(destdir, barname))
            shutil.move(source, destdir)
283
284
285 4
def replace_bar_pair(localdir, osfile, radfile):
    """
    Move one OS bar and one radio bar into a given folder.

    The OS file is moved first, then the radio file.

    :param localdir: Final bar directory.
    :type localdir: str

    :param osfile: Path to OS file.
    :type osfile: str

    :param radfile: Path to radio file.
    :type radfile: str
    """
    for barpath in (osfile, radfile):
        shutil.move(barpath, localdir)
300
301
302 4
def replace_bars_bulk(localdir, barfiles):
    """
    Move a collection of OS and radio bars into a given folder.

    The destination is resolved to an absolute path before moving.

    :param localdir: Final bar directory.
    :type localdir: str

    :param barfiles: List of OS/radio file paths.
    :type barfiles: list(str)
    """
    destination = os.path.abspath(localdir)
    for barpath in barfiles:
        shutil.move(barpath, destination)
314
315
316 4
def make_folder(localdir, root):
    """
    Ensure a folder exists below a parent folder and return its path.

    Nothing is created if the joined path already exists.

    :param localdir: Top level folder.
    :type localdir: str

    :param root: Folder to create.
    :type root: str
    """
    target = os.path.join(localdir, root)
    if os.path.exists(target):
        return target
    os.makedirs(target, exist_ok=True)
    return target
329
330
331 4
def make_dirpairs(localdir, root, osversion, radioversion):
    """
    Create a pair of version-named directories under a shared parent.

    The parent (localdir/root) is made first, then the OS folder, then the
    radio folder. Returns the two paths as an (OS, radio) tuple.

    :param localdir: Top level folder.
    :type localdir: str

    :param root: Name for folder containing OS/radio pairs.
    :type root: str

    :param osversion: OS version.
    :type osversion: str

    :param radioversion: Radio version.
    :type radioversion: str
    """
    parent = make_folder(localdir, root)
    return tuple(
        make_folder(parent, version) for version in (osversion, radioversion)
    )
351
352
353 4
def make_dirs(localdir, osversion, radioversion):
    """
    Build the directory tree needed for archivist/lazyloader.

    Under the root, "bars", "loaders" and "zipped" each get an OS-version
    and a radio-version subfolder. Returns the six paths as a tuple in the
    order (bars OS, bars radio, loaders OS, loaders radio, zipped OS,
    zipped radio).

    :param localdir: Root folder.
    :type localdir: str

    :param osversion: OS version.
    :type osversion: str

    :param radioversion: Radio version.
    :type radioversion: str
    """
    os.makedirs(localdir, exist_ok=True)
    created = []
    for category in ("bars", "loaders", "zipped"):
        created.extend(make_dirpairs(localdir, category, osversion, radioversion))
    return tuple(created)
371