Completed
Push — master ( de6c4c...cc1262 )
by John
03:54
created

verify_bulk_loaders()   A

Complexity

Conditions 2

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 2
c 3
b 0
f 0
dl 0
loc 13
ccs 6
cts 6
cp 1
crap 2
rs 9.4285
1
#!/usr/bin/env python3
2 5
"""This module is used for miscellaneous utilities."""
3
4 5
import os  # path work
5 5
import argparse  # argument parser for filters
6 5
import platform  # platform info
7 5
import glob  # cap grabbing
8 5
import hashlib  # base url creation
9 5
import threading  # get thread for spinner
10 5
import time  # spinner delay
11 5
import sys  # streams, version info
12 5
import itertools  # spinners gonna spin
13 5
import subprocess  # loader verification
14 5
from bbarchivist import bbconstants  # cap location, version, filename bits
15 5
from bbarchivist import compat  # backwards compat
16 5
from bbarchivist import dummy  # useless stdout
17 5
from bbarchivist import exceptions  # exceptions
18 5
from bbarchivist import iniconfig  # config parsing
19
20 5
__author__ = "Thurask"
21 5
__license__ = "WTFPL v2"
22 5
__copyright__ = "2015-2017 Thurask"
23
24
25 5
def grab_datafile(datafile):
26
    """
27
    Figure out where a datafile is.
28
29
    :param datafile: Datafile to check.
30
    :type datafile: bbconstants.Datafile
31
    """
32 5
    try:
33 5
        afile = glob.glob(os.path.join(os.getcwd(), datafile.filename))[0]
34 5
    except IndexError:
35 5
        afile = datafile.location if datafile.name == "cfp" else grab_capini(datafile)
36 5
    return os.path.abspath(afile)
37
38
39 5
def grab_capini(datafile):
40
    """
41
    Get cap location from .ini file, and write if it's new.
42
43
    :param datafile: Datafile to check.
44
    :type datafile: bbconstants.Datafile
45
    """
46
    try:
47
        apath = cappath_config_loader()
48
        afile = glob.glob(apath)[0]
49
    except IndexError:
50
        cappath_config_writer(datafile.location)
51
        return bbconstants.CAP.location  # no ini cap
52
    else:
53
        cappath_config_writer(os.path.abspath(afile))
54
        return os.path.abspath(afile)  # ini cap:
55
56
57 5
def grab_cap():
58
    """
59
    Figure out where cap is, local, specified or system-supplied.
60
    """
61 5
    return grab_datafile(bbconstants.CAP)
62
63
64 5
def grab_cfp():
65
    """
66
    Figure out where cfp is, local or system-supplied.
67
    """
68 5
    return grab_datafile(bbconstants.CFP)
69
70
71 5
def new_enough(minver):
72
    """
73
    Check if we're at or above a minimum Python version.
74
75
    :param minver: Minimum Python version (3.minver).
76
    :type minver: int
77
    """
78 5
    return False if minver > sys.version_info[1] else True
79
80
81 5
def dirhandler(directory, defaultdir):
82
    """
83
    If directory is None, turn it into defaultdir.
84
85
    :param directory: Target directory.
86
    :type directory: str
87
88
    :param defaultdir: Default directory.
89
    :type defaultdir: str
90
    """
91 5
    directory = defaultdir if directory is None else directory
92 5
    return directory
93
94
95 5
def fsizer(file_size):
96
    """
97
    Raw byte file size to human-readable string.
98
99
    :param file_size: Number to parse.
100
    :type file_size: float
101
    """
102 5
    if file_size is None:
103 5
        file_size = 0
104 5
    fsize = float(file_size)
105 5
    for sfix in ['B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']:
106 5
        if fsize < 1024.0:
107 5
            size = "{0:3.2f}{1}".format(fsize, sfix)
108 5
            break
109
        else:
110 5
            fsize /= 1024.0
111
    else:
112 5
        size = "{0:3.2f}{1}".format(fsize, 'YB')
113 5
    return size
114
115
116 5
def signed_file_args(files):
117
    """
118
    Check if there are between 1 and 6 files supplied to argparse.
119
120
    :param files: List of signed files, between 1 and 6 strings.
121
    :type files: list(str)
122
    """
123 5
    filelist = [file for file in files if file]
124 5
    if not 1 <= len(filelist) <= 6:
125 5
        raise argparse.ArgumentError(argument=None, message="Requires 1-6 signed files")
