Test Failed
Branch master (c56792)
by John
02:11
created

bbarchivist.utilities   F

Complexity

Total Complexity 161

Size/Duplication

Total Lines 1130
Duplicated Lines 0 %

Test Coverage

Coverage 96.98%

Importance

Changes 0
Metric Value
eloc 414
dl 0
loc 1130
rs 0.6314
c 0
b 0
f 0
ccs 386
cts 398
cp 0.9698
wmc 161

69 Functions

Rating   Name   Duplication   Size   Complexity  
A prep_seven_zip() 0 14 2
A cond_do() 0 20 3
A signed_file_args() 0 11 4
A file_exists() 0 10 2
A grab_cap() 0 5 1
A bulkfilter_printer() 0 10 2
A droidlookup_hashtype() 0 11 2
A pop_stl1() 0 17 2
A prep_seven_zip_path() 0 17 2
A valid_carrier() 0 15 4
A valid_method_poptxz() 0 10 4
A prep_filesize() 0 11 2
A verify_bulk_loaders() 0 13 2
A new_enough() 0 8 2
A escreens_pin() 0 16 4
A droidlookup_devicetype() 0 15 4
A return_and_delete() 0 11 2
A is_windows() 0 5 1
A newer_103() 0 12 2
A get_seven_zip() 0 11 2
A get_core_count() 0 12 4
A stripper() 0 8 1
A filter_osversion() 0 16 2
A lprint() 0 9 2
A is_amd64() 0 5 1
A grab_datafile() 0 12 3
A positive_integer() 0 11 2
B cond_check() 0 26 3
A one_and_none() 0 12 2
A cappath_config_loader() 0 10 1
A wsz_registry() 0 8 1
B generate_lazy_urls() 0 26 2
A spinner_clear() 0 6 1
B bulk_urls() 0 36 3
A cappath_config_writer() 0 13 2
A win_seven_zip() 0 19 4
A i2b() 0 8 1
A dirhandler() 0 12 2
A create_bar_url() 0 20 2
A def_args() 0 8 1
A list_workers() 0 12 2
A valid_method() 0 13 2
A prepends() 0 14 1
A verify_bulk_loaders_filefilter() 0 9 3
A prep_logfile() 0 9 1
A talkaprint() 0 12 2
A filter_1031() 0 17 2
A wsz_local_bad() 0 10 1
A verify_bulk_loaders_brokens() 0 9 3
A prep_logfile_folder() 0 10 2
A win_seven_zip_local() 0 14 2
A s2b() 0 8 1
A grab_cfp() 0 5 1
A wsz_local_good() 0 10 2
A wsz_filecount() 0 6 3
A bulk_urls_altsw() 0 19 3
A line_begin() 0 6 1
A increment() 0 17 2
A bulkfilter() 0 9 3
A grab_capini() 0 16 3
A fsizer() 0 17 4
A escreens_duration() 0 11 2
A create_base_url() 0 13 1
A filter_urls() 0 18 2
B generate_urls() 0 36 3
A cpu_workers() 0 8 1
A verify_loader_integrity() 0 18 4
A prep_seven_zip_posix() 0 14 3
A format_app_name() 0 9 1

7 Methods

Rating   Name   Duplication   Size   Complexity  
A Spinner.stop() 0 5 1
A Spinner.__init__() 0 6 1
A SpinManager.start() 0 7 1
A Spinner.after() 0 11 2
A SpinManager.stop() 0 10 2
A SpinManager.__init__() 0 10 1
A SpinManager.loop() 0 12 3

How to fix   Complexity   

Complexity

Complex classes like bbarchivist.utilities often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1129/1000)
Loading history...
2 5
"""This module is used for miscellaneous utilities."""
3
4 5
import argparse  # argument parser for filters
5 5
import glob  # cap grabbing
6 5
import hashlib  # base url creation
7 5
import itertools  # spinners gonna spin
8 5
import os  # path work
9 5
import platform  # platform info
10 5
import subprocess  # loader verification
11 5
import sys  # streams, version info
12 5
import threading  # get thread for spinner
13 5
import time  # spinner delay
14
15 5
from bbarchivist import bbconstants  # cap location, version, filename bits
16 5
from bbarchivist import compat  # backwards compat
17 5
from bbarchivist import dummy  # useless stdout
18 5
from bbarchivist import exceptions  # exceptions
19 5
from bbarchivist import iniconfig  # config parsing
20
21 5
__author__ = "Thurask"
22 5
__license__ = "WTFPL v2"
23 5
__copyright__ = "2015-2018 Thurask"
24
25
26 5
def grab_datafile(datafile):
27
    """
28
    Figure out where a datafile is.
29
30
    :param datafile: Datafile to check.
31
    :type datafile: bbconstants.Datafile
32
    """
33 5
    try:
34 5
        afile = glob.glob(os.path.join(os.getcwd(), datafile.filename))[0]
35 5
    except IndexError:
36 5
        afile = datafile.location if datafile.name == "cfp" else grab_capini(datafile)
37 5
    return os.path.abspath(afile)
38
39
40 5
def grab_capini(datafile):
41
    """
42
    Get cap location from .ini file, and write if it's new.
43
44
    :param datafile: Datafile to check.
45
    :type datafile: bbconstants.Datafile
46
    """
