bbarchivist.utilities   F
last analyzed

Complexity

Total Complexity 121

Size/Duplication

Total Lines 995
Duplicated Lines 0 %

Test Coverage

Coverage 96.49%

Importance

Changes 0
Metric Value
eloc 361
dl 0
loc 995
ccs 330
cts 342
cp 0.9649
rs 2
c 0
b 0
f 0
wmc 121

59 Functions

Rating   Name   Duplication   Size   Complexity  
A grab_cap() 0 5 1
A grab_datafile() 0 12 3
A grab_cfp() 0 5 1
A grab_capini() 0 16 3
A prep_seven_zip() 0 14 2
A cond_do() 0 20 3
A bulkfilter_printer() 0 10 2
A pop_stl1() 0 17 2
A prep_seven_zip_path() 0 17 2
A prep_filesize() 0 11 2
A verify_bulk_loaders() 0 13 2
A new_enough() 0 17 4
A return_and_delete() 0 11 2
A is_windows() 0 5 1
A newer_103() 0 12 2
A get_seven_zip() 0 11 2
A get_core_count() 0 12 4
A stripper() 0 8 1
A filter_osversion() 0 16 2
A lprint() 0 9 2
A is_amd64() 0 5 1
A cond_check() 0 26 3
A one_and_none() 0 12 2
A cappath_config_loader() 0 10 1
A wsz_registry() 0 8 1
A generate_lazy_urls() 0 26 1
A spinner_clear() 0 6 1
A bulk_urls() 0 36 3
A cappath_config_writer() 0 13 2
A win_seven_zip() 0 19 4
A i2b() 0 8 1
A dirhandler() 0 12 2
A create_bar_url() 0 20 2
A def_args() 0 8 1
A list_workers() 0 12 2
A prepends() 0 14 1
A verify_bulk_loaders_filefilter() 0 9 1
A prep_logfile() 0 9 1
A talkaprint() 0 12 2
A filter_1031() 0 17 2
A wsz_local_bad() 0 10 1
A prep_logfile_folder() 0 10 2
A verify_bulk_loaders_brokens() 0 9 1
A win_seven_zip_local() 0 14 2
A s2b() 0 8 1
A wsz_local_good() 0 10 2
A wsz_filecount() 0 6 1
A bulk_urls_altsw() 0 19 2
A line_begin() 0 6 1
A increment() 0 17 2
A bulkfilter() 0 9 1
A fsizer() 0 17 4
A create_base_url() 0 13 1
A filter_urls() 0 18 1
A generate_urls() 0 36 2
A cpu_workers() 0 8 1
A verify_loader_integrity() 0 18 4
A prep_seven_zip_posix() 0 14 3
A format_app_name() 0 9 1

7 Methods

Rating   Name   Duplication   Size   Complexity  
A Spinner.stop() 0 5 1
A Spinner.__init__() 0 6 1
A SpinManager.start() 0 7 1
A Spinner.after() 0 11 2
A SpinManager.stop() 0 10 2
A SpinManager.__init__() 0 10 1
A SpinManager.loop() 0 12 3

Complexity

Complex modules like bbarchivist.utilities often do a lot of different things. To break such a module down, we need to identify a cohesive component within it. A common way to find such a component is to look for functions, fields, or methods that share the same prefix or suffix.

Once you have determined the members that belong together, you can apply the Extract Class refactoring (for a module of plain functions, moving them into their own module serves the same purpose). If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
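As a rough illustration of the prefix heuristic, the sketch below groups a handful of function names from the table above by the text before their first underscore. The helper `group_by_prefix` and its `min_size` parameter are hypothetical names introduced here for illustration; they are not part of bbarchivist or of the analysis tool.

```python
from collections import defaultdict


def group_by_prefix(names, min_size=2):
    """Group names by the text before their first underscore (hypothetical helper)."""
    groups = defaultdict(list)
    for name in names:
        prefix = name.split("_", 1)[0]
        groups[prefix].append(name)
    # Keep only prefixes shared by at least min_size names
    return {pre: sorted(found) for pre, found in groups.items() if len(found) >= min_size}


# A few names taken from the function table in this report
NAMES = [
    "wsz_registry", "wsz_filecount", "wsz_local_good", "wsz_local_bad",
    "cappath_config_loader", "cappath_config_writer",
    "fsizer", "stripper",
]

CANDIDATES = group_by_prefix(NAMES)
for prefix, members in CANDIDATES.items():
    print(prefix, members)
```

Under this heuristic, the `wsz_*` and `cappath_*` functions come out as candidate components, while one-off helpers such as `fsizer` are left alone. A shared prefix is only a hint; whether a group is cohesive enough to extract still needs a human judgment.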

#!/usr/bin/env python3
"""This module is used for miscellaneous utilities."""

import glob  # cap grabbing
import hashlib  # base url creation
import itertools  # spinners gonna spin
import os  # path work
import platform  # platform info
import subprocess  # loader verification
import sys  # streams, version info
import threading  # get thread for spinner
import time  # spinner delay

from bbarchivist import bbconstants  # cap location, version, filename bits
from bbarchivist import compat  # backwards compat
from bbarchivist import dummy  # useless stdout
from bbarchivist import exceptions  # exceptions
from bbarchivist import iniconfig  # config parsing

__author__ = "Thurask"
__license__ = "WTFPL v2"
__copyright__ = "2015-2019 Thurask"


def grab_datafile(datafile):
    """
    Figure out where a datafile is.

    :param datafile: Datafile to check.
    :type datafile: bbconstants.Datafile
    """
    try:
        afile = glob.glob(os.path.join(os.getcwd(), datafile.filename))[0]
    except IndexError:
        afile = datafile.location if datafile.name == "cfp" else grab_capini(datafile)
    return os.path.abspath(afile)


def grab_capini(datafile):
    """
    Get cap location from .ini file, and write if it's new.

    :param datafile: Datafile to check.
    :type datafile: bbconstants.Datafile
    """
    try:
        apath = cappath_config_loader()
        afile = glob.glob(apath)[0]
    except IndexError:
        cappath_config_writer(datafile.location)
        return bbconstants.CAP.location  # no ini cap
    else:
        cappath_config_writer(os.path.abspath(afile))
        return os.path.abspath(afile)  # ini cap


def grab_cap():
    """
    Figure out where cap is, local, specified or system-supplied.
    """
    return grab_datafile(bbconstants.CAP)


def grab_cfp():
    """
    Figure out where cfp is, local or system-supplied.
    """
    return grab_datafile(bbconstants.CFP)


def new_enough(majver, minver):
    """
    Check if we're at or above a minimum Python version.

    :param majver: Minimum major Python version (majver.minver).
    :type majver: int

    :param minver: Minimum minor Python version (majver.minver).
    :type minver: int
    """
    if majver > sys.version_info[0]:
        sentinel = False
    elif majver == sys.version_info[0] and minver > sys.version_info[1]:
        sentinel = False
    else:
        sentinel = True
    return sentinel


def dirhandler(directory, defaultdir):
    """
    If directory is None, turn it into defaultdir.

    :param directory: Target directory.
    :type directory: str

    :param defaultdir: Default directory.
    :type defaultdir: str
    """
    directory = defaultdir if directory is None else directory
    return directory


def fsizer(file_size):
    """
    Raw byte file size to human-readable string.

    :param file_size: Number to parse.
    :type file_size: float
    """
    fsize = prep_filesize(file_size)
    for sfix in ['B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']:
        if fsize < 1024.0:
            size = "{0:3.2f}{1}".format(fsize, sfix)
            break
        else:
            fsize /= 1024.0
    else:
        size = "{0:3.2f}{1}".format(fsize, 'YB')
    return size


def prep_filesize(file_size):
    """
    Convert file size to float.

    :param file_size: Number to parse.
    :type file_size: float
    """
    if file_size is None:
        file_size = 0.0
    fsize = float(file_size)
    return fsize


def s2b(input_check):
    """
    Return Boolean interpretation of string input.

    :param input_check: String to check if it means True or False.
    :type input_check: str
    """
    return str(input_check).lower() in ("yes", "true", "t", "1", "y")


def i2b(input_check):
    """
    Return Boolean interpretation of typed input.

    :param input_check: Query to feed into input function.
    :type input_check: str
    """
    return s2b(input(input_check))


def is_amd64():
    """
    Check if script is running on an AMD64 system (Python can be 32/64, this is for subprocess).
    """
    return platform.machine().endswith("64")


def is_windows():
    """
    Check if script is running on Windows.
    """
    return platform.system() == "Windows"


def talkaprint(msg, talkative=False):
    """
    Print only if asked to.

    :param msg: Message to print.
    :type msg: str

    :param talkative: Whether to output to screen. False by default.
    :type talkative: bool
    """
    if talkative:
        print(msg)


def get_seven_zip(talkative=False):
    """
    Return name of 7-Zip executable.
    On POSIX, it MUST be 7za.
    On Windows, it can be installed or supplied with the script.
    :func:`win_seven_zip` is used to determine if it's installed.

    :param talkative: Whether to output to screen. False by default.
    :type talkative: bool
    """
    return win_seven_zip(talkative) if is_windows() else "7za"


def win_seven_zip(talkative=False):
    """
    For Windows, check where 7-Zip is ("where", pretty much).
    Consult registry first for any installed instances of 7-Zip.

    :param talkative: Whether to output to screen. False by default.
    :type talkative: bool
    """
    talkaprint("CHECKING INSTALLED FILES...", talkative)
    try:
        path = wsz_registry()
    except OSError as exc:
        if talkative:
            exceptions.handle_exception(exc, xit=None)
        talkaprint("TRYING LOCAL FILES...", talkative)
        return win_seven_zip_local(talkative)
    else:
        talkaprint("7ZIP USING INSTALLED FILES", talkative)
        return '"{0}"'.format(os.path.join(path[0], "7z.exe"))


def wsz_registry():
    """
    Check Windows registry for 7-Zip executable location.
    """
    import winreg  # windows registry
    hk7z = winreg.OpenKey(winreg.HKEY_CURRENT_USER, "Software\\7-Zip")
    path = winreg.QueryValueEx(hk7z, "Path")
    return path


def win_seven_zip_local(talkative=False):
    """
    If 7-Zip isn't in the registry, fall back onto supplied executables.
    If *those* aren't there, return "error".

    :param talkative: Whether to output to screen. False by default.
    :type talkative: bool
    """
    filecount = wsz_filecount()
    if filecount == 2:
        szexe = wsz_local_good(talkative)
    else:
        szexe = wsz_local_bad(talkative)
    return szexe


def wsz_filecount():
    """
    Get count of 7-Zip executables in local folder.
    """
    filecount = len([x for x in os.listdir(os.getcwd()) if x in ["7za.exe", "7za64.exe"]])
    return filecount


def wsz_local_good(talkative=False):
    """
    Get 7-Zip exe name if everything is good.

    :param talkative: Whether to output to screen. False by default.
    :type talkative: bool
    """
    talkaprint("7ZIP USING LOCAL FILES", talkative)
    szexe = "7za64.exe" if is_amd64() else "7za.exe"
    return szexe


def wsz_local_bad(talkative=False):
    """
    Handle 7-Zip exe name in case of issues.

    :param talkative: Whether to output to screen. False by default.
    :type talkative: bool
    """
    talkaprint("NO LOCAL FILES", talkative)
    szexe = "error"
    return szexe


def get_core_count():
    """
    Find out how many CPU cores this system has.
    """
    try:
        cores = str(compat.enum_cpus())  # 3.4 and up
    except NotImplementedError:
        cores = "1"  # 3.2-3.3
    else:
        if compat.enum_cpus() is None:
            cores = "1"
    return cores


def prep_seven_zip_path(path, talkative=False):
    """
    Print p7zip path on POSIX, or notify if not there.

    :param path: Path to use.
    :type path: str

    :param talkative: Whether to output to screen. False by default.
    :type talkative: bool
    """
    if path is None:
        talkaprint("NO 7ZIP\nPLEASE INSTALL p7zip", talkative)
        sentinel = False
    else:
        talkaprint("7ZIP FOUND AT {0}".format(path), talkative)
        sentinel = True
    return sentinel


def prep_seven_zip_posix(talkative=False):
    """
    Check for p7zip on POSIX.

    :param talkative: Whether to output to screen. False by default.
    :type talkative: bool
    """
    try:
        path = compat.where_which("7za")
    except ImportError:
        talkaprint("PLEASE INSTALL SHUTILWHICH WITH PIP", talkative)
        return False
    else:
        return prep_seven_zip_path(path, talkative)


def prep_seven_zip(talkative=False):
    """
    Check for presence of 7-Zip.
    On POSIX, check for p7zip.
    On Windows, check for 7-Zip.

    :param talkative: Whether to output to screen. False by default.
    :type talkative: bool
    """
    if is_windows():
        final = get_seven_zip(talkative) != "error"
    else:
        final = prep_seven_zip_posix(talkative)
    return final


def increment(version, inc=3):
    """
    Increment version by given number. For repeated lookups.

    :param version: w.x.y.ZZZZ, becomes w.x.y.(ZZZZ + increment).
    :type version: str

    :param inc: What to increment by. Default is 3.
    :type inc: str
    """
    splitos = version.split(".")
    splitos[3] = int(splitos[3])
    if splitos[3] > 9996:  # prevent overflow
        splitos[3] = 0
    splitos[3] += int(inc)
    splitos[3] = str(splitos[3])
    return ".".join(splitos)


def stripper(name):
    """
    Strip fluff from bar filename.

    :param name: Bar filename, must contain '-nto+armle-v7+signed.bar'.
    :type name: str
    """
    return name.replace("-nto+armle-v7+signed.bar", "")


def create_base_url(softwareversion):
    """
    Make the root URL for production server files.

    :param softwareversion: Software version to hash.
    :type softwareversion: str
    """
    # Hash software version
    swhash = hashlib.sha1(softwareversion.encode('utf-8'))
    hashedsoftwareversion = swhash.hexdigest()
    # Root of all urls
    baseurl = "http://cdn.fs.sl.blackberry.com/fs/qnx/production/{0}".format(hashedsoftwareversion)
    return baseurl


def format_app_name(appname):
    """
    Convert long reverse DNS name to short name.

    :param appname: Application name (ex. sys.pim.calendar -> "calendar")
    :type appname: str
    """
    final = appname.split(".")[-1]
    return final


def create_bar_url(softwareversion, appname, appversion, clean=False):
    """
    Make the URL for any production server file.

    :param softwareversion: Software version to hash.
    :type softwareversion: str

    :param appname: Application name, preferably like on server.
    :type appname: str

    :param appversion: Application version.
    :type appversion: str

    :param clean: Whether or not to clean up app name. Default is False.
    :type clean: bool
    """
    baseurl = create_base_url(softwareversion)
    if clean:
        appname = format_app_name(appname)
    return "{0}/{1}-{2}-nto+armle-v7+signed.bar".format(baseurl, appname, appversion)


def generate_urls(softwareversion, osversion, radioversion, core=False):
    """
    Generate a list of OS URLs and a list of radio URLs based on input.

    :param softwareversion: Software version to hash.
    :type softwareversion: str

    :param osversion: OS version.
    :type osversion: str

    :param radioversion: Radio version.
    :type radioversion: str

    :param core: Whether or not to return core URLs as well.
    :type core: bool
    """
    osurls = [
        create_bar_url(softwareversion, "winchester.factory_sfi.desktop", osversion),
        create_bar_url(softwareversion, "qc8960.factory_sfi.desktop", osversion),
        create_bar_url(softwareversion, "qc8960.factory_sfi.desktop", osversion),
        create_bar_url(softwareversion, "qc8974.factory_sfi.desktop", osversion)
    ]
    radiourls = [
        create_bar_url(softwareversion, "m5730", radioversion),
        create_bar_url(softwareversion, "qc8960", radioversion),
        create_bar_url(softwareversion, "qc8960.omadm", radioversion),
        create_bar_url(softwareversion, "qc8960.wtr", radioversion),
        create_bar_url(softwareversion, "qc8960.wtr5", radioversion),
        create_bar_url(softwareversion, "qc8930.wtr5", radioversion),
        create_bar_url(softwareversion, "qc8974.wtr2", radioversion)
    ]
    coreurls = []
    osurls, radiourls = filter_urls(osurls, radiourls, osversion)
    if core:
        coreurls = [x.replace(".desktop", "") for x in osurls]
    return osurls, radiourls, coreurls


def newer_103(splitos, third):
    """
    Return True if given split OS version is 10.3.X or newer.

    :param splitos: OS version, split on the dots: [10, 3, 3, 2205]
    :type splitos: list(int)

    :param third: The X in 10.3.X.
    :type third: int
    """
    newer = True if ((splitos[1] >= 4) or (splitos[1] == 3 and splitos[2] >= third)) else False
    return newer


def filter_urls(osurls, radiourls, osversion):
    """
    Filter lists of OS and radio URLs.

    :param osurls: List of OS URLs.
    :type osurls: list(str)

    :param radiourls: List of radio URLs.
    :type radiourls: list(str)

    :param osversion: OS version.
    :type osversion: str
    """
    splitos = [int(i) for i in osversion.split(".")]
    osurls[2] = filter_1031(osurls[2], splitos, 5)  # Z3 10.3.1+
    osurls[3] = filter_1031(osurls[3], splitos, 6)  # Passport 10.3.1+
    osurls, radiourls = pop_stl1(osurls, radiourls, splitos)  # STL100-1 10.3.3+
    return osurls, radiourls


def filter_1031(osurl, splitos, device):
    """
    Modify URLs to reflect changes in 10.3.1.

    :param osurl: OS URL to modify.
    :type osurl: str

    :param splitos: OS version, split and cast to int: [10, 3, 2, 2876]
    :type splitos: list(int)

    :param device: Device to use.
    :type device: int
    """
    if newer_103(splitos, 1):
        filterdict = {
            5: ("qc8960.factory_sfi", "qc8960.factory_sfi_hybrid_qc8x30"),
            6: ("qc8974.factory_sfi", "qc8960.factory_sfi_hybrid_qc8974")
        }
        osurl = filter_osversion(osurl, device, filterdict)
    return osurl


def pop_stl1(osurls, radiourls, splitos):
    """
    Replace STL100-1 links in 10.3.3+.

    :param osurls: List of OS platforms.
    :type osurls: list(str)

    :param radiourls: List of radio platforms.
    :type radiourls: list(str)

    :param splitos: OS version, split and cast to int: [10, 3, 3, 2205]
    :type splitos: list(int)
    """
    if newer_103(splitos, 3):
        osurls = osurls[1:]
        radiourls = radiourls[1:]
    return osurls, radiourls


def filter_osversion(osurl, device, filterdict):
    """
    Modify URLs based on device index and dictionary of changes.

    :param osurl: OS URL to modify.
    :type osurl: str

    :param device: Device to use.
    :type device: int

    :param filterdict: Dictionary of changes: {device : (before, after)}
    :type filterdict: dict(int:(str, str))
    """
    if device in filterdict.keys():
        osurl = osurl.replace(filterdict[device][0], filterdict[device][1])
    return osurl


def generate_lazy_urls(softwareversion, osversion, radioversion, device):
    """
    Generate a pair of OS/radio URLs based on input.

    :param softwareversion: Software version to hash.
    :type softwareversion: str

    :param osversion: OS version.
    :type osversion: str

    :param radioversion: Radio version.
    :type radioversion: str

    :param device: Device to use.
    :type device: int
    """
    splitos = [int(i) for i in osversion.split(".")]
    rads = ["m5730", "qc8960", "qc8960.omadm", "qc8960.wtr",
            "qc8960.wtr5", "qc8930.wtr4", "qc8974.wtr2"]
    oses = ["winchester.factory", "qc8960.factory", "qc8960.verizon",
            "qc8974.factory"]
    maps = {0: 0, 1: 1, 2: 2, 3: 1, 4: 1, 5: 1, 6: 3}
    osurl = create_bar_url(softwareversion, "{0}_sfi.desktop".format(oses[maps[device]]), osversion)
    radiourl = create_bar_url(softwareversion, rads[device], radioversion)
    osurl = filter_1031(osurl, splitos, device)
    return osurl, radiourl


def bulk_urls(softwareversion, osversion, radioversion, core=False, altsw=None):
    """
    Generate all URLs, plus extra Verizon URLs.

    :param softwareversion: Software version to hash.
    :type softwareversion: str

    :param osversion: OS version.
    :type osversion: str

    :param radioversion: Radio version.
    :type radioversion: str

    :param core: Whether or not to return core URLs as well.
    :type core: bool

    :param altsw: Radio software release, if not the same as OS.
    :type altsw: str
    """
    baseurl = create_base_url(softwareversion)
    osurls, radurls, coreurls = generate_urls(softwareversion, osversion, radioversion, core)
    vzwos, vzwrad = generate_lazy_urls(softwareversion, osversion, radioversion, 2)
    osurls.append(vzwos)
    radurls.append(vzwrad)
    vzwcore = vzwos.replace("sfi.desktop", "sfi")
    if core:
        coreurls.append(vzwcore)
    osurls = list(set(osurls))  # pop duplicates
    radurls = list(set(radurls))
    if core:
        coreurls = list(set(coreurls))
    radurls = bulk_urls_altsw(radurls, baseurl, altsw)
    return osurls, coreurls, radurls


def bulk_urls_altsw(radurls, baseurl, altsw=None):
    """
    Handle alternate software release for radio.

    :param radurls: List of radio URLs.
    :type radurls: list(str)

    :param baseurl: Base URL (from http to hashed SW release).
    :type baseurl: str

    :param altsw: Radio software release, if not the same as OS.
    :type altsw: str
    """
    if altsw is not None:
        altbase = create_base_url(altsw)
        radiourls2 = [rad.replace(baseurl, altbase) for rad in radurls]
        radurls = radiourls2
        del radiourls2
    return radurls


def line_begin():
    """
    Go to beginning of line, to overwrite whatever's there.
    """
    sys.stdout.write("\r")
    sys.stdout.flush()


def spinner_clear():
    """
    Get rid of any spinner residue left in stdout.
    """
    sys.stdout.write("\b \b")
    sys.stdout.flush()


class Spinner(object):
    """
    A basic spinner using itertools. No need for progress.
    """

    def __init__(self):
        """
        Generate the itertools wheel.
        """
        self.wheel = itertools.cycle(['-', '/', '|', '\\'])
        self.file = dummy.UselessStdout()

    def after(self):
        """
        Iterate over itertools.cycle, write to file.
        """
        try:
            self.file.write(next(self.wheel))
            self.file.flush()
            self.file.write("\b\r")
            self.file.flush()
        except (KeyboardInterrupt, SystemExit):
            self.stop()

    def stop(self):
        """
        Kill output.
        """
        self.file = dummy.UselessStdout()


class SpinManager(object):
    """
    Wraps around the itertools spinner, runs it in another thread.
    """

    def __init__(self):
        """
        Start the spinner thread.
        """
        spinner = Spinner()
        self.spinner = spinner
        self.thread = threading.Thread(target=self.loop, args=())
        self.thread.daemon = True
        self.scanning = False
        self.spinner.file = dummy.UselessStdout()

    def start(self):
        """
        Begin the spinner.
        """
        self.spinner.file = sys.stderr
        self.scanning = True
        self.thread.start()

    def loop(self):
        """
        Spin if scanning, clean up if not.
        """
        while self.scanning:
            time.sleep(0.5)
            try:
                line_begin()
                self.spinner.after()
            except (KeyboardInterrupt, SystemExit):
                self.scanning = False
                self.stop()

    def stop(self):
        """
        Stop the spinner.
        """
        self.spinner.stop()
        self.scanning = False
        spinner_clear()
        line_begin()
        if not is_windows():
            print("\n")


def return_and_delete(target):
    """
    Read text file, then delete it. Return contents.

    :param target: Text file to read.
    :type target: str
    """
    with open(target, "r") as thefile:
        content = thefile.read()
    os.remove(target)
    return content


def verify_loader_integrity(loaderfile):
    """
    Test for created loader integrity. Windows-only.

    :param loaderfile: Path to loader.
    :type loaderfile: str
    """
    if not is_windows():
        pass
    else:
        excode = None
        try:
            # The null device receives the child's output, so open it for writing
            with open(os.devnull, 'wb') as dnull:
                cmd = "{0} fileinfo".format(loaderfile)
                excode = subprocess.call(cmd, stdout=dnull, stderr=subprocess.STDOUT)
        except OSError:
            excode = -1
        return excode == 0  # 0 if OK, non-zero if something broke


def bulkfilter_printer(afile):
    """
    Print filename and verify a loader file.

    :param afile: Path to file.
    :type afile: str
    """
    print("TESTING: {0}".format(os.path.basename(afile)))
    if not verify_loader_integrity(afile):
        return os.path.basename(afile)


def bulkfilter(files):
    """
    Verify all loader files in a given list.

    :param files: List of files.
    :type files: list(str)
    """
    brokens = [
        bulkfilter_printer(file) for file in files
        if prepends(os.path.basename(file), bbconstants.PREFIXES, ".exe")
    ]
    return brokens


def verify_bulk_loaders(ldir):
    """
    Run :func:`verify_loader_integrity` for all files in a dir.

    :param ldir: Directory to use.
    :type ldir: str
    """
    if not is_windows():
        pass
    else:
        files = verify_bulk_loaders_filefilter(ldir)
        brokens = verify_bulk_loaders_brokens(files)
        return brokens


def verify_bulk_loaders_filefilter(ldir):
    """
    Prepare file names for :func:`verify_bulk_loaders`.

    :param ldir: Directory to use.
    :type ldir: str
    """
    # Join before testing, so entries are checked inside ldir and not the working directory
    paths = [os.path.join(ldir, file) for file in os.listdir(ldir)]
    files = [path for path in paths if not os.path.isdir(path)]
    return files


def verify_bulk_loaders_brokens(files):
    """
    Prepare filtered file list for :func:`verify_bulk_loaders`.

    :param files: List of files.
    :type files: list(str)
    """
    brokens = [file for file in bulkfilter(files) if file]
    return brokens


def list_workers(input_data, workerlimit):
    """
    Count number of threads, either length of iterable or provided limit.

    :param input_data: Input data, some iterable.
    :type input_data: list

    :param workerlimit: Maximum number of workers.
    :type workerlimit: int
    """
    runners = len(input_data) if len(input_data) < workerlimit else workerlimit
    return runners


def cpu_workers(input_data):
    """
    Count number of CPU workers, smaller of number of threads and length of data.

    :param input_data: Input data, some iterable.
    :type input_data: list
    """
    return list_workers(input_data, compat.enum_cpus())


def prep_logfile():
    """
    Prepare log file, labeling it with current date. Select folder based on frozen status.
    """
    logfile = "{0}.txt".format(time.strftime("%Y_%m_%d_%H%M%S"))
    basefolder = prep_logfile_folder()
    record = os.path.join(basefolder, logfile)
    open(record, "w").close()
    return record


def prep_logfile_folder():
    """
    Prepare folder to write log file to.
    """
    if getattr(sys, 'frozen', False):
        basefolder = os.path.join(os.getcwd(), "lookuplogs")
        os.makedirs(basefolder, exist_ok=True)
    else:
        basefolder = iniconfig.config_homepath(None, True)
    return basefolder


def prepends(file, pre, suf):
    """
    Check if filename starts with/ends with stuff.

    :param file: File to check.
    :type file: str

    :param pre: Prefix(es) to check.
    :type pre: str or list or tuple

    :param suf: Suffix(es) to check.
    :type suf: str or list or tuple
    """
    return file.startswith(pre) and file.endswith(suf)


def lprint(iterable):
    """
    A oneliner for 'for item in x: print item'.

    :param iterable: Iterable to print.
    :type iterable: list/tuple
    """
    for item in iterable:
        print(item)


def cappath_config_loader(homepath=None):
    """
    Read a ConfigParser file to get cap preferences.

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    capini = iniconfig.generic_loader('cappath', homepath)
    cappath = capini.get('path', fallback=bbconstants.CAP.location)
    return cappath


def cappath_config_writer(cappath=None, homepath=None):
    """
    Write a ConfigParser file to store cap preferences.

    :param cappath: Path to cap file. If None, :func:`grab_cap` is used.
    :type cappath: str

    :param homepath: Folder containing ini file. Default is user directory.
    :type homepath: str
    """
    cappath = grab_cap() if cappath is None else cappath
    results = {"path": cappath}
    iniconfig.generic_writer("cappath", results, homepath)


def one_and_none(first, second):
    """
    Check if one element in a pair is None and one isn't.

    :param first: To return True, this must be None.
    :type first: str

    :param second: To return True, this must not be None.
    :type second: str
    """
    sentinel = True if first is None and second is not None else False
    return sentinel


def def_args(dirs):
    """
    Return prepared argument list for most instances of :func:`cond_check`.

    :param dirs: List of directories.
    :type dirs: list(str)
    """
    return [dirs[4], dirs[5], dirs[2], dirs[3]]


def cond_do(dofunc, goargs, restargs=None, condition=True):
    """
    Do a function, check a condition, then do same function but swap first argument.

    :param dofunc: Function to do.
    :type dofunc: function

    :param goargs: List of variable arguments.
    :type goargs: list(str)

    :param restargs: Rest of arguments, which are constant.
    :type restargs: list(str)

    :param condition: Condition to check in order to use secondarg.
    :type condition: bool
    """
    restargs = [] if restargs is None else restargs
    dofunc(goargs[0], *restargs)
    if condition:
        dofunc(goargs[1], *restargs)


def cond_check(dofunc, goargs, restargs=None, condition=True, checkif=True, checkifnot=True):
    """
    Do :func:`cond_do` based on a condition, then do it again based on a second condition.

    :param dofunc: Function to do.
    :type dofunc: function

    :param goargs: List of variable arguments.
    :type goargs: list(str)

    :param restargs: Rest of arguments, which are constant.
    :type restargs: list(str)

    :param condition: Condition to check in order to use secondarg.
    :type condition: bool

    :param checkif: Do :func:`cond_do` if this is True.
    :type checkif: bool

    :param checkifnot: Do :func:`cond_do` if this is False.
    :type checkifnot: bool
    """
    if checkif:
        cond_do(dofunc, goargs[0:2], restargs, condition)
    if not checkifnot:
        cond_do(dofunc, goargs[2:4], restargs, condition)
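
A few of the pure helpers in the listing are easiest to follow with concrete inputs. The sketch below uses condensed standalone copies of `fsizer`, `increment`, and `create_bar_url` so that it runs without bbarchivist installed. The condensed bodies are assumptions that mirror the listing rather than the packaged code, and the version strings are made-up example values.

```python
import hashlib


def fsizer(file_size):
    """Condensed copy: raw byte count to human-readable string."""
    fsize = 0.0 if file_size is None else float(file_size)
    for sfix in ['B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']:
        if fsize < 1024.0:
            return "{0:3.2f}{1}".format(fsize, sfix)
        fsize /= 1024.0
    return "{0:3.2f}{1}".format(fsize, 'YB')


def increment(version, inc=3):
    """Condensed copy: bump the fourth field, wrapping to 0 above 9996."""
    splitos = version.split(".")
    build = int(splitos[3])
    if build > 9996:  # prevent overflow
        build = 0
    splitos[3] = str(build + int(inc))
    return ".".join(splitos)


def create_bar_url(softwareversion, appname, appversion, clean=False):
    """Condensed copy: SHA-1 of the software version names the server folder."""
    swhash = hashlib.sha1(softwareversion.encode('utf-8')).hexdigest()
    baseurl = "http://cdn.fs.sl.blackberry.com/fs/qnx/production/{0}".format(swhash)
    if clean:
        appname = appname.split(".")[-1]  # sys.pim.calendar -> calendar
    return "{0}/{1}-{2}-nto+armle-v7+signed.bar".format(baseurl, appname, appversion)


print(fsizer(1536))              # 1.50kB
print(fsizer(None))              # 0.00B
print(increment("10.3.3.2205"))  # 10.3.3.2208
print(increment("10.3.3.9998"))  # 10.3.3.3 (wrapped to 0, then incremented)

# Example (made-up) version strings; only the URL shape matters here
URL = create_bar_url("10.3.3.3216", "sys.pim.calendar", "1.0", clean=True)
print(URL)
```

Note that `increment` resets the build number to 0 before adding `inc`, so a version just below the limit wraps to a small number instead of growing a fifth digit.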