126 5
    return files
127
128
129 5
def file_exists(file):
130
    """
131
    Check if file exists, raise argparse error if it doesn't.
132
133
    :param file: Path to a file, including extension.
134
    :type file: str
135
    """
136 5
    if not os.path.exists(file):
137 5
        raise argparse.ArgumentError(argument=None, message="{0} not found.".format(file))
138 5
    return file
139
140
141 5
def positive_integer(input_int):
142
    """
143
    Check if number > 0, raise argparse error if it isn't.
144
145
    :param input_int: Integer to check.
146
    :type input_int: str
147
    """
148 5
    if int(input_int) <= 0:
149 5
        info = "{0} is not >=0.".format(str(input_int))
150 5
        raise argparse.ArgumentError(argument=None, message=info)
151 5
    return int(input_int)
152
153
154 5
def valid_method(method):
155
    """
156
    Check if compression method is valid, raise argparse error if it isn't.
157
158
    :param method: Compression method to check.
159
    :type method: str
160
    """
161 5
    methodlist = bbconstants.METHODS
162 5
    if not new_enough(3):
163 5
        methodlist = [x for x in bbconstants.METHODS if x != "txz"]
164 5
    if method not in methodlist:
165 5
        info = "Invalid method {0}.".format(method)
166 5
        raise argparse.ArgumentError(argument=None, message=info)
167 5
    return method
168
169
170 5
def valid_carrier(mcc_mnc):
171
    """
172
    Check if MCC/MNC is valid (1-3 chars), raise argparse error if it isn't.
173
174
    :param mcc_mnc: MCC/MNC to check.
175
    :type mcc_mnc: str
176
    """
177 5
    if not str(mcc_mnc).isdecimal():
178 5
        infod = "Non-integer {0}.".format(str(mcc_mnc))
179 5
        raise argparse.ArgumentError(argument=None, message=infod)
180 5
    if len(str(mcc_mnc)) > 3 or len(str(mcc_mnc)) == 0:
181 5
        infol = "{0} is invalid.".format(str(mcc_mnc))
182 5
        raise argparse.ArgumentError(argument=None, message=infol)
183
    else:
184 5
        return mcc_mnc
185
186
187 5
def escreens_pin(pin):
188
    """
189
    Check if given PIN is valid, raise argparse error if it isn't.
190
191
    :param pin: PIN to check.
192
    :type pin: str
193
    """
194 5
    if len(pin) == 8:
195 5
        try:
196 5
            int(pin, 16)  # hexadecimal-ness
197 5
        except ValueError:
198 5
            raise argparse.ArgumentError(argument=None, message="Invalid PIN.")
199
        else:
200 5
            return pin.lower()
201
    else:
202 5
        raise argparse.ArgumentError(argument=None, message="Invalid PIN.")
203
204
205 5
def escreens_duration(duration):
206
    """
207
    Check if Engineering Screens duration is valid.
208
209
    :param duration: Duration to check.
210
    :type duration: int
211
    """
212 5
    if int(duration) in (1, 3, 6, 15, 30):
213 5
        return int(duration)
214
    else:
215 5
        raise argparse.ArgumentError(argument=None, message="Invalid duration.")
216
217
218 5
def droidlookup_hashtype(method):
219
    """
220
    Check if Android autoloader lookup hash type is valid.
221
222
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
223
    :type method: str
224
    """
225 5
    if method.lower() in ("sha512", "sha256"):
226 5
        return method.lower()
227
    else:
228 5
        raise argparse.ArgumentError(argument=None, message="Invalid type.")
229
230
231 5
def droidlookup_devicetype(device):
232
    """
233
    Check if Android autoloader device type is valid.
234
235
    :param device: Android autoloader types to check.
236
    :type device: str
237
    """
238 5
    devices = ("Priv", "DTEK50", "DTEK60", "KEYone", "Aurora")
239 5
    if device is None:
240 5
        return None
241
    else:
242 5
        for dev in devices:
243 5
            if dev.lower() == device.lower():
244 5
                return dev
245 5
        raise argparse.ArgumentError(argument=None, message="Invalid device.")
246
247
248 5
def s2b(input_check):
249
    """
250
    Return Boolean interpretation of string input.
251
252
    :param input_check: String to check if it means True or False.
253
    :type input_check: str
254
    """
255 5
    return str(input_check).lower() in ("yes", "true", "t", "1", "y")