47 5
    try:
48 5
        apath = cappath_config_loader()
49 5
        afile = glob.glob(apath)[0]
50 5
    except IndexError:
51 5
        cappath_config_writer(datafile.location)
52 5
        return bbconstants.CAP.location  # no ini cap
53
    else:
54 5
        cappath_config_writer(os.path.abspath(afile))
55 5
        return os.path.abspath(afile)  # ini cap
56
57
58 5
def grab_cap():
59
    """
60
    Figure out where cap is, local, specified or system-supplied.
61
    """
62 5
    return grab_datafile(bbconstants.CAP)
63
64
65 5
def grab_cfp():
66
    """
67
    Figure out where cfp is, local or system-supplied.
68
    """
69 5
    return grab_datafile(bbconstants.CFP)
70
71
72 5
def new_enough(minver):
73
    """
74
    Check if we're at or above a minimum Python version.
75
76
    :param minver: Minimum Python version (3.minver).
77
    :type minver: int
78
    """
79 5
    return False if minver > sys.version_info[1] else True
80
81
82 5
def dirhandler(directory, defaultdir):
83
    """
84
    If directory is None, turn it into defaultdir.
85
86
    :param directory: Target directory.
87
    :type directory: str
88
89
    :param defaultdir: Default directory.
90
    :type defaultdir: str
91
    """
92 5
    directory = defaultdir if directory is None else directory
93 5
    return directory
94
95
96 5
def fsizer(file_size):
97
    """
98
    Raw byte file size to human-readable string.
99
100
    :param file_size: Number to parse.
101
    :type file_size: float
102
    """
103 5
    fsize = prep_filesize(file_size)
104 5
    for sfix in ['B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']:
105 5
        if fsize < 1024.0:
106 5
            size = "{0:3.2f}{1}".format(fsize, sfix)
107 5
            break
108
        else:
109 5
            fsize /= 1024.0
110
    else:
111 5
        size = "{0:3.2f}{1}".format(fsize, 'YB')
112 5
    return size
113
114
115 5
def prep_filesize(file_size):
116
    """
117
    Convert file size to float.
118
119
    :param file_size: Number to parse.
120
    :type file_size: float
121
    """
122 5
    if file_size is None:
123 5
        file_size = 0.0
124 5
    fsize = float(file_size)
125 5
    return fsize
126
127
128 5
def signed_file_args(files):
129
    """
130
    Check if there are between 1 and 6 files supplied to argparse.
131
132
    :param files: List of signed files, between 1 and 6 strings.
133
    :type files: list(str)
134
    """
135 5
    filelist = [file for file in files if file]
136 5
    if not 1 <= len(filelist) <= 6:
137 5
        raise argparse.ArgumentError(argument=None, message="Requires 1-6 signed files")
138 5
    return files
139
140
141 5
def file_exists(file):
142
    """
143
    Check if file exists, raise argparse error if it doesn't.
144
145
    :param file: Path to a file, including extension.
146
    :type file: str
147
    """
148 5
    if not os.path.exists(file):
149 5
        raise argparse.ArgumentError(argument=None, message="{0} not found.".format(file))
150 5
    return file
151
152
153 5
def positive_integer(input_int):
154
    """
155
    Check if number > 0, raise argparse error if it isn't.
156
157
    :param input_int: Integer to check.
158
    :type input_int: str
159
    """
160 5
    if int(input_int) <= 0:
161 5
        info = "{0} is not >=0.".format(str(input_int))
162 5
        raise argparse.ArgumentError(argument=None, message=info)
163 5
    return int(input_int)
164
165
166 5
def valid_method_poptxz(methodlist):
167
    """
168
    Remove .tar.xz support if system is too old.
169
170
    :param methodlist: List of all methods.
171
    :type methodlist: tuple(str)
172
    """
173 5
    if not new_enough(3):
174 5
        methodlist = [x for x in bbconstants.METHODS if x != "txz"]
175 5
    return methodlist
176
177
178 5
def valid_method(method):
179
    """
180
    Check if compression method is valid, raise argparse error if it isn't.
181
182
    :param method: Compression method to check.
183
    :type method: str
184
    """
185 5
    methodlist = bbconstants.METHODS
186 5
    methodlist = valid_method_poptxz(methodlist)
187 5
    if method not in methodlist:
188 5
        info = "Invalid method {0}.".format(method)
189 5
        raise argparse.ArgumentError(argument=None, message=info)
190 5
    return method
191
192
193 5
def valid_carrier(mcc_mnc):
194
    """
195
    Check if MCC/MNC is valid (1-3 chars), raise argparse error if it isn't.
196
197
    :param mcc_mnc: MCC/MNC to check.
198
    :type mcc_mnc: str
199
    """
200 5
    if not str(mcc_mnc).isdecimal():
201 5
        infod = "Non-integer {0}.".format(str(mcc_mnc))
202 5
        raise argparse.ArgumentError(argument=None, message=infod)
203 5
    if len(str(mcc_mnc)) > 3 or not str(mcc_mnc):
204 5
        infol = "{0} is invalid.".format(str(mcc_mnc))
205 5
        raise argparse.ArgumentError(argument=None, message=infol)
206
    else:
207 5
        return mcc_mnc
