Completed
Push — master ( b87cef...9aaddc )
by John
01:21
created

make_dirpairs()   A

Complexity

Conditions 1

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 20
rs 9.4285
1
#!/usr/bin/env python3
2
"""This module is used to operate with bar files."""
3
4
import os  # filesystem read
5
import zipfile  # zip extract, zip compression
6
import shutil  # folder operations
7
import base64  # encoding for hashes
8
import hashlib   # get hashes
9
from bbarchivist import utilities  # platform determination
10
from bbarchivist import bbconstants  # premade stuff
11
12
__author__ = "Thurask"
13
__license__ = "WTFPL v2"
14
__copyright__ = "Copyright 2015-2016 Thurask"
15
16
17
def extract_bars(filepath):
    """
    Extract .signed files from .bar files.
    Use system zlib.

    Every entry of ``filepath`` whose name ends in ".bar" is opened as a zip
    archive, and each member whose name ends in ".signed" is extracted into
    ``filepath`` itself. RuntimeError and OSError are caught and reported on
    stdout instead of being propagated.

    :param filepath: Path to bar file directory.
    :type filepath: str
    """
    try:
        for file in os.listdir(filepath):
            if file.endswith(".bar"):
                print("EXTRACTING: {0}".format(file))
                # os.listdir() returns bare names, so resolve them against
                # filepath; otherwise this only works when cwd == filepath.
                barpath = os.path.join(filepath, file)
                # Context manager closes the archive handle even on failure.
                with zipfile.ZipFile(barpath, 'r') as zfile:
                    for name in zfile.namelist():
                        if str(name).endswith(".signed"):
                            zfile.extract(name, filepath)
    except (RuntimeError, OSError) as exc:
        print("EXTRACTION FAILURE")
        print(str(exc))
        print("DID IT DOWNLOAD PROPERLY?")
38
39
40
def retrieve_sha512(filename):
    """
    Get the premade, Base64 encoded SHA512 hash of a signed file in a bar.

    The first archive member whose name ends in "MANIFEST.MF" is read, and the
    first manifest line ending in "signed" is paired with the line that
    follows it. The text after ": " on each of those two lines is returned as
    ``(assetname, assethash)``, both bytes.

    RuntimeError, OSError and zipfile.BadZipFile are caught and reported on
    stdout; in that case the function returns None.

    :param filename: Bar file to check.
    :type filename: str

    :raises SystemExit: If the archive has no MANIFEST.MF member.
    :raises IndexError: If the manifest has no usable "signed" entry.
    """
    try:
        # Context manager releases the archive handle on every exit path.
        with zipfile.ZipFile(filename, 'r') as zfile:
            manifest = None
            for name in zfile.namelist():
                if name.endswith("MANIFEST.MF"):
                    manifest = name
                    break
            if manifest is None:
                raise SystemExit
            manf = zfile.read(manifest).splitlines()
        for idx, line in enumerate(manf):
            if line.endswith(b"signed"):
                # Only the first match is ever used, so stop here; scanning on
                # could trip over a later, unpaired "signed" line.
                assetname = line.split(b": ")[1]
                assethash = manf[idx + 1].split(b": ")[1]
                return assetname, assethash  # (b"blabla.signed", b"somehash")
        raise IndexError("no signed asset listed in manifest")
    except (RuntimeError, OSError, zipfile.BadZipFile) as exc:
        print("EXTRACTION FAILURE")
        print(str(exc))
        print("DID IT DOWNLOAD PROPERLY?")
70
71
72
def verify_sha512(filename, inithash):
    """
    Check a file's SHA512 digest against a previously known value.

    The file is hashed in 16 MiB chunks; the raw digest is Base64 encoded with
    "-" and "_" as the alternate characters and the "=" padding is stripped
    before it is compared with ``inithash``.

    :param filename: Signed file to check.
    :type filename: str

    :param inithash: Original SHA512 hash, as bytestring.
    :type inithash: bytes
    """
    chunksize = 16 * 1024 * 1024
    hasher = hashlib.sha512()
    with open(filename, 'rb') as handle:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: handle.read(chunksize), b""):
            hasher.update(chunk)
    # Encode the raw digest bytes, not the hex string.
    encoded = base64.b64encode(hasher.digest(), altchars=b"-_")
    return encoded.strip(b"=") == inithash
93
94
95
def bar_tester(filepath):
    """
    Run zipfile's integrity test over a bar file.

    Returns whatever ``ZipFile.testzip()`` returns, or ``filepath`` itself
    when zipfile.BadZipFile is raised.

    :param filepath: Path to bar file.
    :type filepath: str
    """
    try:
        with zipfile.ZipFile(filepath, "r") as archive:
            return archive.testzip()
    except zipfile.BadZipFile:
        return filepath