256
257
258 5
def is_amd64():
259
    """
260
    Check if script is running on an AMD64 system (Python can be 32/64, this is for subprocess)
261
    """
262 5
    return platform.machine().endswith("64")
263
264
265 5
def is_windows():
266
    """
267
    Check if script is running on Windows.
268
    """
269 5
    return platform.system() == "Windows"
270
271
272 5
def talkaprint(msg, talkative=False):
273
    """
274
    Print only if asked to.
275
276
    :param msg: Message to print.
277
    :type msg: str
278
279
    :param talkative: Whether to output to screen. False by default.
280
    :type talkative: bool
281
    """
282 5
    if talkative:
283 5
        print(msg)
284
285
286 5
def get_seven_zip(talkative=False):
287
    """
288
    Return name of 7-Zip executable.
289
    On POSIX, it MUST be 7za.
290
    On Windows, it can be installed or supplied with the script.
291
    :func:`win_seven_zip` is used to determine if it's installed.
292
293
    :param talkative: Whether to output to screen. False by default.
294
    :type talkative: bool
295
    """
296 5
    return win_seven_zip(talkative) if is_windows() else "7za"
297
298
299 5
def win_seven_zip(talkative=False):
300
    """
301
    For Windows, check where 7-Zip is ("where", pretty much).
302
    Consult registry first for any installed instances of 7-Zip.
303
304
    :param talkative: Whether to output to screen. False by default.
305
    :type talkative: bool
306
    """
307
    talkaprint("CHECKING INSTALLED FILES...", talkative)
308
    try:
309
        import winreg  # windows registry
310
        hk7z = winreg.OpenKey(winreg.HKEY_CURRENT_USER, "Software\\7-Zip")
311
        path = winreg.QueryValueEx(hk7z, "Path")
312
    except OSError as exc:
313
        if talkative:
314
            exceptions.handle_exception(exc, xit=None)
315
        talkaprint("TRYING LOCAL FILES...", talkative)
316
        return win_seven_zip_local(talkative)
317
    else:
318
        talkaprint("7ZIP USING INSTALLED FILES", talkative)
319
        return '"{0}"'.format(os.path.join(path[0], "7z.exe"))
320
321
322 5
def win_seven_zip_local(talkative=False):
323
    """
324
    If 7-Zip isn't in the registry, fall back onto supplied executables.
325
    If *those* aren't there, return "error".
326
327
    :param talkative: Whether to output to screen. False by default.
328
    :type talkative: bool
329
    """
330
    filecount = len([x for x in os.listdir(os.getcwd()) if x in ["7za.exe", "7z.exe"]])
331
    if filecount == 2:
332
        talkaprint("7ZIP USING LOCAL FILES", talkative)
333
        szexe = "7za.64.exe" if is_amd64() else "7za.exe"
334
    else:
335
        talkaprint("NO LOCAL FILES", talkative)
336
        szexe = "error"
337
    return szexe
338
339
340 5
def get_core_count():
341
    """
342
    Find out how many CPU cores this system has.
343
    """
344 5
    try:
345 5
        cores = str(compat.enum_cpus())  # 3.4 and up
346
    except NotImplementedError:
347
        cores = "1"  # 3.2-3.3
348
    else:
349 5
        if compat.enum_cpus() is None:
350
            cores = "1"
351 5
    return cores
352
353
354 5
def prep_seven_zip_path(path, talkative=False):
355
    """
356
    Print p7zip path on POSIX, or notify if not there.
357
358
    :param path: Path to use.
359
    :type path: str
360
361
    :param talkative: Whether to output to screen. False by default.
362
    :type talkative: bool
363
    """
364 5
    if path is None:
365 5
        talkaprint("NO 7ZIP\nPLEASE INSTALL p7zip", talkative)
366 5
        return False
367
    else:
368 5
        talkaprint("7ZIP FOUND AT {0}".format(path), talkative)
369 5
        return True
370
371
372 5
def prep_seven_zip_posix(talkative=False):
373
    """
374
    Check for p7zip on POSIX.
375
376
    :param talkative: Whether to output to screen. False by default.
377
    :type talkative: bool
378
    """
379 5
    try:
380 5
        path = compat.where_which("7za")
381 5
    except ImportError:
382 5
        talkaprint("PLEASE INSTALL SHUTILWHICH WITH PIP", talkative)
383 5
        return False
384
    else:
385 5
        return prep_seven_zip_path(path, talkative)