208
209
210 5
def escreens_pin(pin):
211
    """
212
    Check if given PIN is valid, raise argparse error if it isn't.
213
214
    :param pin: PIN to check.
215
    :type pin: str
216
    """
217 5
    if len(pin) == 8:
218 5
        try:
219 5
            int(pin, 16)  # hexadecimal-ness
220 5
        except ValueError:
221 5
            raise argparse.ArgumentError(argument=None, message="Invalid PIN.")
222
        else:
223 5
            return pin.lower()
224
    else:
225 5
        raise argparse.ArgumentError(argument=None, message="Invalid PIN.")
226
227
228 5
def escreens_duration(duration):
229
    """
230
    Check if Engineering Screens duration is valid.
231
232
    :param duration: Duration to check.
233
    :type duration: int
234
    """
235 5
    if int(duration) in (1, 3, 6, 15, 30):
236 5
        return int(duration)
237
    else:
238 5
        raise argparse.ArgumentError(argument=None, message="Invalid duration.")
239
240
241 5
def droidlookup_hashtype(method):
242
    """
243
    Check if Android autoloader lookup hash type is valid.
244
245
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
246
    :type method: str
247
    """
248 5
    if method.lower() in ("sha512", "sha256"):
249 5
        return method.lower()
250
    else:
251 5
        raise argparse.ArgumentError(argument=None, message="Invalid type.")
252
253
254 5
def droidlookup_devicetype(device):
255
    """
256
    Check if Android autoloader device type is valid.
257
258
    :param device: Android autoloader types to check.
259
    :type device: str
260
    """
261 5
    devices = ("Priv", "DTEK50", "DTEK60", "KEYone", "Aurora", "Motion")
262 5
    if device is None:
263 5
        return None
264
    else:
265 5
        for dev in devices:
266 5
            if dev.lower() == device.lower():
267 5
                return dev
268 5
        raise argparse.ArgumentError(argument=None, message="Invalid device.")
269
270
271 5
def s2b(input_check):
272
    """
273
    Return Boolean interpretation of string input.
274
275
    :param input_check: String to check if it means True or False.
276
    :type input_check: str
277
    """
278 5
    return str(input_check).lower() in ("yes", "true", "t", "1", "y")
279
280
281 5
def i2b(input_check):
282
    """
283
    Return Boolean interpretation of typed input.
284
285
    :param input_check: Query to feed into input function.
286
    :type input_check: str
287
    """
288 5
    return s2b(input(input_check))
289
290
291 5
def is_amd64():
292
    """
293
    Check if script is running on an AMD64 system (Python can be 32/64, this is for subprocess)
294
    """
295 5
    return platform.machine().endswith("64")
296
297
298 5
def is_windows():
299
    """
300
    Check if script is running on Windows.
301
    """
302 5
    return platform.system() == "Windows"
303
304
305 5
def talkaprint(msg, talkative=False):
306
    """
307
    Print only if asked to.
308
309
    :param msg: Message to print.
310
    :type msg: str
311
312
    :param talkative: Whether to output to screen. False by default.
313
    :type talkative: bool
314
    """
315 5
    if talkative:
316 5
        print(msg)
317
318
319 5
def get_seven_zip(talkative=False):
320
    """
321
    Return name of 7-Zip executable.
322
    On POSIX, it MUST be 7za.
323
    On Windows, it can be installed or supplied with the script.
324
    :func:`win_seven_zip` is used to determine if it's installed.
325
326
    :param talkative: Whether to output to screen. False by default.
327
    :type talkative: bool
328
    """
329 5
    return win_seven_zip(talkative) if is_windows() else "7za"
330
331
332 5
def win_seven_zip(talkative=False):
333
    """
334
    For Windows, check where 7-Zip is ("where", pretty much).
335
    Consult registry first for any installed instances of 7-Zip.
336
337
    :param talkative: Whether to output to screen. False by default.
338
    :type talkative: bool
339
    """
340 5
    talkaprint("CHECKING INSTALLED FILES...", talkative)
341 5
    try:
342 5
        path = wsz_registry()
343 5
    except OSError as exc:
344 5
        if talkative:
345 5
            exceptions.handle_exception(exc, xit=None)
346 5
        talkaprint("TRYING LOCAL FILES...", talkative)
347 5
        return win_seven_zip_local(talkative)
348
    else:
349
        talkaprint("7ZIP USING INSTALLED FILES", talkative)
350
        return '"{0}"'.format(os.path.join(path[0], "7z.exe"))
351
352
353 5
def wsz_registry():
354
    """
355
    Check Windows registry for 7-Zip executable location.
356
    """
357
    import winreg  # windows registry
1 ignored issue
show
introduced by
Unable to import 'winreg'
Loading history...
358
    hk7z = winreg.OpenKey(winreg.HKEY_CURRENT_USER, "Software\\7-Zip")
359
    path = winreg.QueryValueEx(hk7z, "Path")
360
    return path
361
362
363 5
def win_seven_zip_local(talkative=False):
364
    """
365
    If 7-Zip isn't in the registry, fall back onto supplied executables.
366
    If *those* aren't there, return "error".
367
368
    :param talkative: Whether to output to screen. False by default.
369
    :type talkative: bool
370
    """
371 5
    filecount = wsz_filecount()