108
109
110
def remove_empty_folders(a_folder):
    """
    Remove empty folders in a given folder using os.walk().

    A folder is removed when os.walk() reports neither subfolders nor files
    for it. Removal is retried a bounded number of times with a short pause
    between attempts; a folder that still cannot be removed is left in place.

    :param a_folder: Target folder.
    :type a_folder: str
    """
    import time  # local import: only needed for the retry pause

    max_attempts = 10
    for curdir, subdirs, files in os.walk(a_folder):
        if subdirs or files:
            continue
        for _ in range(max_attempts):
            try:
                os.rmdir(curdir)
            except FileNotFoundError:
                break  # already gone, nothing left to do
            except NotImplementedError:
                break
            except OSError:
                # Retry, but never forever: an unbounded loop would hang on a
                # failure that does not clear by itself.
                time.sleep(0.1)
                continue
            break
127
128
129
def remove_signed_files(a_folder):
    """
    Remove signed files from a given folder.

    Only regular files directly inside ``a_folder`` whose names end in
    ".signed" are removed; directories are skipped. A removal that fails with
    PermissionError is retried a bounded number of times with a short pause
    between attempts.

    :param a_folder: Target folder.
    :type a_folder: str

    :raises PermissionError: If a file still cannot be removed after the
        final attempt.
    """
    import time  # local import: only needed for the retry pause

    max_attempts = 50
    for name in os.listdir(a_folder):
        path = os.path.abspath(os.path.join(a_folder, name))
        # isfile() keeps directories named "*.signed" away from os.remove(),
        # which cannot delete them.
        if not (path.endswith(".signed") and os.path.isfile(path)):
            continue
        print("REMOVING: {0}".format(os.path.basename(path)))
        for attempt in range(1, max_attempts + 1):
            try:
                os.remove(path)
            except PermissionError:
                # Bounded retry: give up loudly rather than spin forever.
                if attempt == max_attempts:
                    raise
                time.sleep(0.1)
            else:
                break
148
149
150
def remove_unpacked_loaders(osdir, raddir, radios):
    """
    Delete the uncompressed loader folder trees.

    The OS folder is always removed; the radio folder only when ``radios`` is
    truthy.

    :param osdir: OS loader folder.
    :type osdir: str

    :param raddir: Radio loader folder.
    :type raddir: str

    :param radios: If we made radios this run.
    :type radios: bool
    """
    doomed = [osdir]
    if radios:
        doomed.append(raddir)
    for folder in doomed:
        shutil.rmtree(folder)