386
387
388 5
def prep_seven_zip(talkative=False):
389
    """
390
    Check for presence of 7-Zip.
391
    On POSIX, check for p7zip.
392
    On Windows, check for 7-Zip.
393
394
    :param talkative: Whether to output to screen. False by default.
395
    :type talkative: bool
396
    """
397 5
    if is_windows():
398
        return get_seven_zip(talkative) != "error"
399
    else:
400 5
        return prep_seven_zip_posix(talkative)
401
402
403 5
def increment(version, inc=3):
404
    """
405
    Increment version by given number. For repeated lookups.
406
407
    :param version: w.x.y.ZZZZ, becomes w.x.y.(ZZZZ + increment).
408
    :type version: str
409
410
    :param inc: What to increment by. Default is 3.
411
    :type inc: str
412
    """
413 5
    splitos = version.split(".")
414 5
    splitos[3] = int(splitos[3])
415 5
    if splitos[3] > 9996:  # prevent overflow
416 5
        splitos[3] = 0
417 5
    splitos[3] += int(inc)
418 5
    splitos[3] = str(splitos[3])
419 5
    return ".".join(splitos)
420
421
422 5
def stripper(name):
423
    """
424
    Strip fluff from bar filename.
425
426
    :param name: Bar filename, must contain '-nto+armle-v7+signed.bar'.
427
    :type name: str
428
    """
429 5
    return name.replace("-nto+armle-v7+signed.bar", "")
430
431
432 5
def create_base_url(softwareversion):
433
    """
434
    Make the root URL for production server files.
435
436
    :param softwareversion: Software version to hash.
437
    :type softwareversion: str
438
    """
439
    # Hash software version
440 5
    swhash = hashlib.sha1(softwareversion.encode('utf-8'))
441 5
    hashedsoftwareversion = swhash.hexdigest()
442
    # Root of all urls
443 5
    baseurl = "http://cdn.fs.sl.blackberry.com/fs/qnx/production/{0}".format(hashedsoftwareversion)
444 5
    return baseurl
445
446
447 5
def format_app_name(appname):
448
    """
449
    Convert long reverse DNS name to short name.
450
451
    :param appname: Application name (ex. sys.pim.calendar -> "calendar")
452
    :type appname: str
453
    """
454 5
    final = appname.split(".")[-1]
455 5
    return final
456
457
458 5
def create_bar_url(softwareversion, appname, appversion, clean=False):
459
    """
460
    Make the URL for any production server file.
461
462
    :param softwareversion: Software version to hash.
463
    :type softwareversion: str
464
465
    :param appname: Application name, preferably like on server.
466
    :type appname: str
467
468
    :param appversion: Application version.
469
    :type appversion: str
470
471
    :param clean: Whether or not to clean up app name. Default is False.
472
    :type clean: bool
473
    """
474 5
    baseurl = create_base_url(softwareversion)
475 5
    if clean:
476 5
        appname = format_app_name(appname)
477 5
    return "{0}/{1}-{2}-nto+armle-v7+signed.bar".format(baseurl, appname, appversion)
478
479
480 5
def generate_urls(softwareversion, osversion, radioversion, core=False):
481
    """
482
    Generate a list of OS URLs and a list of radio URLs based on input.
483
484
    :param softwareversion: Software version to hash.
485
    :type softwareversion: str
486
487
    :param osversion: OS version.
488
    :type osversion: str
489
490
    :param radioversion: Radio version.
491
    :type radioversion: str
492
493
    :param core: Whether or not to return core URLs as well.
494
    :type core: bool
495
    """
496 5
    osurls = [
497
        create_bar_url(softwareversion, "winchester.factory_sfi.desktop", osversion),
498
        create_bar_url(softwareversion, "qc8960.factory_sfi.desktop", osversion),
499
        create_bar_url(softwareversion, "qc8960.factory_sfi.desktop", osversion),
500
        create_bar_url(softwareversion, "qc8974.factory_sfi.desktop", osversion)
501
    ]
502 5
    radiourls = [
503
        create_bar_url(softwareversion, "m5730", radioversion),
504
        create_bar_url(softwareversion, "qc8960", radioversion),
505
        create_bar_url(softwareversion, "qc8960.omadm", radioversion),
506
        create_bar_url(softwareversion, "qc8960.wtr", radioversion),
507
        create_bar_url(softwareversion, "qc8960.wtr5", radioversion),
508
        create_bar_url(softwareversion, "qc8930.wtr5", radioversion),
509
        create_bar_url(softwareversion, "qc8974.wtr2", radioversion)
510
    ]