372 5
    if filecount == 2:
373 5
        szexe = wsz_local_good(talkative)
374
    else:
375 5
        szexe = wsz_local_bad(talkative)
376 5
    return szexe
377
378
379 5
def wsz_filecount():
380
    """
381
    Get count of 7-Zip executables in local folder.
382
    """
383 5
    filecount = len([x for x in os.listdir(os.getcwd()) if x in ["7za.exe", "7za64.exe"]])
384 5
    return filecount
385
386
387 5
def wsz_local_good(talkative=False):
388
    """
389
    Get 7-Zip exe name if everything is good.
390
391
    :param talkative: Whether to output to screen. False by default.
392
    :type talkative: bool
393
    """
394 5
    talkaprint("7ZIP USING LOCAL FILES", talkative)
395 5
    szexe = "7za64.exe" if is_amd64() else "7za.exe"
396 5
    return szexe
397
398
399 5
def wsz_local_bad(talkative=False):
400
    """
401
    Handle 7-Zip exe name in case of issues.
402
403
    :param talkative: Whether to output to screen. False by default.
404
    :type talkative: bool
405
    """
406 5
    talkaprint("NO LOCAL FILES", talkative)
407 5
    szexe = "error"
408 5
    return szexe
409
410
411 5
def get_core_count():
412
    """
413
    Find out how many CPU cores this system has.
414
    """
415 5
    try:
416 5
        cores = str(compat.enum_cpus())  # 3.4 and up
417 5
    except NotImplementedError:
418 5
        cores = "1"  # 3.2-3.3
419
    else:
420 5
        if compat.enum_cpus() is None:
421 5
            cores = "1"
422 5
    return cores
423
424
425 5
def prep_seven_zip_path(path, talkative=False):
426
    """
427
    Print p7zip path on POSIX, or notify if not there.
428
429
    :param path: Path to use.
430
    :type path: str
431
432
    :param talkative: Whether to output to screen. False by default.
433
    :type talkative: bool
434
    """
435 5
    if path is None:
436 5
        talkaprint("NO 7ZIP\nPLEASE INSTALL p7zip", talkative)
437 5
        sentinel = False
438
    else:
439 5
        talkaprint("7ZIP FOUND AT {0}".format(path), talkative)
440 5
        sentinel = True
441
    return sentinel
442
443 5
444
def prep_seven_zip_posix(talkative=False):
445
    """
446
    Check for p7zip on POSIX.
447
448
    :param talkative: Whether to output to screen. False by default.
449
    :type talkative: bool
450 5
    """
451 5
    try:
452 5
        path = compat.where_which("7za")
453 5
    except ImportError:
454 5
        talkaprint("PLEASE INSTALL SHUTILWHICH WITH PIP", talkative)
455
        return False
456 5
    else:
457
        return prep_seven_zip_path(path, talkative)
458
459 5
460
def prep_seven_zip(talkative=False):
461
    """
462
    Check for presence of 7-Zip.
463
    On POSIX, check for p7zip.
464
    On Windows, check for 7-Zip.
465
466
    :param talkative: Whether to output to screen. False by default.
467
    :type talkative: bool
468 5
    """
469
    if is_windows():
470
        final = get_seven_zip(talkative) != "error"
471 5
    else:
472
        final = prep_seven_zip_posix(talkative)
473
    return final
474 5
475
476
def increment(version, inc=3):
477
    """
478
    Increment version by given number. For repeated lookups.
479
480
    :param version: w.x.y.ZZZZ, becomes w.x.y.(ZZZZ + increment).
481
    :type version: str
482
483
    :param inc: What to increment by. Default is 3.
484 5
    :type inc: str
485 5
    """
486 5
    splitos = version.split(".")
487 5
    splitos[3] = int(splitos[3])
488 5
    if splitos[3] > 9996:  # prevent overflow
489 5
        splitos[3] = 0
490 5
    splitos[3] += int(inc)
491
    splitos[3] = str(splitos[3])
492
    return ".".join(splitos)
493 5
494
495
def stripper(name):
496
    """
497
    Strip fluff from bar filename.
498
499
    :param name: Bar filename, must contain '-nto+armle-v7+signed.bar'.
500 5
    :type name: str
501
    """
502
    return name.replace("-nto+armle-v7+signed.bar", "")
503 5
504
505
def create_base_url(softwareversion):
506
    """
507
    Make the root URL for production server files.
508
509
    :param softwareversion: Software version to hash.
510
    :type softwareversion: str
511 5
    """
512 5
    # Hash software version
513
    swhash = hashlib.sha1(softwareversion.encode('utf-8'))
514 5
    hashedsoftwareversion = swhash.hexdigest()
515 5
    # Root of all urls
516
    baseurl = "http://cdn.fs.sl.blackberry.com/fs/qnx/production/{0}".format(hashedsoftwareversion)
517
    return baseurl
518 5
519
520
def format_app_name(appname):
521
    """
522
    Convert long reverse DNS name to short name.
523
524
    :param appname: Application name (ex. sys.pim.calendar -> "calendar")
525 5
    :type appname: str
526 5
    """
527
    final = appname.split(".")[-1]
528
    return final