166
167
168
def create_blitz(a_folder, swver):
    """
    Build a blitz: one zip archive holding every file found under a folder.

    The archive is named "Blitz-<swver>.zip" and is opened relative to the
    current working directory. Files are stored flat, under their base names
    only.

    :param a_folder: Target folder.
    :type a_folder: str

    :param swver: Software version to title the blitz.
    :type swver: str
    """
    blitzname = "Blitz-{0}.zip".format(swver)
    with zipfile.ZipFile(blitzname, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as archive:
        for root, _, entries in os.walk(a_folder):
            for entry in entries:
                print("ZIPPING: {0}".format(utilities.stripper(entry)))
                source = os.path.join(root, entry)
                # Store under the bare file name so the archive has no folders.
                archive.write(source, os.path.basename(source))
187
188
189
def move_loaders(a_dir,
                 exedir_os, exedir_rad,
                 zipdir_os, zipdir_rad):
    """
    Move autoloaders to zipped and loaders directories in localdir.

    Files in ``a_dir`` selected by ``utilities.prepends`` with
    ``bbconstants.PREFIXES`` and the ".exe" suffix are handed to
    ``loader_sorter`` first, then those selected with ``bbconstants.ARCS``.

    :param a_dir: Local directory, containing files you wish to move.
    :type a_dir: str

    :param exedir_os: Large autoloader .exe destination.
    :type exedir_os: str

    :param exedir_rad: Small autoloader .exe destination.
    :type exedir_rad: str

    :param zipdir_os: Large autoloader archive destination.
    :type zipdir_os: str

    :param zipdir_rad: Small autoloader archive destination.
    :type zipdir_rad: str
    """
    pfx = bbconstants.PREFIXES
    passes = (
        (".exe", exedir_os, exedir_rad),
        (bbconstants.ARCS, zipdir_os, zipdir_rad),
    )
    for suffixes, dest_os, dest_rad in passes:
        # Re-list the folder on each pass, after the previous pass has moved
        # its files.
        matches = [file for file in os.listdir(a_dir) if utilities.prepends(file, pfx, suffixes)]
        for file in matches:
            print("MOVING: {0}".format(file))
            # os.listdir() returns bare names; loader_sorter needs a real path
            # or it only works when cwd == a_dir.
            source = os.path.join(a_dir, file)
            loader_sorter(source, os.path.join(dest_os, file), os.path.join(dest_rad, file))
224
225
226
def loader_sorter(file, osdir, raddir, threshold=90000000):
    """
    Sort loaders based on size.

    Files larger than ``threshold`` bytes are moved to ``osdir``, all others
    to ``raddir``. If shutil.move() refuses because a file of the same name is
    already at the destination, that stale file is deleted and the move is
    retried once, so the source file is never lost.

    :param file: The file to sort. Absolute paths, please.
    :type file: str

    :param osdir: Large file destination.
    :type osdir: str

    :param raddir: Small file destination.
    :type raddir: str

    :param threshold: Size in bytes above which a file counts as large.
    :type threshold: int

    :raises shutil.Error: If the move fails and no stale file is in the way.
    """
    destination = osdir if os.path.getsize(file) > threshold else raddir
    try:
        shutil.move(file, destination)
    except shutil.Error:
        # Work out which existing file blocked the move: when destination is a
        # folder, the clash is with the same-named entry inside it.
        stale = destination
        if os.path.isdir(destination):
            stale = os.path.join(destination, os.path.basename(file))
        if not os.path.isfile(stale):
            raise
        # Delete the stale copy (never the source), then retry once.
        os.remove(stale)
        shutil.move(file, destination)
255
256
257
def move_bars(localdir, osdir, radiodir):
    """
    Move bar files to subfolders of a given folder.

    Each ".bar" file directly inside ``localdir`` is moved to ``osdir`` when
    it is larger than 90,000,000 bytes and to ``radiodir`` otherwise. If
    shutil.move() refuses with shutil.Error, the same-named file in the
    destination folder is deleted and the move is retried once.

    :param localdir: Directory to use.
    :type localdir: str

    :param osdir: OS file directory (large bars).
    :type osdir: str

    :param radiodir: Radio file directory (small bars).
    :type radiodir: str
    """
    for name in os.listdir(localdir):
        if not name.endswith(".bar"):
            continue
        print("MOVING: {0}".format(name))
        source = os.path.join(localdir, name)
        # even the fattest radio is less than 90MB
        destdir = osdir if os.path.getsize(source) > 90000000 else radiodir
        try:
            shutil.move(source, destdir)
        except shutil.Error:
            # Clear the stale copy, then actually move the file; removing it
            # without retrying would leave the bar behind in localdir.
            os.remove(os.path.join(destdir, name))
            shutil.move(source, destdir)
286
287
288
def make_folder(localdir, root):
    """
    Ensure a subfolder exists and hand back its path.

    :param localdir: Top level folder.
    :type localdir: str

    :param root: Folder to create.
    :type root: str

    :return: ``localdir`` joined with ``root``.
    """
    target = os.path.join(localdir, root)
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)
    return target
301
302
303
def make_dirpairs(localdir, root, osversion, radioversion):
    """
    Build two sibling folders, named for the OS and radio versions.

    Both folders are created, via ``make_folder``, inside ``root``, which is
    itself created inside ``localdir``.

    :param localdir: Top level folder.
    :type localdir: str

    :param root: Name for folder containing OS/radio pairs.
    :type root: str

    :param osversion: OS version.
    :type osversion: str

    :param radioversion: Radio version.
    :type radioversion: str

    :return: ``(osdir, radiodir)`` paths.
    """
    parent = make_folder(localdir, root)
    return tuple(make_folder(parent, version) for version in (osversion, radioversion))
323
324
325
def make_dirs(localdir, osversion, radioversion):
    """
    Lay out the folder tree that archivist/lazyloader works in.

    ``localdir`` is created if needed, then an OS/radio folder pair is made
    under each of "bars", "loaders" and "zipped".

    :param localdir: Root folder.
    :type localdir: str

    :param osversion: OS version.
    :type osversion: str

    :param radioversion: Radio version.
    :type radioversion: str

    :return: Six paths: bars OS, bars radio, loaders OS, loaders radio,
        zipped OS, zipped radio.
    """
    os.makedirs(localdir, exist_ok=True)
    tree = []
    for branch in ("bars", "loaders", "zipped"):
        tree.extend(make_dirpairs(localdir, branch, osversion, radioversion))
    return tuple(tree)
343