511 5
    coreurls = []
512 5
    osurls, radiourls = filter_urls(osurls, radiourls, osversion)
513 5
    if core:
514 5
        coreurls = [x.replace(".desktop", "") for x in osurls]
515 5
    return osurls, radiourls, coreurls
516
517
518 5
def newer_103(splitos, third):
519
    """
520
    Return True if given split OS version is 10.3.X or newer.
521
522
    :param splitos: OS version, split on the dots: [10, 3, 3, 2205]
523
    :type: list(int)
524
525
    :param third: The X in 10.3.X.
526
    :type third: int
527
    """
528 5
    newer = True if ((splitos[1] >= 4) or (splitos[1] == 3 and splitos[2] >= third)) else False
529 5
    return newer
530
531
532 5
def filter_urls(osurls, radiourls, osversion):
533
    """
534
    Filter lists of OS and radio URLs.
535
536
    :param osurls: List of OS URLs.
537
    :type osurls: list(str)
538
539
    :param radiourls: List of radio URLs.
540
    :type radiourls: list(str)
541
542
    :param osversion: OS version.
543
    :type osversion: str
544
    """
545 5
    splitos = [int(i) for i in osversion.split(".")]
546 5
    osurls[2] = filter_1031(osurls[2], splitos, 5)  # Z3 10.3.1+
547 5
    osurls[3] = filter_1031(osurls[3], splitos, 6)  # Passport 10.3.1+
548 5
    osurls[0], radiourls[0] = pop_stl1(osurls[0], radiourls[0], splitos)  # STL100-1 10.3.3+
549 5
    return osurls, radiourls
550
551
552 5
def filter_1031(osurl, splitos, device):
553
    """
554
    Modify URLs to reflect changes in 10.3.1.
555
556
    :param osurl: OS URL to modify.
557
    :type osurl: str
558
559
    :param splitos: OS version, split and cast to int: [10, 3, 2, 2876]
560
    :type splitos: list(int)
561
562
    :param device: Device to use.
563
    :type device: int
564
    """
565 5
    if newer_103(splitos, 1):
566 5
        filterdict = {5: ("qc8960.factory_sfi", "qc8960.factory_sfi_hybrid_qc8x30"), 6: ("qc8974.factory_sfi", "qc8960.factory_sfi_hybrid_qc8974")}
567 5
        osurl = filter_osversion(osurl, device, filterdict)
568 5
    return osurl
569
570
571 5
def pop_stl1(osurl, radiourl, splitos):
572
    """
573
    Replace STL100-1 links in 10.3.3+.
574
575
    :param osurl: OS URL to modify.
576
    :type osurl: str
577
578
    :param radiourl: Radio URL to modify.
579
    :type radiourl: str
580
581
    :param splitos: OS version, split and cast to int: [10, 3, 3, 2205]
582
    :type splitos: list(int)
583
    """
584 5
    if newer_103(splitos, 3):
585 5
        osurl = osurl.replace("winchester", "qc8960")  # duplicates get filtered out later
586 5
        radiourl = radiourl.replace("m5730", "qc8960")
587 5
    return osurl, radiourl
588
589
590 5
def filter_osversion(osurl, device, filterdict):
591
    """
592
    Modify URLs based on device index and dictionary of changes.
593
594
    :param osurl: OS URL to modify.
595
    :type osurl: str
596
597
    :param device: Device to use.
598
    :type device: int
599
600
    :param filterdict: Dictionary of changes: {device : (before, after)}
601
    :type filterdict: dict(int:(str, str))
602
    """
603 5
    if device in filterdict.keys():
604 5
        osurl = osurl.replace(filterdict[device][0], filterdict[device][1])
605 5
    return osurl
606
607
608 5
def generate_lazy_urls(softwareversion, osversion, radioversion, device):
609
    """
610
    Generate a pair of OS/radio URLs based on input.
611
612
    :param softwareversion: Software version to hash.
613
    :type softwareversion: str
614
615
    :param osversion: OS version.
616
    :type osversion: str
617
618
    :param radioversion: Radio version.
619
    :type radioversion: str
620
621
    :param device: Device to use.
622
    :type device: int
623
    """
624 5
    splitos = [int(i) for i in osversion.split(".")]