529 5
530
531
def create_bar_url(softwareversion, appname, appversion, clean=False):
532
    """
533
    Make the URL for any production server file.
534
535
    :param softwareversion: Software version to hash.
536
    :type softwareversion: str
537
538
    :param appname: Application name, preferably like on server.
539
    :type appname: str
540
541
    :param appversion: Application version.
542
    :type appversion: str
543
544
    :param clean: Whether or not to clean up app name. Default is False.
545 5
    :type clean: bool
546 5
    """
547 5
    baseurl = create_base_url(softwareversion)
548 5
    if clean:
549
        appname = format_app_name(appname)
550
    return "{0}/{1}-{2}-nto+armle-v7+signed.bar".format(baseurl, appname, appversion)
551 5
552
553
def generate_urls(softwareversion, osversion, radioversion, core=False):
554
    """
555
    Generate a list of OS URLs and a list of radio URLs based on input.
556
557
    :param softwareversion: Software version to hash.
558
    :type softwareversion: str
559
560
    :param osversion: OS version.
561
    :type osversion: str
562
563
    :param radioversion: Radio version.
564
    :type radioversion: str
565
566
    :param core: Whether or not to return core URLs as well.
567 5
    :type core: bool
568
    """
569
    osurls = [
570
        create_bar_url(softwareversion, "winchester.factory_sfi.desktop", osversion),
571
        create_bar_url(softwareversion, "qc8960.factory_sfi.desktop", osversion),
572
        create_bar_url(softwareversion, "qc8960.factory_sfi.desktop", osversion),
573 5
        create_bar_url(softwareversion, "qc8974.factory_sfi.desktop", osversion)
574
    ]
575
    radiourls = [
576
        create_bar_url(softwareversion, "m5730", radioversion),
577
        create_bar_url(softwareversion, "qc8960", radioversion),
578
        create_bar_url(softwareversion, "qc8960.omadm", radioversion),
579
        create_bar_url(softwareversion, "qc8960.wtr", radioversion),
580
        create_bar_url(softwareversion, "qc8960.wtr5", radioversion),
581
        create_bar_url(softwareversion, "qc8930.wtr5", radioversion),
582 5
        create_bar_url(softwareversion, "qc8974.wtr2", radioversion)
583 5
    ]
584 5
    coreurls = []
585 5
    osurls, radiourls = filter_urls(osurls, radiourls, osversion)
586 5
    if core:
587
        coreurls = [x.replace(".desktop", "") for x in osurls]
588
    return osurls, radiourls, coreurls
589 5
590
591
def newer_103(splitos, third):
592
    """
593
    Return True if given split OS version is 10.3.X or newer.
594
595
    :param splitos: OS version, split on the dots: [10, 3, 3, 2205]
596
    :type: list(int)
597
598
    :param third: The X in 10.3.X.
599 5
    :type third: int
600 5
    """
601
    newer = True if ((splitos[1] >= 4) or (splitos[1] == 3 and splitos[2] >= third)) else False
602
    return newer
603 5
604
605
def filter_urls(osurls, radiourls, osversion):
606
    """
607
    Filter lists of OS and radio URLs.
608
609
    :param osurls: List of OS URLs.
610
    :type osurls: list(str)
611
612
    :param radiourls: List of radio URLs.
613
    :type radiourls: list(str)
614
615
    :param osversion: OS version.
616 5
    :type osversion: str
617 5
    """
618 5
    splitos = [int(i) for i in osversion.split(".")]
619 5
    osurls[2] = filter_1031(osurls[2], splitos, 5)  # Z3 10.3.1+
620 5
    osurls[3] = filter_1031(osurls[3], splitos, 6)  # Passport 10.3.1+
621
    osurls[0], radiourls[0] = pop_stl1(osurls[0], radiourls[0], splitos)  # STL100-1 10.3.3+
622
    return osurls, radiourls
623 5
624
625
def filter_1031(osurl, splitos, device):
626
    """
627
    Modify URLs to reflect changes in 10.3.1.
628
629
    :param osurl: OS URL to modify.
630
    :type osurl: str
631
632
    :param splitos: OS version, split and cast to int: [10, 3, 2, 2876]
633
    :type splitos: list(int)
634
635
    :param device: Device to use.
636 5
    :type device: int
637 5
    """
638 5
    if newer_103(splitos, 1):
639 5
        filterdict = {5: ("qc8960.factory_sfi", "qc8960.factory_sfi_hybrid_qc8x30"), 6: ("qc8974.factory_sfi", "qc8960.factory_sfi_hybrid_qc8974")}
640
        osurl = filter_osversion(osurl, device, filterdict)
641
    return osurl
642 5
643
644
def pop_stl1(osurl, radiourl, splitos):
645
    """
646
    Replace STL100-1 links in 10.3.3+.
647
648
    :param osurl: OS URL to modify.
649
    :type osurl: str
650
651
    :param radiourl: Radio URL to modify.
652
    :type radiourl: str
653
654
    :param splitos: OS version, split and cast to int: [10, 3, 3, 2205]
655 5
    :type splitos: list(int)
656 5
    """
657 5
    if newer_103(splitos, 3):
658 5
        osurl = osurl.replace("winchester", "qc8960")  # duplicates get filtered out later
659
        radiourl = radiourl.replace("m5730", "qc8960")
660
    return osurl, radiourl
661 5
662
663
def filter_osversion(osurl, device, filterdict):
664
    """
665
    Modify URLs based on device index and dictionary of changes.
666
667
    :param osurl: OS URL to modify.
668
    :type osurl: str
669
670
    :param device: Device to use.
671
    :type device: int
672
673
    :param filterdict: Dictionary of changes: {device : (before, after)}
674 5
    :type filterdict: dict(int:(str, str))
675 5
    """
676 5
    if device in filterdict.keys():
677
        osurl = osurl.replace(filterdict[device][0], filterdict[device][1])
678
    return osurl
679 5
680
681
def generate_lazy_urls(softwareversion, osversion, radioversion, device):
682
    """
683
    Generate a pair of OS/radio URLs based on input.
684
685
    :param softwareversion: Software version to hash.
686
    :type softwareversion: str
687
688
    :param osversion: OS version.
689
    :type osversion: str
690
691
    :param radioversion: Radio version.
692
    :type radioversion: str
693
694
    :param device: Device to use.
695 5
    :type device: int
696 5
    """
697
    splitos = [int(i) for i in osversion.split(".")]
698 5
    rads = ["m5730", "qc8960", "qc8960.omadm", "qc8960.wtr",
699
            "qc8960.wtr5", "qc8930.wtr4", "qc8974.wtr2"]
700 5
    oses = ["winchester.factory", "qc8960.factory", "qc8960.verizon",
701 5
            "qc8974.factory"]
702 5
    maps = {0:0, 1:1, 2:2, 3:1, 4:1, 5:1, 6:3}
703 5
    osurl = create_bar_url(softwareversion, "{0}_sfi.desktop".format(oses[maps[device]]), osversion)
704 5
    radiourl = create_bar_url(softwareversion, rads[device], radioversion)
705
    osurl = filter_1031(osurl, splitos, device)
706
    return osurl, radiourl
707 5
708
709
def bulk_urls(softwareversion, osversion, radioversion, core=False, altsw=None):
710
    """
711
    Generate all URLs, plus extra Verizon URLs.
712
713
    :param softwareversion: Software version to hash.
714
    :type softwareversion: str
715
716
    :param osversion: OS version.
717
    :type osversion: str
718
719
    :param radioversion: Radio version.
720
    :type radioversion: str
721
722
    :param device: Device to use.
723
    :type device: int
724
725
    :param core: Whether or not to return core URLs as well.
726
    :type core: bool
727
728
    :param altsw: Radio software release, if not the same as OS.
729 5
    :type altsw: str
730 5
    """
731 5
    baseurl = create_base_url(softwareversion)
732 5
    osurls, radurls, coreurls = generate_urls(softwareversion, osversion, radioversion, core)
733 5
    vzwos, vzwrad = generate_lazy_urls(softwareversion, osversion, radioversion, 2)
734 5
    osurls.append(vzwos)
735 5
    radurls.append(vzwrad)
736 5
    vzwcore = vzwos.replace("sfi.desktop", "sfi")
737 5
    if core:
738 5
        coreurls.append(vzwcore)
739 5
    osurls = list(set(osurls))  # pop duplicates
740 5
    radurls = list(set(radurls))
741 5
    if core:
742 5
        coreurls = list(set(coreurls))
743
    radurls = bulk_urls_altsw(radurls, baseurl, altsw)
744
    return osurls, coreurls, radurls
745 5
746
747
def bulk_urls_altsw(radurls, baseurl, altsw=None):
748
    """
749
    Handle alternate software release for radio.
750
751
    :param radurls: List of radio URLs.
752
    :type radurls: list(str)
753
754
    :param baseurl: Base URL (from http to hashed SW release).
755
    :type baseurl: str
756
757
    :param altsw: Radio software release, if not the same as OS.
758 5
    :type altsw: str
759 5
    """
760 5
    if altsw is not None:
761 5
        altbase = create_base_url(altsw)
762 5
        radiourls2 = [rad.replace(baseurl, altbase) for rad in radurls]
763 5
        radurls = radiourls2
764
        del radiourls2
765
    return radurls
766 5
767
768
def line_begin():
769
    """
770 5
    Go to beginning of line, to overwrite whatever's there.
771 5
    """
772
    sys.stdout.write("\r")
773
    sys.stdout.flush()
774 5
775
776
def spinner_clear():
777
    """
778 5
    Get rid of any spinner residue left in stdout.
779 5
    """
780
    sys.stdout.write("\b \b")
781
    sys.stdout.flush()
782 5
783
784
class Spinner(object):
0 ignored issues
show
Unused Code introduced by
The variable __class__ seems to be unused.
Loading history...
785
    """
786
    A basic spinner using itertools. No need for progress.
787 5
    """
788 5
789 5
    def __init__(self):
790
        """
791 5
        Generate the itertools wheel.
792
        """
793
        self.wheel = itertools.cycle(['-', '/', '|', '\\'])
794
        self.file = dummy.UselessStdout()
795 5
796 5
    def after(self):
797 5
        """
798 5
        Iterate over itertools.cycle, write to file.
799 5
        """
800
        try:
801
            self.file.write(next(self.wheel))
802
            self.file.flush()
803 5
            self.file.write("\b\r")
804
            self.file.flush()
805
        except (KeyboardInterrupt, SystemExit):