625 5
    rads = ["m5730", "qc8960", "qc8960.omadm", "qc8960.wtr",
626
            "qc8960.wtr5", "qc8930.wtr4", "qc8974.wtr2"]
627 5
    oses = ["winchester.factory", "qc8960.factory", "qc8960.verizon",
628
            "qc8974.factory"]
629 5
    maps = {0:0, 1:1, 2:2, 3:1, 4:1, 5:1, 6:3}
630 5
    osurl = create_bar_url(softwareversion, "{0}_sfi.desktop".format(oses[maps[device]]), osversion)
631 5
    radiourl = create_bar_url(softwareversion, rads[device], radioversion)
632 5
    osurl = filter_1031(osurl, splitos, device)
633 5
    return osurl, radiourl
634
635
636 5
def bulk_urls(softwareversion, osversion, radioversion, core=False, altsw=None):
637
    """
638
    Generate all URLs, plus extra Verizon URLs.
639
640
    :param softwareversion: Software version to hash.
641
    :type softwareversion: str
642
643
    :param osversion: OS version.
644
    :type osversion: str
645
646
    :param radioversion: Radio version.
647
    :type radioversion: str
648
649
    :param device: Device to use.
650
    :type device: int
651
652
    :param core: Whether or not to return core URLs as well.
653
    :type core: bool
654
655
    :param altsw: Radio software release, if not the same as OS.
656
    :type altsw: str
657
    """
658 5
    baseurl = create_base_url(softwareversion)
659 5
    osurls, radurls, coreurls = generate_urls(softwareversion, osversion, radioversion, core)
660 5
    vzwos, vzwrad = generate_lazy_urls(softwareversion, osversion, radioversion, 2)
661 5
    osurls.append(vzwos)
662 5
    radurls.append(vzwrad)
663 5
    vzwcore = vzwos.replace("sfi.desktop", "sfi")
664 5
    if core:
665 5
        coreurls.append(vzwcore)
666 5
    osurls = list(set(osurls))  # pop duplicates
667 5
    radurls = list(set(radurls))
668 5
    if core:
669 5
        coreurls = list(set(coreurls))
670 5
    if altsw is not None:
671 5
        altbase = create_base_url(altsw)
672 5
        radiourls2 = []
673 5
        for rad in radurls:
674 5
            radiourls2.append(rad.replace(baseurl, altbase))
675 5
        radurls = radiourls2
676 5
        del radiourls2
677 5
    return osurls, coreurls, radurls
678
679
680 5
def line_begin():
681
    """
682
    Go to beginning of line, to overwrite whatever's there.
683
    """
684 5
    sys.stdout.write("\r")
685 5
    sys.stdout.flush()
686
687
688 5
def spinner_clear():
689
    """
690
    Get rid of any spinner residue left in stdout.
691
    """
692 5
    sys.stdout.write("\b \b")
693 5
    sys.stdout.flush()
694
695
696 5
class Spinner(object):
697
    """
698
    A basic spinner using itertools. No need for progress.
699
    """
700
701 5
    def __init__(self):
702 5
        self.wheel = itertools.cycle(['-', '/', '|', '\\'])
703 5
        self.file = dummy.UselessStdout()
704
705 5
    def after(self):
706
        """
707
        Iterate over itertools.cycle, write to file.
708
        """
709 5
        try:
710 5
            self.file.write(next(self.wheel))
711 5
            self.file.flush()
712 5
            self.file.write("\b\r")
713 5
            self.file.flush()
714
        except (KeyboardInterrupt, SystemExit):
715
            self.stop()
716
717 5
    def stop(self):
718
        """
719
        Kill output.
720
        """
721 5
        self.file = dummy.UselessStdout()
722
723
724 5
class SpinManager(object):
725
    """
726
    Wraps around the itertools spinner, runs it in another thread.
727
    """
728
729 5
    def __init__(self):
730 5
        spinner = Spinner()
731 5
        self.spinner = spinner
732 5
        self.thread = threading.Thread(target=self.loop, args=())
733 5
        self.thread.daemon = True
734 5
        self.scanning = False
735 5
        self.spinner.file = dummy.UselessStdout()
736
737 5
    def start(self):
738
        """
739
        Begin the spinner.
740
        """
741 5
        self.spinner.file = sys.stderr
742 5
        self.scanning = True
743 5
        self.thread.start()
744
745 5
    def loop(self):
746
        """
747
        Spin if scanning, clean up if not.
748
        """
749 5
        while self.scanning:
750 5
            time.sleep(0.5)
751 5
            try:
752 5
                line_begin()
753 5
                self.spinner.after()
754
            except (KeyboardInterrupt, SystemExit):
755
                self.scanning = False
756
                self.stop()
757
758 5
    def stop(self):
759
        """
760
        Stop the spinner.
761
        """
762 5
        self.spinner.stop()
763 5
        self.scanning = False
764 5
        spinner_clear()
765 5
        line_begin()
766 5
        if not is_windows():
767 5
            print("\n")
768
769
770 5
def return_and_delete(target):
771
    """
772
    Read text file, then delete it. Return contents.
773
774
    :param target: Text file to read.
775
    :type target: str
776
    """
777 5
    with open(target, "r") as thefile:
778 5
        content = thefile.read()
779 5
    os.remove(target)
780 5
    return content
781
782
783 5
def verify_loader_integrity(loaderfile):
784
    """
785
    Test for created loader integrity. Windows-only.
786
787
    :param loaderfile: Path to loader.
788
    :type loaderfile: str
789
    """
790 5
    if not is_windows():
791 5
        pass
792
    else:
793 5
        excode = None
794 5
        try:
795 5
            with open(os.devnull, 'rb') as dnull:
796 5
                cmd = "{0} fileinfo".format(loaderfile)
797 5
                excode = subprocess.call(cmd, stdout=dnull, stderr=subprocess.STDOUT)
798 5
        except OSError:
799 5
            excode = -1
800 5
        return excode == 0  # 0 if OK, non-zero if something broke
801
802
803 5
def bulkfilter_printer(afile):
804
    """
805
    Print filename and verify a loader file.
806
807
    :param afile: Path to file.
808
    :type afile: str
809
    """
810 5
    print("TESTING: {0}".format(os.path.basename(afile)))
811 5
    if not verify_loader_integrity(afile):
812 5
        return os.path.basename(afile)
813
814
815 5
def bulkfilter(files):
816
    """
817
    Verify all loader files in a given list.
818
819
    :param files: List of files.
820
    :type files: list(str)
821
    """
822 5
    brokens = [bulkfilter_printer(file) for file in files if prepends(os.path.basename(file), bbconstants.PREFIXES, ".exe")]
823 5
    return brokens
824
825
826 5
def verify_bulk_loaders(ldir):
827
    """
828
    Run :func:`verify_loader_integrity` for all files in a dir.
829
830
    :param ldir: Directory to use.
831
    :type ldir: str
832
    """
833 5
    if not is_windows():
834 5
        pass
835
    else:
836 5
        files = verify_bulk_loaders_filefilter(ldir)
837 5
        brokens = verify_bulk_loaders_brokenfilter(files)
838 5
        return brokens
839
840
841 5
def verify_bulk_loaders_filefilter(ldir):
842
    """
843
    Prepare file names for :func:`verify_bulk_loaders`.
844
845
    :param ldir: Directory to use.
846
    :type ldir: str
847
    """
848 5
    files = [os.path.join(ldir, file) for file in os.listdir(ldir) if not os.path.isdir(file)]
849 5
    return files
850
851
852 5
def verify_bulk_loaders_brokenfilter(files):
0 ignored issues
show
Coding Style Naming introduced by
The name verify_bulk_loaders_brokenfilter does not conform to the function naming conventions ([a-z_][a-z0-9_]{2,30}$).

This check looks for invalid names for a range of different identifiers.

You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements.

If your project includes a Pylint configuration file, the settings contained in that file take precedence.

To find out more about Pylint, please refer to their site.

Loading history...
853
    """
854
    Prepare filtered file list for :func:`verify_bulk_loaders`.
855
856
    :param files: List of files.
857
    :type files: list(str)
858
    """
859 5
    brokens = [file for file in bulkfilter(files) if file]
860 5
    return brokens
861
862
863 5
def list_workers(input_data, workerlimit):
864
    """
865
    Count number of threads, either length of iterable or provided limit.
866
867
    :param input_data: Input data, some iterable.
868
    :type input_data: list
869
870
    :param workerlimit: Maximum number of workers.
871
    :type workerlimit: int
872
    """
873 5
    runners = len(input_data) if len(input_data) < workerlimit else workerlimit
874 5
    return runners
875
876
877 5
def cpu_workers(input_data):
878
    """
879
    Count number of CPU workers, smaller of number of threads and length of data.
880
881
    :param input_data: Input data, some iterable.
882
    :type input_data: list
883
    """