806
            self.stop()
807 5
808
    def stop(self):
809
        """
810 5
        Kill output.
811
        """
812
        self.file = dummy.UselessStdout()
813
814
815 5
class SpinManager(object):
0 ignored issues
show
Unused Code introduced by
The variable __class__ seems to be unused.
Loading history...
816 5
    """
817 5
    Wraps around the itertools spinner, runs it in another thread.
818 5
    """
819 5
820 5
    def __init__(self):
821 5
        """
822
        Start the spinner thread.
823 5
        """
824
        spinner = Spinner()
825
        self.spinner = spinner
826
        self.thread = threading.Thread(target=self.loop, args=())
827 5
        self.thread.daemon = True
828 5
        self.scanning = False
829 5
        self.spinner.file = dummy.UselessStdout()
830
831 5
    def start(self):
832
        """
833
        Begin the spinner.
834
        """
835 5
        self.spinner.file = sys.stderr
836 5
        self.scanning = True
837 5
        self.thread.start()
838 5
839 5
    def loop(self):
840
        """
841
        Spin if scanning, clean up if not.
842
        """
843
        while self.scanning:
844 5
            time.sleep(0.5)
845
            try:
846
                line_begin()
847
                self.spinner.after()
848 5
            except (KeyboardInterrupt, SystemExit):
849 5
                self.scanning = False
850 5
                self.stop()
851 5
852 5
    def stop(self):
853 5
        """
854
        Stop the spinner.
855
        """
856 5
        self.spinner.stop()
857
        self.scanning = False
858
        spinner_clear()
859
        line_begin()
860
        if not is_windows():
861
            print("\n")
862
863 5
864 5
def return_and_delete(target):
865 5
    """
866 5
    Read text file, then delete it. Return contents.
867
868
    :param target: Text file to read.
869 5
    :type target: str
870
    """
871
    with open(target, "r") as thefile:
872
        content = thefile.read()
873
    os.remove(target)
874
    return content
875
876 5
877 5
def verify_loader_integrity(loaderfile):
878
    """
879 5
    Test for created loader integrity. Windows-only.
880 5
881 5
    :param loaderfile: Path to loader.
882 5
    :type loaderfile: str
883 5
    """
884 5
    if not is_windows():
885 5
        pass
886 5
    else:
887
        excode = None
888
        try:
889 5
            with open(os.devnull, 'rb') as dnull:
890
                cmd = "{0} fileinfo".format(loaderfile)
891
                excode = subprocess.call(cmd, stdout=dnull, stderr=subprocess.STDOUT)
892
        except OSError:
893
            excode = -1
894
        return excode == 0  # 0 if OK, non-zero if something broke
895
896 5
897 5
def bulkfilter_printer(afile):
898 5
    """
899
    Print filename and verify a loader file.
900
901 5
    :param afile: Path to file.
902
    :type afile: str
903
    """
904
    print("TESTING: {0}".format(os.path.basename(afile)))
905
    if not verify_loader_integrity(afile):
906
        return os.path.basename(afile)
907
908 5
909 5
def bulkfilter(files):
910
    """
911
    Verify all loader files in a given list.
912 5
913
    :param files: List of files.
914
    :type files: list(str)
915
    """
916
    brokens = [bulkfilter_printer(file) for file in files if prepends(os.path.basename(file), bbconstants.PREFIXES, ".exe")]
917
    return brokens
918
919 5
920 5
def verify_bulk_loaders(ldir):
921
    """
922 5
    Run :func:`verify_loader_integrity` for all files in a dir.
923 5
924 5
    :param ldir: Directory to use.
925
    :type ldir: str
926
    """
927 5
    if not is_windows():
928
        pass
929
    else:
930
        files = verify_bulk_loaders_filefilter(ldir)
931
        brokens = verify_bulk_loaders_brokens(files)
932
        return brokens
933
934 5
935 5
def verify_bulk_loaders_filefilter(ldir):
936
    """
937
    Prepare file names for :func:`verify_bulk_loaders`.
938 5
939
    :param ldir: Directory to use.
940
    :type ldir: str
941
    """
942
    files = [os.path.join(ldir, file) for file in os.listdir(ldir) if not os.path.isdir(file)]
943
    return files
944
945 5
946 5
def verify_bulk_loaders_brokens(files):
947
    """
948
    Prepare filtered file list for :func:`verify_bulk_loaders`.
949 5
950
    :param files: List of files.
951
    :type files: list(str)
952
    """
953
    brokens = [file for file in bulkfilter(files) if file]
954
    return brokens
955
956
957
def list_workers(input_data, workerlimit):
958
    """
959 5
    Count number of threads, either length of iterable or provided limit.
960 5
961
    :param input_data: Input data, some iterable.
962
    :type input_data: list
963 5
964
    :param workerlimit: Maximum number of workers.
965
    :type workerlimit: int
966
    """
967
    runners = len(input_data) if len(input_data) < workerlimit else workerlimit
968
    return runners
969
970 5
971
def cpu_workers(input_data):
972
    """
973 5
    Count number of CPU workers, smaller of number of threads and length of data.
974
975
    :param input_data: Input data, some iterable.
976
    :type input_data: list
977 5
    """
978 5
    return list_workers(input_data, compat.enum_cpus())
979 5
980 5
981 5
def prep_logfile():
982
    """
983
    Prepare log file, labeling it with current date. Select folder based on frozen status.
984 5
    """
985
    logfile = "{0}.txt".format(time.strftime("%Y_%m_%d_%H%M%S"))
986
    basefolder = prep_logfile_folder()
987
    record = os.path.join(basefolder, logfile)
988 5
    open(record, "w").close()
989 5
    return record
990 5
991
992 5
def prep_logfile_folder():
993 5
    """
994
    Prepare folder to write log file to.
995
    """
996 5
    if getattr(sys, 'frozen', False):
997
        basefolder = os.path.join(os.getcwd(), "lookuplogs")
998
        os.makedirs(basefolder, exist_ok=True)
999
    else:
1000
        basefolder = iniconfig.config_homepath(None, True)
1001
    return basefolder
1002
1003
1004
def prepends(file, pre, suf):
1005
    """
1006
    Check if filename starts with/ends with stuff.
1007
1008
    :param file: File to check.
1009 5
    :type file: str
1010
1011
    :param pre: Prefix(es) to check.
1012 5
    :type pre: str or list or tuple
1013
1014
    :param suf: Suffix(es) to check.
1015
    :type suf: str or list or tuple
1016
    """
1017
    return file.startswith(pre) and file.endswith(suf)
1018
1019 5
1020 5
def lprint(iterable):
1021
    """
1022
    A oneliner for 'for item in x: print item'.
1023 5
1024
    :param iterable: Iterable to print.
1025
    :type iterable: list/tuple
1026
    """
1027
    for item in iterable:
1028
        print(item)
1029
1030 5
1031 5
def cappath_config_loader(homepath=None):
1032 5
    """
1033
    Read a ConfigParser file to get cap preferences.
1034
1035 5
    :param homepath: Folder containing ini file. Default is user directory.
1036
    :type homepath: str
1037
    """
1038
    capini = iniconfig.generic_loader('cappath', homepath)
1039
    cappath = capini.get('path', fallback=bbconstants.CAP.location)
1040
    return cappath
1041
1042
1043
def cappath_config_writer(cappath=None, homepath=None):
1044
    """
1045 5
    Write a ConfigParser file to store cap preferences.
1046 5
1047 5
    :param cappath: Method to use.
1048
    :type cappath: str
1049
1050 5
    :param homepath: Folder containing ini file. Default is user directory.
1051
    :type homepath: str
1052
    """
1053
    cappath = grab_cap() if cappath is None else cappath
1054
    results = {"path": cappath}
1055
    iniconfig.generic_writer("cappath", results, homepath)
1056
1057
1058
def one_and_none(first, second):
1059
    """
1060 5
    Check if one element in a pair is None and one isn't.
1061 5
1062
    :param first: To return True, this must be None.
1063
    :type first: str
1064 5
1065
    :param second: To return True, this mustbe false.
1066
    :type second: str
1067
    """
1068
    sentinel = True if first is None and second is not None else False
1069
    return sentinel
1070
1071 5
1072
def def_args(dirs):
1073
    """
1074 5
    Return prepared argument list for most instances of :func:`cond_check:`.
1075
1076
    :param dirs: List of directories.
1077
    :type dirs: list(str)
1078
    """
1079
    return [dirs[4], dirs[5], dirs[2], dirs[3]]
1080
1081
1082
def cond_do(dofunc, goargs, restargs=None, condition=True):
1083
    """
1084
    Do a function, check a condition, then do same function but swap first argument.
1085
1086
    :param dofunc: Function to do.
1087
    :type dofunc: function
1088
1089
    :param goargs: List of variable arguments.
1090 5
    :type goargs: list(str)
1091 5
1092 5
    :param restargs: Rest of arguments, which are constant.
1093 5
    :type restargs: list(str)
1094
1095
    :param condition: Condition to check in order to use secondarg.
1096 5
    :type condition: bool
1097
    """
1098
    restargs = [] if restargs is None else restargs
1099
    dofunc(goargs[0], *restargs)
1100
    if condition:
1101
        dofunc(goargs[1], *restargs)
1102
1103
1104
def cond_check(dofunc, goargs, restargs=None, condition=True, checkif=True, checkifnot=True):
1 ignored issue
show
best-practice introduced by
Too many arguments (6/5)
Loading history...
1105
    """
1106
    Do :func:`cond_do` based on a condition, then do it again based on a second condition.
1107
1108
    :param dofunc: Function to do.
1109
    :type dofunc: function
1110
1111
    :param goargs: List of variable arguments.
1112
    :type goargs: list(str)
1113
1114
    :param restargs: Rest of arguments, which are constant.
1115
    :type restargs: list(str)
1116
1117
    :param condition: Condition to check in order to use secondarg.
1118 5
    :type condition: bool
1119 5
1120 5
    :param checkif: Do :func:`cond_do` if this is True.
1121 5
    :type checkif: bool
1122
1123
    :param checkifnot: Do :func:`cond_do` if this is False.
1124
    :type checkifnot: bool
1125
    """
1126
    if checkif:
1127
        cond_do(dofunc, goargs[0:2], restargs, condition)
1128
    if not checkifnot:
1129
        cond_do(dofunc, goargs[2:4], restargs, condition)
1130