884 5
    return list_workers(input_data, compat.enum_cpus())
885
886
887 5
def prep_logfile():
888
    """
889
    Prepare log file, labeling it with current date. Select folder based on frozen status.
890
    """
891 5
    logfile = "{0}.txt".format(time.strftime("%Y_%m_%d_%H%M%S"))
892 5
    root = os.getcwd() if getattr(sys, 'frozen', False) else os.path.expanduser("~")
893 5
    basefolder = os.path.join(root, "lookuplogs")
894 5
    os.makedirs(basefolder, exist_ok=True)
895 5
    record = os.path.join(basefolder, logfile)
896 5
    open(record, "w").close()
897 5
    return record
898
899
900 5
def prepends(file, pre, suf):
901
    """
902
    Check if filename starts with/ends with stuff.
903
904
    :param file: File to check.
905
    :type file: str
906
907
    :param pre: Prefix(es) to check.
908
    :type pre: str or list or tuple
909
910
    :param suf: Suffix(es) to check.
911
    :type suf: str or list or tuple
912
    """
913 5
    return file.startswith(pre) and file.endswith(suf)
914
915
916 5
def lprint(iterable):
917
    """
918
    A oneliner for 'for item in x: print item'.
919
920
    :param iterable: Iterable to print.
921
    :type iterable: list/tuple
922
    """
923 5
    for item in iterable:
924 5
        print(item)
925
926
927 5
def cappath_config_loader(homepath=None):
928
    """
929
    Read a ConfigParser file to get cap preferences.
930
931
    :param homepath: Folder containing ini file. Default is user directory.
932
    :type homepath: str
933
    """
934 5
    capini = iniconfig.generic_loader('cappath', homepath)
935 5
    cappath = capini.get('path', fallback=bbconstants.CAP.location)
936 5
    return cappath
937
938
939 5
def cappath_config_writer(cappath=None, homepath=None):
940
    """
941
    Write a ConfigParser file to store cap preferences.
942
943
    :param cappath: Method to use.
944
    :type cappath: str
945
946
    :param homepath: Folder containing ini file. Default is user directory.
947
    :type homepath: str
948
    """
949 5
    cappath = grab_cap() if cappath is None else cappath
950 5
    results = {"path": cappath}
951 5
    iniconfig.generic_writer("cappath", results, homepath)
952
953
954 5
def def_args(dirs):
955
    """
956
    Return prepared argument list for most instances of :func:`cond_check:.
957
958
    :param dirs: List of directories.
959
    :type dirs: list(str)
960
    """
961 5
    return [dirs[4], dirs[5], dirs[2], dirs[3]]
962
963
964 5
def cond_do(dofunc, goargs, restargs=None, condition=True):
965
    """
966
    Do a function, check a condition, then do same function but swap first argument.
967
968
    :param dofunc: Function to do.
969
    :type dofunc: function
970
971
    :param goargs: List of variable arguments.
972
    :type goargs: list(str)
973
974
    :param restargs: Rest of arguments, which are constant.
975
    :type restargs: list(str)
976
977
    :param condition: Condition to check in order to use secondarg.
978
    :type condition: bool
979
    """
980 5
    restargs = [] if restargs is None else restargs
981 5
    dofunc(goargs[0], *restargs)
982 5
    if condition:
983 5
        dofunc(goargs[1], *restargs)
984
985
986 5
def cond_check(dofunc, goargs, restargs=None, condition=True, checkif=True, checkifnot=True):
987
    """
988
    Do :func:`cond_do` based on a condition, then do it again based on a second condition.
989
990
    :param dofunc: Function to do.
991
    :type dofunc: function
992
993
    :param goargs: List of variable arguments.
994
    :type goargs: list(str)
995
996
    :param restargs: Rest of arguments, which are constant.
997
    :type restargs: list(str)
998
999
    :param condition: Condition to check in order to use secondarg.
1000
    :type condition: bool
1001
1002
    :param checkif: Do :func:`cond_do` if this is True.
1003
    :type checkif: bool
1004
1005
    :param checkifnot: Do :func:`cond_do` if this is False.
1006
    :type checkifnot: bool
1007
    """
1008 5
    if checkif:
1009 5
        cond_do(dofunc, goargs[0:2], restargs, condition)
1010 5
    if not checkifnot:
1011
        cond_do(dofunc, goargs[2:4], restargs, condition)
1012