Test Failed
Branch master (c56792)
by John
02:11
created

bbarchivist.scriptutils.prep_export_cchecker()   B

Complexity

Conditions 2

Size

Total Lines 37
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 8
dl 0
loc 37
rs 8.8571
c 0
b 0
f 0
ccs 9
cts 9
cp 1
crap 2

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1813/1000)
Loading history...
2 5
"""This module contains various utilities for the scripts folder."""
3
4 5
import argparse  # generic parser
5 5
import collections  # defaultdict
6 5
import getpass  # invisible password
7 5
import glob  # file lookup
8 5
import hashlib  # hashes
9 5
import os  # path work
10 5
import shutil  # folder removal
11 5
import subprocess  # running cfp/cap
12 5
import sys  # getattr
13 5
import threading  # run stuff in background
14
15 5
import requests  # session
16 5
from bbarchivist import archiveutils  # archive support
17 5
from bbarchivist import barutils  # file system work
18 5
from bbarchivist import bbconstants  # constants
19 5
from bbarchivist import decorators  # decorating functions
20 5
from bbarchivist import gpgutils  # gpg
21 5
from bbarchivist import hashutils  # file hashes
22 5
from bbarchivist import networkutils  # network tools
23 5
from bbarchivist import smtputils  # email
24 5
from bbarchivist import sqlutils  # sql
25 5
from bbarchivist import textgenerator  # writing text to file
26 5
from bbarchivist import utilities  # little things
27
28 5
__author__ = "Thurask"
29 5
__license__ = "WTFPL v2"
30 5
__copyright__ = "2015-2018 Thurask"
31
32
33 5
def shortversion():
34
    """
35
    Get short app version (Git tag).
36
    """
37 5
    if not getattr(sys, 'frozen', False):
38 5
        ver = bbconstants.VERSION
39
    else:
40 5
        verfile = glob.glob(os.path.join(os.getcwd(), "version.txt"))[0]
41 5
        with open(verfile) as afile:
42 5
            ver = afile.read()
43 5
    return ver
44
45
46 5
def longversion():
47
    """
48
    Get long app version (Git tag + commits + hash).
49
    """
50 5
    if not getattr(sys, 'frozen', False):
51 5
        ver = (bbconstants.LONGVERSION, bbconstants.COMMITDATE)
52
    else:
53 5
        verfile = glob.glob(os.path.join(os.getcwd(), "longversion.txt"))[0]
54 5
        with open(verfile) as afile:
55 5
            ver = afile.read().split("\n")
56 5
    return ver
57
58
59 5
def default_parser_vers(vers=None):
60
    """
61
    Prepare version for default parser.
62
63
    :param vers: Versions: [git commit hash, git commit date]
64
    :param vers: list(str)
65
    """
66 5
    if vers is None:
67 5
        vers = longversion()
68 5
    return vers
69
70
71 5
def default_parser_flags(parser, flags=None):
72
    """
73
    Handle flags for default parser.
74
75
    :param parser: Parser to modify.
76
    :type parser: argparse.ArgumentParser
77
78
    :param flags: Tuple of sections to add.
79
    :type flags: tuple(str)
80
    """
81 5
    if flags is not None:
82 5
        parser = dpf_flags_folder(parser, flags)
83 5
        parser = dpf_flags_osr(parser, flags)
84 5
    return parser
85
86
87 5
def dpf_flags_folder(parser, flags=None):
88
    """
89
    Add generic folder flag to parser.
90
91
    :param parser: Parser to modify.
92
    :type parser: argparse.ArgumentParser
93
94
    :param flags: Tuple of sections to add.
95
    :type flags: tuple(str)
96
    """
97 5
    if "folder" in flags:
98 5
        parser.add_argument("-f",
99 5
                            "--folder",
100
                            dest="folder",
101
                            help="Working folder",
102 5
                            default=None,
103
                            metavar="DIR",
104
                            type=utilities.file_exists)
105
    return parser
106
107
108
def dpf_flags_osr(parser, flags=None):
109
    """
110
    Add generic OS/radio/software flags to parser.
111
112 5
    :param parser: Parser to modify.
113 5
    :type parser: argparse.ArgumentParser
114 5
115 5
    :param flags: Tuple of sections to add.
116 5
    :type flags: tuple(str)
117
    """
118
    if "osr" in flags:
119 5
        parser.add_argument("os", help="OS version")
120
        parser.add_argument("radio",
121
                            help="Radio version, 10.x.y.zzzz",
122
                            nargs="?",
123
                            default=None)
124
        parser.add_argument("swrelease",
125
                            help="Software version, 10.x.y.zzzz",
126
                            nargs="?",
127
                            default=None)
128
    return parser
129
130
131
def default_parser(name=None, desc=None, flags=None, vers=None):
132
    """
133
    A generic form of argparse's ArgumentParser.
134
135 5
    :param name: App name.
136 5
    :type name: str
137 5
138 5
    :param desc: App description.
139 5
    :type desc: str
140
141
    :param flags: Tuple of sections to add.
142 5
    :type flags: tuple(str)
143
144
    :param vers: Versions: [git commit hash, git commit date]
145
    :param vers: list(str)
146
    """
147
    vers = default_parser_vers(vers)
148
    homeurl = "https://github.com/thurask/bbarchivist"
149
    parser = argparse.ArgumentParser(prog=name, description=desc, epilog=homeurl)
150
    parser.add_argument("-v",
151
                        "--version",
152
                        action="version",
153
                        version="{0} {1} committed {2}".format(parser.prog, vers[0], vers[1]))
154
    parser = default_parser_flags(parser, flags)
155
    return parser
156
157
158 5
def generic_windows_shim(scriptname, scriptdesc, target, version):
159 5
    """
160 5
    Generic CFP/CAP runner; Windows only.
161 5
162 5
    :param scriptname: Script name, 'bb-something'.
163 5
    :type scriptname: str
164
165 5
    :param scriptdesc: Script description, i.e. scriptname -h.
166
    :type scriptdesc: str
167
168 5
    :param target: Path to file to execute.
169
    :type target: str
170
171
    :param version: Version of target.
172
    :type version: str
173
    """
174
    parser = default_parser(scriptname, scriptdesc)
175
    capver = "|{0}".format(version)
176
    parser = external_version(parser, capver)
177
    parser.parse_known_args(sys.argv[1:])
178 5
    if utilities.is_windows():
179 5
        subprocess.call([target] + sys.argv[1:])
180
    else:
181
        print("Sorry, Windows only.")
182 5
183
184
def arg_verify_none(argval, message):
185
    """
186
    Check if an argument is None, error out if it is.
187
188
    :param argval: Argument to check.
189
    :type argval: str
190
191
    :param message: Error message to print.
192 5
    :type message: str
193 5
    """
194 5
    if argval is None:
195
        raise argparse.ArgumentError(argument=None, message=message)
196
197 5
198
def external_version(parser, addition):
199
    """
200
    Modify the version string of argparse.ArgumentParser, adding something.
201
202
    :param parser: Parser to modify.
203
    :type parser: argparse.ArgumentParser
204
205
    :param addition: What to add.
206
    :type addition: str
207 5
    """
208 5
    verarg = [arg for arg in parser._actions if isinstance(arg, argparse._VersionAction)][0]
209 5
    verarg.version = "{1}{0}".format(addition, verarg.version)
210
    return parser
211
212 5
213
def return_radio_version(osversion, radioversion=None):
214
    """
215
    Increment radio version, if need be.
216
217
    :param osversion: OS version.
218
    :type osversion: str
219 5
220 5
    :param radioversion: Radio version, None if incremented.
221 5
    :type radioversion: str
222 5
    """
223 5
    if radioversion is None:
224 5
        radioversion = utilities.increment(osversion, 1)
225
    return radioversion
226 5
227 5
228
def sw_check_contingency(softwareversion):
229 5
    """
230 5
    Ask in the event software release isn't found.
231
232
    :param softwareversion: Software release version.
233 5
    :type softwareversion: str
234
    """
235
    if softwareversion == "SR not in system":
236
        print("SOFTWARE RELEASE NOT FOUND")
237
        cont = utilities.i2b("INPUT MANUALLY? Y/N: ")
238
        if cont:
239
            softwareversion = input("SOFTWARE RELEASE: ")
240
            swchecked = False
241
        else:
242
            print("\nEXITING...")
243 5
            raise SystemExit  # bye bye
244 5
    else:
245 5
        swchecked = True
246 5
    return softwareversion, swchecked
247
248 5
249 5
def return_sw_checked(softwareversion, osversion):
250
    """
251
    Check software existence, return boolean.
252 5
253
    :param softwareversion: Software release version.
254
    :type softwareversion: str
255
256
    :param osversion: OS version.
257
    :type osversion: str
258
    """
259
    if softwareversion is None:
260
        serv = bbconstants.SERVERS["p"]
261
        softwareversion = networkutils.sr_lookup(osversion, serv)
262 5
        softwareversion, swchecked = sw_check_contingency(softwareversion)
263 5
    else:
264 5
        swchecked = True
265 5
    return softwareversion, swchecked
266 5
267
268 5
def return_radio_sw_checked(altsw, radioversion):
269 5
    """
270
    Check radio software existence, return boolean.
271
272 5
    :param altsw: Software release version.
273
    :type altsw: str
274
275
    :param radioversion: Radio version.
276
    :type radioversion: str
277
    """
278
    if altsw == "checkme":
279
        serv = bbconstants.SERVERS["p"]
280
        testos = utilities.increment(radioversion, -1)
281
        altsw = networkutils.sr_lookup(testos, serv)
282
        altsw, altchecked = sw_check_contingency(altsw)
283
    else:
284
        altchecked = True
285
    return altsw, altchecked
286
287
288 5
def check_sw(baseurl, softwareversion, swchecked, altsw=False):
289 5
    """
290 5
    Check existence of software release.
291 5
292
    :param baseurl: Base URL (from http to hashed SW release).
293 5
    :type baseurl: str
294
295
    :param softwareversion: Software release.
296 5
    :type softwareversion: str
297
298
    :param swchecked: If we checked the sw release already.
299
    :type swchecked: bool
300
301
    :param altsw: If this is the radio-only release. Default is false.
302
    :type altsw: bool
303
    """
304
    message = "CHECKING RADIO SOFTWARE RELEASE..." if altsw else "CHECKING SOFTWARE RELEASE..."
305
    print(message)
306 5
    if not swchecked:
307 5
        check_sw_actual(baseurl, softwareversion)
308 5
    else:
309
        print("SOFTWARE RELEASE {0} EXISTS".format(softwareversion))
310 5
311
312
def check_sw_actual(baseurl, softwareversion):
313 5
    """
314
    Get the status of a software release.
315
316
    :param baseurl: Base URL (from http to hashed SW release).
317
    :type baseurl: str
318
319
    :param softwareversion: Software release.
320 5
    :type softwareversion: str
321 5
    """
322 5
    avlty = networkutils.availability(baseurl)
323 5
    if avlty:
324 5
        print("SOFTWARE RELEASE {0} EXISTS".format(softwareversion))
325
    else:
326
        check_sw_handle(softwareversion)
327 5
328
329
def check_sw_handle(softwareversion):
330
    """
331
    Handle non-existent software release.
332
333
    :param softwareversion: Software release.
334
    :type softwareversion: str
335
    """
336
    print("SOFTWARE RELEASE {0} NOT FOUND".format(softwareversion))
337
    cont = utilities.i2b("CONTINUE? Y/N: ")
338
    if not cont:
339
        print("\nEXITING...")
340 5
        raise SystemExit
341
342
343 5
def check_radio_sw(alturl, altsw, altchecked):
344
    """
345
    Check existence of radio software release.
346
347
    :param alturl: Radio base URL (from http to hashed SW release).
348
    :type alturl: str
349
350 5
    :param altsw: Radio software release.
351 5
    :type altsw: str
352 5
353 5
    :param altchecked: If we checked the sw release already.
354
    :type altchecked: bool
355 5
    """
356 5
    return check_sw(alturl, altsw, altchecked, True)
357
358
359 5
def check_altsw(altcheck=False):
360
    """
361
    Ask for and return alternate software release, if needed.
362
363
    :param altcheck: If we're using an alternate software release.
364
    :type altcheck: bool
365
    """
366
    if altcheck:
367
        altsw = input("RADIO SOFTWARE RELEASE (PRESS ENTER TO GUESS): ")
368
        if not altsw:
369
            altsw = "checkme"
370
    else:
371
        altsw = None
372 5
    return altsw
373 5
374 5
375 5
def check_os_single(osurl, osversion, device):
376 5
    """
377 5
    Check existence of single OS link.
378 5
379
    :param radiourl: Radio URL to check.
380
    :type radiourl: str
381 5
382
    :param radioversion: Radio version.
383
    :type radioversion: str
384
385
    :param device: Device family.
386
    :type device: int
387
    """
388 5
    osav = networkutils.availability(osurl)
389 5
    if not osav:
390 5
        print("{0} NOT AVAILABLE FOR {1}".format(osversion, bbconstants.DEVICES[device]))
391 5
        cont = utilities.i2b("CONTINUE? Y/N: ")
392 5
        if not cont:
393
            print("\nEXITING...")
394 5
            raise SystemExit
395
396
397 5
def check_os_bulk(osurls):
398
    """
399
    Check existence of list of OS links.
400
401 5
    :param osurls: OS URLs to check.
402 5
    :type osurls: list(str)
403 5
    """
404 5
    sess = requests.Session()
405 5
    for url in osurls:
406
        osav = networkutils.availability(url, sess)
407
        if osav:
408 5
            break
409
    else:
410
        check_os_bulk_handle()
411
412
413
def check_os_bulk_handle():
414
    """
415
    Handle no existing OS links.
416
    """
417
    print("OS VERSION NOT FOUND")
418 5
    cont = utilities.i2b("CONTINUE? Y/N: ")
419 5
    if not cont:
420 5
        print("\nEXITING...")
421 5
        raise SystemExit
422 5
423 5
424 5
def check_radio_single(radiourl, radioversion):
425 5
    """
426
    Check existence of single radio link.
427 5
428 5
    :param radiourl: Radio URL to check.
429 5
    :type radiourl: str
430 5
431 5
    :param radioversion: Radio version.
432
    :type radioversion: str
433
    """
434 5
    radav = networkutils.availability(radiourl)
435
    if not radav:
436
        print("RADIO VERSION NOT FOUND")
437
        cont = utilities.i2b("INPUT MANUALLY? Y/N: ")
438
        if cont:
439
            rad2 = input("RADIO VERSION: ")
440
            radiourl = radiourl.replace(radioversion, rad2)
441
            radioversion = rad2
442
        else:
443
            going = utilities.i2b("KEEP GOING? Y/N: ")
444 5
            if not going:
445 5
                print("\nEXITING...")
446 5
                raise SystemExit
447 5
    return radiourl, radioversion
448 5
449
450 5
def check_radio_bulk(radiourls, radioversion):
451 5
    """
452
    Check existence of list of radio links.
453
454 5
    :param radiourls: Radio URLs to check.
455
    :type radiourls: list(str)
456
457
    :param radioversion: Radio version.
458
    :type radioversion: str
459
    """
460
    sess = requests.Session()
461
    for url in radiourls:
462
        radav = networkutils.availability(url, sess)
463
        if radav:
464 5
            break
465 5
    else:
466 5
        radiourls, radioversion = check_radio_bulk_notfound(radiourls, radioversion)
467 5
    return radiourls, radioversion
468
469 5
470 5
def check_radio_bulk_notfound(radiourls, radioversion):
471
    """
472
    What to do if radio links aren't found.
473 5
474
    :param radiourls: Radio URLs to check.
475
    :type radiourls: list(str)
476
477
    :param radioversion: Radio version.
478
    :type radioversion: str
479
    """
480
    print("RADIO VERSION NOT FOUND")
481
    cont = utilities.i2b("INPUT MANUALLY? Y/N: ")
482
    if cont:
483 5
        radiourls, radioversion = check_radio_bulk_go(radiourls, radioversion)
484 5
    else:
485 5
        check_radio_bulk_stop()
486 5
    return radiourls, radioversion
487
488
489 5
def check_radio_bulk_go(radiourls, radioversion):
490
    """
491
    Replace radio version and URLs, and keep going.
492
493 5
    :param radiourls: Radio URLs to check.
494 5
    :type radiourls: list(str)
495 5
496 5
    :param radioversion: Radio version.
497
    :type radioversion: str
498
    """
499 5
    rad2 = input("RADIO VERSION: ")
500
    radiourls = [url.replace(radioversion, rad2) for url in radiourls]
501
    radioversion = rad2
502
    return radiourls, radioversion
503
504
505
def check_radio_bulk_stop():
506 5
    """
507 5
    Ask if we should keep going once no radio has been found.
508 5
    """
509
    going = utilities.i2b("KEEP GOING? Y/N: ")
510
    if not going:
511 5
        print("\nEXITING...")
512
        raise SystemExit
513
514
515
def bulk_avail(urllist):
516
    """
517
    Filter 404 links out of URL list.
518
519
    :param urllist: URLs to check.
520
    :type urllist: list(str)
521 5
    """
522 5
    sess = requests.Session()
523 5
    url2 = [x for x in urllist if networkutils.availability(x, sess)]
524
    return url2
525
526 5
527
def get_baseurls(softwareversion, altsw=None):
528
    """
529
    Generate base URLs for bar links.
530
531
    :param softwareversion: Software version.
532
    :type softwareversion: str
533 5
534 5
    :param altsw: Radio software version, if necessary.
535
    :type altsw: str
536 5
    """
537 5
    baseurl = utilities.create_base_url(softwareversion)
538 5
    alturl = utilities.create_base_url(altsw) if altsw else None
539 5
    return baseurl, alturl
540 5
541
542 5
def get_sz_executable(compmethod):
543 5
    """
544 5
    Get 7z executable.
545 5
546 5
    :param compmethod: Compression method.
547 5
    :type compmethod: str
548
    """
549 5
    if compmethod != "7z":
550 5
        szexe = ""
551 5
    else:
552
        print("CHECKING PRESENCE OF 7ZIP...")
553
        psz = utilities.prep_seven_zip(True)
554 5
        if psz:
555
            print("7ZIP OK")
556
            szexe = utilities.get_seven_zip(False)
557
        else:
558
            szexe = ""
559
            print("7ZIP NOT FOUND")
560
            cont = utilities.i2b("CONTINUE? Y/N ")
561
            if cont:
562
                print("FALLING BACK TO ZIP...")
563
                compmethod = "zip"
564 5
            else:
565 5
                print("\nEXITING...")
566 5
                raise SystemExit  # bye bye
567 5
    return compmethod, szexe
568 5
569 5
570 5
def test_bar_files(localdir, urllist):
571 5
    """
572
    Test bar files after download.
573 5
574
    :param localdir: Directory.
575
    :type localdir: str
576 5
577
    :param urllist: List of URLs to check.
578
    :type urllist: list(str)
579
    """
580
    print("TESTING BAR FILES...")
581
    brokenlist = []
582
    for file in os.listdir(localdir):
583
        brokenlist = test_bar_files_individual(file, localdir, urllist, brokenlist)
584
    if brokenlist:
585
        print("SOME FILES ARE BROKEN!")
586
        utilities.lprint(brokenlist)
587
        raise SystemExit
588
    else:
589
        print("BAR FILES DOWNLOADED OK")
590
591
592 5
def test_bar_files_individual(file, localdir, urllist, brokenlist):
593 5
    """
594 5
    Test bar file after download.
595 5
596 5
    :param file: Bar file to check.
597 5
    :type file: str
598
599
    :param localdir: Directory.
600 5
    :type localdir: str
601
602
    :param urllist: List of URLs to check.
603
    :type urllist: list(str)
604
605
    :param brokenlist: List of URLs to download later.
606
    :type brokenlist: list(str)
607
    """
608
    if file.endswith(".bar"):
609
        print("TESTING: {0}".format(file))
610
        thepath = os.path.abspath(os.path.join(localdir, file))
611
        brokens = barutils.bar_tester(thepath)
612
        brokenlist = bar_broken_individual(brokens, urllist, brokenlist)
613 5
    return brokenlist
614 5
615 5
616 5
def bar_broken_individual(brokens, urllist, brokenlist):
617 5
    """
618 5
    What to do if a downloaded bar file is broken.
619
620
    :param brokens: None if bar is OK, filename if it is not.
621 5
    :type brokens: str
622
623
    :param urllist: List of URLs to check.
624
    :type urllist: list(str)
625
626
    :param brokenlist: List of URLs to download later.
627
    :type brokenlist: list(str)
628 5
    """
629 5
    if brokens is not None:
630 5
        os.remove(brokens)
631 5
        for url in urllist:
632 5
            if brokens in url:
633 5
                brokenlist.append(url)
634 5
    return brokenlist
635 5
636 5
637
def test_signed_files(localdir):
638 5
    """
639
    Test signed files after extract.
640
641 5
    :param localdir: Directory.
642
    :type localdir: str
643
    """
644
    print("TESTING SIGNED FILES...")
645
    for file in os.listdir(localdir):
646
        if file.endswith(".bar"):
647
            print("TESTING: {0}".format(file))
648 5
            signname, signhash = barutils.retrieve_sha512(os.path.join(localdir, file))
649 5
            sha512ver = barutils.verify_sha512(os.path.join(localdir, signname.decode("utf-8")), signhash)
650
            if not sha512ver:
651 5
                print("{0} IS BROKEN".format((file)))
652 5
                break
653 5
    else:
654 5
        print("ALL FILES EXTRACTED OK")
655 5
656 5
657
def test_loader_files(localdir):
658 5
    """
659
    Test loader files after creation.
660
661 5
    :param localdir: Directory.
662
    :type localdir: str
663
    """
664
    if not utilities.is_windows():
665
        pass
666
    else:
667
        print("TESTING LOADER FILES...")
668 5
        brokens = utilities.verify_bulk_loaders(localdir)
669 5
        if brokens:
670
            print("BROKEN FILES:")
671 5
            utilities.lprint(brokens)
672 5
            raise SystemExit
673 5
        else:
674 5
            print("ALL FILES CREATED OK")
675
676 5
677
def test_single_loader(loaderfile):
678
    """
679 5
    Test single loader file after creation.
680
681
    :param loaderfile: File to check.
682
    :type loaderfile: str
683
    """
684
    if not utilities.is_windows():
685
        pass
686
    else:
687
        print("TESTING LOADER...")
688
        if not utilities.verify_loader_integrity(loaderfile):
689
            print("{0} IS BROKEN!".format(os.path.basename(loaderfile)))
690
            raise SystemExit
691
        else:
692
            print("LOADER CREATED OK")
693
694
695 5
def prod_avail(results, mailer=False, osversion=None, password=None):
696 5
    """
697 5
    Clean availability for production lookups for autolookup script.
698 5
699 5
    :param results: Result dict.
700 5
    :type results: dict(str: str)
701 5
702
    :param mailer: If we're mailing links. Default is false.
703 5
    :type mailer: bool
704 5
705 5
    :param osversion: OS version.
706
    :type osversion: str
707
708 5
    :param password: Email password.
709
    :type password: str
710
    """
711
    prel = results['p']
712
    if prel != "SR not in system" and prel is not None:
713
        pav = "PD"
714
        baseurl = utilities.create_base_url(prel)
715
        avail = networkutils.availability(baseurl)
716
        is_avail = "Available" if avail else "Unavailable"
717
        prod_avail_mailprep(prel, avail, osversion, mailer, password)
718
    else:
719
        pav = "  "
720
        is_avail = "Unavailable"
721
    return prel, pav, is_avail
722
723
724
def prod_avail_mailprep(prel, avail, osversion=None, mailer=False, password=None):
725
    """
726
    Do SQL/SMTP prep work after a good production lookup hit.
727 5
728 5
    :param prel: Software lookup result.
729 5
    :type prel: str
730 5
731 5
    :param avail: If software lookup result is available for download.
732 5
    :type avail: bool
733
734
    :param osversion: OS version.
735 5
    :type osversion: str
736
737
    :param mailer: If we're mailing links. Default is false.
738
    :type mailer: bool
739
740
    :param password: Email password.
741
    :type password: str
742
    """
743
    if avail and mailer:
744
        sqlutils.prepare_sw_db()
745
        if not sqlutils.check_exists(osversion, prel):
746
            rad = utilities.increment(osversion, 1)
747
            linkgen(osversion, rad, prel, temp=True)
748 5
            smtputils.prep_email(osversion, prel, password)
749 5
750
751
def comp_joiner(rootdir, localdir, filelist):
752 5
    """
753
    Join rootdir, localdir to every file in filelist.
754
755
    :param rootdir: Root directory.
756
    :type rootdir: str
757
758
    :param localdir: Subfolder inside rootdir.
759 5
    :type localdir: str
760 5
761 5
    :param filelist: List of files to return this path for.
762 5
    :type filelist: list(str)
763
    """
764
    joinedfiles = [os.path.join(rootdir, localdir, os.path.basename(x)) for x in filelist]
765 5
    return joinedfiles
766
767
768
def kernchecker_prep(kernlist):
769
    """
770
    Prepare output from kernel list.
771
772
    :param kernlist: List of kernel branches.
773
    :type kernlist: list(str)
774
    """
775 5
    splitkerns = [x.split("/") for x in kernlist]
776 5
    platforms = list({x[0] for x in splitkerns})
777 5
    kerndict = kernchecker_dict(splitkerns, platforms)
778 5
    return kerndict
779
780
781 5
def kernchecker_dict(splitkerns, platforms):
782
    """
783
    Prepare results dictionary.
784
785
    :param splitkerns: Split kernel branches.
786
    :type splitkerns: list(str)
787
788
    :param platforms: List of platform dicts.
789
    :type platforms: list(dict)
790
    """
791 5
    kerndict = {x: [] for x in platforms}
792 5
    for kernel in splitkerns:
793 5
        kerndict[kernel[0]].append("\t{0}".format(kernel[1]))
794
    return kerndict
795
796 5
797
def tclloader_prep(loaderfile, directory=False):
798
    """
799
    Prepare directory name and OS version.
800
801
    :param loaderfile: Path to input file/folder.
802
    :type loaderfile: str
803
804
    :param directory: If the input file is a folder. Default is False.
805
    :type directory: bool
806
    """
807
    loaderdir = loaderfile if directory else loaderfile.replace(".zip", "")
808
    osver = loaderdir.split("-")[-1]
809 5
    return loaderdir, osver
810 5
811 5
812 5
def tclloader_filename(loaderdir, osver, loadername=None):
813
    """
814
    Prepare platform and filename.
815 5
816
    :param loaderdir: Path to input folder.
817
    :type loaderdir: str
818
819
    :param osver: OS version.
820
    :type osver: str
821
822
    :param loadername: Name of final autoloader. Default is auto-generated.
823
    :type loadername: str
824
    """
825
    platform = os.listdir(os.path.join(loaderdir, "target", "product"))[0]
826
    if loadername is None:
827
        loadername = "{0}_autoloader_user-all-{1}".format(platform, osver)
828
    return loadername, platform
829
830
831
def tcl_download(downloadurl, filename, filesize, filehash, verify=True):
832
    """
833
    Download autoloader file, rename, and verify.
834 5
835 5
    :param downloadurl: Download URL.
836 5
    :type downloadurl: str
837 5
838 5
    :param filename: Name of autoloader file.
839 5
    :type filename: str
840 5
841 5
    :param filesize: Size of autoloader file.
842 5
    :type filesize: str
843 5
844
    :param filehash: SHA-1 hash of autoloader file.
845 5
    :type filehash: str
846 5
847
    :param verify: Whether to verify the file after downloading. Default is True.
848
    :type verify: bool
849 5
    """
850
    print("FILENAME: {0}".format(filename))
851
    print("LENGTH: {0}".format(utilities.fsizer(filesize)))
852
    networkutils.download(downloadurl)
853
    print("DOWNLOAD COMPLETE")
854
    os.rename(downloadurl.split("/")[-1], filename)
855
    if verify:
856
        method = hashutils.get_engine("sha1")
857
        shahash = hashutils.hashlib_hash(filename, method)
858
        if shahash == filehash:
859
            print("HASH CHECK OK")
860
        else:
861
            print(shahash)
862
            print("HASH FAILED!")
863
864
865
def tcl_prd_scan(curef, download=False, mode=4, fvver="AAA000", original=True, export=False, verify=True):
2 ignored issues
show
best-practice introduced by
Too many arguments (7/5)
Loading history...
Comprehensibility introduced by
This function exceeds the maximum number of variables (20/15).
Loading history...
866
    """
867
    Scan one PRD and produce download URL and filename.
868
869
    :param curef: PRD of the phone variant to check.
870
    :type curef: str
871
872
    :param download: If we'll download the file that this returns. Default is False.
873
    :type download: bool
874 5
875 5
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
876 5
    :type mode: int
877 5
878 5
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
879 5
    :type fvver: str
880 5
881 5
    :param original: If we'll download the file with its original filename. Default is True.
882 5
    :type original: bool
883 5
884 5
    :param export: Whether to export XML response to file. Default is False.
885 5
    :type export: bool
886 5
887 5
    :param verify: Whether to verify the file after downloading. Default is True.
888
    :type verify: bool
889
    """
890 5
    sess = requests.Session()
891
    ctext = networkutils.tcl_check(curef, sess, mode, fvver, export)
892
    if ctext is None:
893
        raise SystemExit
894
    tvver, firmwareid, filename, filesize, filehash = networkutils.parse_tcl_check(ctext)
895
    salt = networkutils.tcl_salt()
896
    vkhsh = networkutils.vkhash(curef, tvver, firmwareid, salt, mode, fvver)
897
    updatetext = networkutils.tcl_download_request(curef, tvver, firmwareid, salt, vkhsh, sess, mode, fvver, export)
898
    downloadurl, encslave = networkutils.parse_tcl_download_request(updatetext)
899
    statcode = networkutils.getcode(downloadurl, sess)
900
    filename = tcl_delta_filename(curef, fvver, tvver, filename, original)
901
    tcl_prd_print(downloadurl, filename, statcode, encslave, sess)
902
    if statcode == 200 and download:
903
        tcl_download(downloadurl, filename, filesize, filehash, verify)
904
905
906
def tcl_delta_filename(curef, fvver, tvver, filename, original=True):
907
    """
908
    Generate compatible filenames for deltas, if needed.
909 5
910 5
    :param curef: PRD of the phone variant to check.
911 5
    :type curef: str
912 5
913
    :param fvver: Initial software version.
914
    :type fvver: str
915 5
916
    :param tvver: Target software version.
917
    :type tvver: str
918
919
    :param filename: File name from download URL, passed through if not changing filename.
920
    :type filename: str
921
922
    :param original: If we'll download the file with its original filename. Default is True.
923
    :type original: bool
924
    """
925
    if not original:
926
        prdver = curef.split("-")[1]
927
        filename = "JSU_PRD-{0}-{1}to{2}.zip".format(prdver, fvver, tvver)
928
    return filename
929
930
931
def tcl_prd_print(downloadurl, filename, statcode, encslave, session):
932
    """
933
    Print output from PRD scanning.
934 5
935 5
    :param downloadurl: File to download.
936 5
    :type downloadurl: str
937 5
938 5
    :param filename: File name from download URL.
939 5
    :type filename: str
940 5
941 5
    :param statcode: Status code of download URL.
942
    :type statcode: int
943
944 5
    :param encslave: Server hosting header script.
945
    :type encslave: str
946
947
    :param session: Session object.
948
    :type session: requests.Session
949
    """
950
    print("{0}: HTTP {1}".format(filename, statcode))
951 5
    print(downloadurl)
952 5
    if encslave is not None:
953 5
        address = "/{0}".format(downloadurl.split("/", 3)[3:][0])
954
        print("CHECKING HEADER...")
955 5
        sentinel = networkutils.encrypt_header(address, encslave, session)
956 5
        if sentinel is not None:
957 5
            print(sentinel)
958
959
960 5
def tcl_prep_otaver(ota=None):
961
    """
962
    Prepare variables for OTA versus full check.
963
964
    :param ota: The starting version if OTA, None if not. Default is None.
965
    :type ota: str
966
    """
967 5
    if ota is not None:
968 5
        mode = 2
969 5
        fvver = ota
970 5
    else:
971 5
        mode = 4
972 5
        fvver = "AAA000"
973
    return mode, fvver
974
975 5
976
def tcl_delta_remote(curef):
977
    """
978
    Prepare remote version for delta scanning.
979
980
    :param curef: PRD of the phone variant to check.
981
    :type curef: str
982 5
    """
983 5
    remotedict = networkutils.remote_prd_info()
984 5
    fvver = remotedict.get(curef, "AAA000")
985
    if fvver == "AAA000":
986
        print("NO REMOTE VERSION FOUND!")
987 5
        raise SystemExit
988
    return fvver
989
990
991
def tcl_mainscan_preamble(ota=None):
992
    """
993
    Prepare preamble for TCL scanning.
994
995
    :param ota: The starting version if OTA, None if not. Default is None.
996
    :type ota: str
997
    """
998
    slim_preamble("TCLSCAN")
999
    if ota is not None:
1000 5
        print("PRDs with OTA from OS {0}".format(ota.upper()))
1001 5
1002
1003 5
def tcl_mainscan_printer(curef, tvver, ota=None):
1004
    """
1005
    Print output of TCL scanning.
1006 5
1007
    :param curef: PRD of the phone variant to check.
1008
    :type curef: str
1009
1010
    :param tvver: Target software version.
1011
    :type tvver: str
1012
1013 5
    :param ota: The starting version if OTA, None if not. Default is None.
1014 5
    :type ota: str
1015 5
    """
1016 5
    if ota is not None:
1017
        print("{0}: {2} to {1}".format(curef, tvver, ota.upper()))
1018
    else:
1019 5
        print("{0}: {1}".format(curef, tvver))
1020
1021
1022
def tcl_findprd_prepd_start(prddict):
1023
    """
1024
    Collect list of PRD entries.
1025
1026 5
    :param prddict: Device:PRD dictionary.
1027 5
    :type prddict: dict(str: list)
1028 5
    """
1029
    prda = []
1030
    for item in prddict.values():
1031 5
        prda.extend(item)
1032
    return prda
1033
1034
1035
def tcl_findprd_prepd_middle(prda):
1036
    """
1037
    Convert PRD entries to list of center:end entries.
1038 5
1039 5
    :param prda: List of PRD-xxxxx-yyy entries.
1040 5
    :type prda: list(str)
1041 5
    """
1042 5
    prds = [x.split(" ")[0].replace("PRD-", "").split("-") for x in prda]
1043
    prdx = list({x[0]: x[1]} for x in prds)
1044
    return prdx
1045 5
1046
1047
def tcl_findprd_prepd_end(prdx):
1048
    """
1049
    Convert list of center:end entries to final center:[ends] dict.
1050
1051
    :param prdx: List of center:end dict entries.
1052 5
    :type prdx: list(dict(str: str))
1053 5
    """
1054 5
    prdf = collections.defaultdict(list)
1055 5
    for prdc in prdx:
1056
        for key, value in prdc.items():
1057
            prdf[key].append(value)
1058 5
    return prdf
1059
1060
1061
def tcl_findprd_prepdict(prddict):
1062
    """
1063
    Prepare dict of center:[ends] entries.
1064
1065
    :param prddict: Device:PRD dictionary.
1066
    :type prddict: dict(str: list)
1067
    """
1068 5
    prda = tcl_findprd_prepd_start(prddict)
1069 5
    prdx = tcl_findprd_prepd_middle(prda)
1070 5
    prdfinal = tcl_findprd_prepd_end(prdx)
1071 5
    return prdfinal
1072 5
1073 5
1074 5
def tcl_findprd_checkfilter(prddict, tocheck=None):
1075
    """
1076
    Filter PRD dict if needed.
1077 5
1078
    :param prddict: PRD center:[ends] dictionary.
1079
    :type prddict: collections.defaultdict(str: list)
1080
1081
    :param tocheck: Specific PRD(s) to check, None if all will be checked. Default is None.
1082
    :type tocheck: list(str)
1083
    """
1084
    prddict2 = prddict
1085
    if tocheck is not None:
1086
        prddict2 = collections.defaultdict(list)
1087
        for toch in tocheck:
1088
            toch = toch.replace("PRD-", "")
1089
            prddict2[toch] = prddict[toch]
1090
    return prddict2
1091
1092
1093
def tcl_findprd_centerscan(center, prddict, session, floor=0, ceiling=999, export=False):
1 ignored issue
show
best-practice introduced by
Too many arguments (6/5)
Loading history...
1094
    """
1095
    Individual scanning for the center of a PRD.
1096
1097
    :param center: PRD-center-end.
1098
    :type center: str
1099 5
1100 5
    :param prddict: PRD center:[ends] dictionary.
1101 5
    :type prddict: collections.defaultdict(str: list)
1102 5
1103
    :param session: Session object.
1104
    :type session: requests.Session
1105 5
1106
    :param floor: When to start. Default is 0.
1107
    :type floor: int
1108
1109
    :param ceiling: When to stop. Default is 999.
1110
    :type ceiling: int
1111
1112
    :param export: Whether to export XML response to file. Default is False.
1113
    :type export: bool
1114
    """
1115
    tails = [int(i) for i in prddict[center]]
1116
    safes = [g for g in range(floor, ceiling) if g not in tails]
1117
    print("SCANNING ROOT: {0}{1}".format(center, " "*8))
1118
    tcl_findprd_safescan(safes, center, session, export)
1119
1120
1121 5
def tcl_findprd_safescan(safes, center, session, export=False):
1122 5
    """
1123 5
    Scan for PRDs known not to be in database.
1124 5
1125 5
    :param safes: List of ends within given range that aren't in database.
1126 5
    :type safes: list(int)
1127
1128 5
    :param center: PRD-center-end.
1129
    :type center: str
1130
1131 5
    :param session: Session object.
1132
    :type session: requests.Session
1133
1134
    :param export: Whether to export XML response to file. Default is False.
1135
    :type export: bool
1136
    """
1137
    for j in safes:
1138
        curef = "PRD-{}-{:03}".format(center, j)
1139
        print("NOW SCANNING: {0}".format(curef), end="\r")
1140
        checktext = networkutils.tcl_check(curef, session, export)
1141 5
        if checktext is None:
1142 5
            continue
1143 5
        else:
1144 5
            tcl_findprd_safehandle(curef, checktext)
1145
1146
1147 5
def tcl_findprd_safehandle(curef, checktext):
1148
    """
1149
    Parse API output and print the relevant bits.
1150
1151
    :param curef: PRD of the phone variant to check.
1152
    :type curef: str
1153
1154
    :param checktext: The XML formatted data returned from the first stage API check.
1155
    :type checktext: str
1156
    """
1157
    tvver, firmwareid, filename, fsize, fhash = networkutils.parse_tcl_check(checktext)
1158
    del firmwareid, filename, fsize, fhash
1159
    tvver2 = "{0}{1}".format(tvver, " "*8)
1160
    tcl_mainscan_printer(curef, tvver2)
1161
1162
1163 5
def tcl_findprd(prddict, floor=0, ceiling=999, export=False):
1164 5
    """
1165 5
    Check for new PRDs based on PRD database.
1166
1167
    :param prddict: PRD center:[ends] dictionary.
1168 5
    :type prddict: collections.defaultdict(str: list)
1169
1170
    :param floor: When to start. Default is 0.
1171
    :type floor: int
1172
1173
    :param ceiling: When to stop. Default is 999.
1174
    :type ceiling: int
1175
1176
    :param export: Whether to export XML response to file. Default is False.
1177
    :type export: bool
1178
    """
1179
    sess = requests.Session()
1180
    for center in sorted(prddict.keys()):
1181 5
        tcl_findprd_centerscan(center, prddict, sess, floor, ceiling, export)
1182
1183
1184 5
def linkgen_sdk_dicter(indict, origtext, newtext):
1185
    """
1186
    Prepare SDK radio/OS dictionaries.
1187
1188
    :param indict: Dictionary of radio and OS pairs.
1189
    :type: dict(str:str)
1190
1191
    :param origtext: String in indict's values that must be replaced.
1192
    :type origtext: str
1193
1194
    :param newtext: What to replace origtext with.
1195
    :type newtext: str
1196
    """
1197 5
    return {key: val.replace(origtext, newtext) for key, val in indict.items()}
1198 5
1199 5
1200 5
def linkgen_sdk(sdk, oses, cores):
1201 5
    """
1202 5
    Generate SDK debrick/core images.
1203
1204
    :param sdk: If we specifically want SDK images. Default is False.
1205 5
    :type sdk: bool
1206
1207
    :param oses: Dictionary of radio and debrick pairs.
1208
    :type oses: dict(str:str)
1209
1210
    :param cores: Dictionary of radio and core pairs.
1211
    :type cores: dict(str:str)
1212
    """
1213
    if sdk:
1214
        oses2 = linkgen_sdk_dicter(oses, "factory_sfi", "sdk")
1215
        cores2 = linkgen_sdk_dicter(cores, "factory_sfi", "sdk")
1216
        oses = linkgen_sdk_dicter(oses2, "verizon_sfi", "sdk")
1217
        cores = linkgen_sdk_dicter(cores2, "verizon_sfi", "sdk")
1218
    return oses, cores
1219
1220
1221
def linkgen(osversion, radioversion=None, softwareversion=None, altsw=None, temp=False, sdk=False):
2 ignored issues
show
best-practice introduced by
Too many arguments (6/5)
Loading history...
Comprehensibility introduced by
This function exceeds the maximum number of variables (17/15).
Loading history...
1222
    """
1223
    Generate debrick/core/radio links for given OS, radio, software release.
1224
1225
    :param osversion: OS version, 10.x.y.zzzz.
1226
    :type osversion: str
1227 5
1228 5
    :param radioversion: Radio version, 10.x.y.zzzz. Can be guessed.
1229 5
    :type radioversion: str
1230 5
1231 5
    :param softwareversion: Software version, 10.x.y.zzzz. Can be guessed.
1232 5
    :type softwareversion: str
1233 5
1234 5
    :param altsw: Radio software release, if not the same as OS.
1235 5
    :type altsw: str
1236 5
1237 5
    :param temp: If file we write to is temporary. Default is False.
1238 5
    :type temp: bool
1239 5
1240 5
    :param sdk: If we specifically want SDK images. Default is False.
1241 5
    :type sdk: bool
1242 5
    """
1243 5
    radioversion = return_radio_version(osversion, radioversion)
1244 5
    softwareversion, swc = return_sw_checked(softwareversion, osversion)
1245
    del swc
1246
    if altsw is not None:
1247 5
        altsw, aswc = return_radio_sw_checked(altsw, radioversion)
1248
        del aswc
1249
    baseurl = utilities.create_base_url(softwareversion)
1250
    oses, cores, radios = textgenerator.url_gen(osversion, radioversion, softwareversion)
1251
    if altsw is not None:
1252
        del radios
1253
        dbks, cors, radios = textgenerator.url_gen(osversion, radioversion, altsw)
1254 5
        del dbks
1255 5
        del cors
1256 5
    avlty = networkutils.availability(baseurl)
1257 5
    oses, cores = linkgen_sdk(sdk, oses, cores)
1258
    prargs = (softwareversion, osversion, radioversion, oses, cores, radios, avlty, False, None, temp, altsw)
1259 5
    lthr = threading.Thread(target=textgenerator.write_links, args=prargs)
1260 5
    lthr.start()
1261
1262
1263 5
def clean_swrel(swrelset):
1264
    """
1265
    Clean a list of software release lookups.
1266
1267
    :param swrelset: List of software releases.
1268
    :type swrelset: set(str)
1269
    """
1270
    for i in swrelset:
1271
        if i != "SR not in system" and i is not None:
1272
            swrelease = i
1273 5
            break
1274 5
    else:
1275
        swrelease = ""
1276
    return swrelease
1277 5
1278
1279
def autolookup_logger(record, out):
1280
    """
1281
    Write autolookup results to file.
1282
1283
    :param record: The file to log to.
1284
    :type record: str
1285
1286
    :param out: Output block.
1287
    :type out: str
1288
    """
1289
    with open(record, "a") as rec:
1290
        rec.write("{0}\n".format(out))
1291
1292
1293
def autolookup_printer(out, avail, log=False, quiet=False, record=None):
1294
    """
1295
    Print autolookup results, logging if specified.
1296 5
1297 5
    :param out: Output block.
1298 5
    :type out: str
1299 5
1300 5
    :param avail: Availability. Can be "Available" or "Unavailable".
1301 5
    :type avail: str
1302 5
1303
    :param log: If we're logging to file.
1304
    :type log: bool
1305 5
1306
    :param quiet: If we only note available entries.
1307
    :type quiet: bool
1308
1309
    :param record: If we're logging, the file to log to.
1310
    :type record: str
1311
    """
1312
    if not quiet:
1313
        avail = "Available"  # force things
1314
    if avail.lower() == "available":
1315
        if log:
1316
            lthr = threading.Thread(target=autolookup_logger, args=(record, out))
1317
            lthr.start()
1318
        print(out)
1319
1320
1321 5
def autolookup_output_sql(osversion, swrelease, avail, sql=False):
1322 5
    """
1323 5
    Add OS to SQL database.
1324 5
1325
    :param osversion: OS version.
1326
    :type osversion: str
1327 5
1328
    :param swrelease: Software release.
1329
    :type swrelease: str
1330
1331
    :param avail: "Unavailable" or "Available".
1332
    :type avail: str
1333
1334
    :param sql: If we're adding this to our SQL database.
1335
    :type sql: bool
1336
    """
1337
    if sql:
1338
        sqlutils.prepare_sw_db()
1339
        if not sqlutils.check_exists(osversion, swrelease):
1340
            sqlutils.insert(osversion, swrelease, avail.lower())
1341
1342
1343
def autolookup_output(osversion, swrelease, avail, avpack, sql=False):
1344
    """
1345
    Prepare autolookup block, and add to SQL database.
1346 5
1347 5
    :param osversion: OS version.
1348 5
    :type osversion: str
1349 5
1350 5
    :param swrelease: Software release.
1351
    :type swrelease: str
1352
1353 5
    :param avail: "Unavailable" or "Available".
1354
    :type avail: str
1355
1356
    :param avpack: Availabilities: alpha 1 and 2, beta 1 and 2, production.
1357
    :type avpack: list(str)
1358
1359
    :param sql: If we're adding this to our SQL database.
1360
    :type sql: bool
1361
    """
1362
    othr = threading.Thread(target=autolookup_output_sql, args=(osversion, swrelease, avail, sql))
1363 5
    othr.start()
1364 5
    avblok = "[{0}|{1}|{2}|{3}|{4}]".format(*avpack)
1365
    out = "OS {0} - SR {1} - {2} - {3}".format(osversion, swrelease, avblok, avail)
1366
    return out
1367 5
1368
1369
def clean_barlist(cleanfiles, stoppers):
1370
    """
1371
    Remove certain bars from barlist based on keywords.
1372
1373
    :param cleanfiles: List of files to clean.
1374
    :type cleanfiles: list(str)
1375
1376
    :param stoppers: List of keywords (i.e. bar names) to exclude.
1377
    :type stoppers: list(str)
1378
    """
1379
    finals = [link for link in cleanfiles if all(word not in link for word in stoppers)]
1380
    return finals
1381
1382
1383
def prep_export_cchecker(files, npc, hwid, osv, radv, swv, upgrade=False, forced=None):
1 ignored issue
show
best-practice introduced by
Too many arguments (8/5)
Loading history...
1384
    """
1385
    Prepare carrierchecker lookup links to write to file.
1386
1387
    :param files: List of file URLs.
1388
    :type files: list(str)
1389
1390
    :param npc: MCC + MNC (ex. 302220).
1391
    :type npc: int
1392
1393
    :param hwid: Device hardware ID.
1394
    :type hwid: str
1395 5
1396 5
    :param osv: OS version.
1397 5
    :type osv: str
1398
1399 5
    :param radv: Radio version.
1400 5
    :type radv: str
1401 5
1402 5
    :param swv: Software release.
1403 5
    :type swv: str
1404
1405
    :param upgrade: Whether or not to use upgrade files. Default is false.
1406 5
    :type upgrade: bool
1407
1408
    :param forced: Force a software release. None to go for latest.
1409
    :type forced: str
1410
    """
1411
    if not upgrade:
1412
        newfiles = networkutils.carrier_query(npc, hwid, True, False, forced)
1413
        cleanfiles = newfiles[3]
1414
    else:
1415
        cleanfiles = files
1416
    osurls, coreurls, radiourls = textgenerator.url_gen(osv, radv, swv)
1417
    stoppers = ["8960", "8930", "8974", "m5730", "winchester"]
1418
    finals = clean_barlist(cleanfiles, stoppers)
1419
    return osurls, coreurls, radiourls, finals
1420
1421
1422
def export_cchecker(files, npc, hwid, osv, radv, swv, upgrade=False, forced=None):
1 ignored issue
show
best-practice introduced by
Too many arguments (8/5)
Loading history...
1423
    """
1424
    Write carrierchecker lookup links to file.
1425
1426
    :param files: List of file URLs.
1427
    :type files: list(str)
1428
1429
    :param npc: MCC + MNC (ex. 302220).
1430
    :type npc: int
1431
1432
    :param hwid: Device hardware ID.
1433
    :type hwid: str
1434 5
1435 5
    :param osv: OS version.
1436 5
    :type osv: str
1437 5
1438
    :param radv: Radio version.
1439 5
    :type radv: str
1440
1441
    :param swv: Software release.
1442 5
    :type swv: str
1443
1444
    :param upgrade: Whether or not to use upgrade files. Default is false.
1445
    :type upgrade: bool
1446
1447
    :param forced: Force a software release. None to go for latest.
1448
    :type forced: str
1449
    """
1450
    if files:
1451
        osurls, coreurls, radiourls, finals = prep_export_cchecker(files, npc, hwid, osv, radv, swv, upgrade, forced)
1452
        textgenerator.write_links(swv, osv, radv, osurls, coreurls, radiourls, True, True, finals)
1453
        print("\nFINISHED!!!")
1454
    else:
1455
        print("CANNOT EXPORT, NO SOFTWARE RELEASE")
1456
1457 5
1458
def generate_blitz_links(files, osv, radv, swv):
1459
    """
1460
    Generate blitz URLs (i.e. all OS and radio links).
1461
    :param files: List of file URLs.
1462
    :type files: list(str)
1463 5
1464
    :param osv: OS version.
1465
    :type osv: str
1466
1467
    :param radv: Radio version.
1468
    :type radv: str
1469
1470
    :param swv: Software release.
1471 5
    :type swv: str
1472
    """
1473
    coreurls = [
1474 5
        utilities.create_bar_url(swv, "winchester.factory_sfi", osv),
1475
        utilities.create_bar_url(swv, "qc8960.factory_sfi", osv),
1476
        utilities.create_bar_url(swv, "qc8960.factory_sfi", osv),
1477
        utilities.create_bar_url(swv, "qc8960.factory_sfi_hybrid_qc8974", osv)
1478
    ]
1479
    radiourls = [
1480
        utilities.create_bar_url(swv, "m5730", radv),
1481
        utilities.create_bar_url(swv, "qc8960", radv),
1482
        utilities.create_bar_url(swv, "qc8960.wtr", radv),
1483
        utilities.create_bar_url(swv, "qc8960.wtr5", radv),
1484 5
        utilities.create_bar_url(swv, "qc8930.wtr5", radv),
1485 5
        utilities.create_bar_url(swv, "qc8974.wtr2", radv)
1486 5
    ]
1487 5
    return files + coreurls + radiourls
1488 5
1489 5
1490 5
def package_blitz(bardir, swv):
1491
    """
1492 5
    Package and verify a blitz package.
1493
1494
    :param bardir: Path to folder containing bar files.
1495 5
    :type bardir: str
1496
1497
    :param swv: Software version.
1498
    :type swv: str
1499
    """
1500
    print("\nCREATING BLITZ...")
1501
    barutils.create_blitz(bardir, swv)
1502 5
    print("\nTESTING BLITZ...")
1503
    zipver = archiveutils.zip_verify("Blitz-{0}.zip".format(swv))
1504
    if not zipver:
1505 5
        print("BLITZ FILE IS BROKEN")
1506
        raise SystemExit
1507
    else:
1508
        shutil.rmtree(bardir)
1509
1510
1511
def slim_preamble(appname):
1512
    """
1513
    Standard app name header.
1514
1515
    :param appname: Name of app.
1516
    :type appname: str
1517
    """
1518
    print("~~~{0} VERSION {1}~~~".format(appname.upper(), shortversion()))
1519
1520
1521
def standard_preamble(appname, osversion, softwareversion, radioversion, altsw=None):
1522
    """
1523
    Standard app name, OS, radio and software (plus optional radio software) print block.
1524 5
1525 5
    :param appname: Name of app.
1526 5
    :type appname: str
1527 5
1528 5
    :param osversion: OS version, 10.x.y.zzzz. Required.
1529 5
    :type osversion: str
1530
1531
    :param radioversion: Radio version, 10.x.y.zzzz. Can be guessed.
1532 5
    :type radioversion: str
1533
1534
    :param softwareversion: Software release, 10.x.y.zzzz. Can be guessed.
1535
    :type softwareversion: str
1536 5
1537 5
    :param altsw: Radio software release, if not the same as OS.
1538 5
    :type altsw: str
1539 5
    """
1540 5
    slim_preamble(appname)
1541 5
    print("OS VERSION: {0}".format(osversion))
1542 5
    print("OS SOFTWARE VERSION: {0}".format(softwareversion))
1543 5
    print("RADIO VERSION: {0}".format(radioversion))
1544
    if altsw is not None:
1545
        print("RADIO SOFTWARE VERSION: {0}".format(altsw))
1546 5
1547
1548
def questionnaire_device(message=None):
1549
    """
1550 5
    Get device from questionnaire.
1551 5
    """
1552 5
    message = "DEVICE (XXX100-#): " if message is None else message
1553 5
    device = input(message)
1554 5
    if not device:
1555 5
        print("NO DEVICE SPECIFIED!")
1556 5
        decorators.enter_to_exit(True)
1557 5
        if not getattr(sys, 'frozen', False):
1558 5
            raise SystemExit
1559
    return device
1560 5
1561 5
1562
def verify_gpg_credentials():
1563
    """
1564 5
    Read GPG key/pass from file, verify if incomplete.
1565
    """
1566
    gpgkey, gpgpass = gpgutils.gpg_config_loader()
1567
    if gpgkey is None or gpgpass is None:
1568
        print("NO PGP KEY/PASS FOUND")
1569
        cont = utilities.i2b("CONTINUE (Y/N)?: ")
1570
        if cont:
1571 5
            gpgkey = verify_gpg_key(gpgkey)
1572 5
            gpgpass, writebool = verify_gpg_pass(gpgpass)
1573 5
            gpgpass2 = gpgpass if writebool else None
1574 5
            gpgutils.gpg_config_writer(gpgkey, gpgpass2)
1575 5
        else:
1576
            gpgkey = None
1577
    return gpgkey, gpgpass
1578 5
1579
1580
def verify_gpg_key(gpgkey=None):
1581
    """
1582
    Verify GPG key.
1583
1584
    :param gpgkey: Key, use None to take from input.
1585 5
    :type gpgkey: str
1586 5
    """
1587 5
    if gpgkey is None:
1588
        gpgkey = input("PGP KEY (0x12345678): ")
1589 5
        if not gpgkey.startswith("0x"):
1590 5
            gpgkey = "0x{0}".format(gpgkey)   # add preceding 0x
1591
    return gpgkey
1592
1593 5
1594
def verify_gpg_pass(gpgpass=None):
1595
    """
1596
    Verify GPG passphrase.
1597
1598
    :param gpgpass: Passphrase, use None to take from input.
1599
    :type gpgpass: str
1600
    """
1601
    if gpgpass is None:
1602
        gpgpass = getpass.getpass(prompt="PGP PASSPHRASE: ")
1603
        writebool = utilities.i2b("SAVE PASSPHRASE (Y/N)?:")
1604
    else:
1605
        writebool = False
1606
    return gpgpass, writebool
1607
1608
1609
def bulk_hash(dirs, compressed=True, deleted=True, radios=True, hashdict=None):
1610
    """
1611
    Hash files in several folders based on flags.
1612 5
1613 5
    :param dirs: Folders: [OS_bars, radio_bars, OS_exes, radio_exes, OS_zips, radio_zips]
1614
    :type dirs: list(str)
1615
1616 5
    :param compressed: Whether to hash compressed files. True by default.
1617
    :type compressed: bool
1618
1619
    :param deleted: Whether to delete uncompressed files. True by default.
1620
    :type deleted: bool
1621
1622
    :param radios: Whether to hash radio autoloaders. True by default.
1623
    :type radios: bool
1624
1625
    :param hashdict: Dictionary of hash rules, in ~\bbarchivist.ini.
1626
    :type hashdict: dict({str: bool})
1627
    """
1628
    print("HASHING LOADERS...")
1629
    defargs = utilities.def_args(dirs)
1630
    utilities.cond_check(hashutils.verifier, defargs, [hashdict], radios, compressed, deleted)
1631
1632 5
1633 5
def bulk_verify(dirs, compressed=True, deleted=True, radios=True):
1634 5
    """
1635 5
    Verify files in several folders based on flags.
1636 5
1637 5
    :param dirs: Folders: [OS_bars, radio_bars, OS_exes, radio_exes, OS_zips, radio_zips]
1638
    :type dirs: list(str)
1639
1640 5
    :param compressed: Whether to hash compressed files. True by default.
1641
    :type compressed: bool
1642
1643
    :param deleted: Whether to delete uncompressed files. True by default.
1644
    :type deleted: bool
1645
1646
    :param radios: Whether to hash radio autoloaders. True by default.
1647 5
    :type radios: bool
1648
    """
1649
    gpgkey, gpgpass = verify_gpg_credentials()
1650 5
    if gpgpass is not None and gpgkey is not None:
1651
        print("VERIFYING LOADERS...")
1652
        print("KEY: {0}".format(gpgkey))
1653
        restargs = [gpgkey, gpgpass, True]
1654
        defargs = utilities.def_args(dirs)
1655
        utilities.cond_check(gpgutils.gpgrunner, defargs, restargs, radios, compressed, deleted)
1656
1657 5
1658 5
def enn_ayy(quant):
1659 5
    """
1660 5
    Cheeky way to put a N/A placeholder for a string.
1661
1662
    :param quant: What to check if it's None.
1663 5
    :type quant: str
1664
    """
1665
    return "N/A" if quant is None else quant
1666
1667
1668
def generate_workfolder(folder=None):
1669
    """
1670
    Check if a folder exists, make it if it doesn't, set it to home if None.
1671
1672
    :param folder: Folder to check.
1673
    :type folder: str
1674
    """
1675
    folder = utilities.dirhandler(folder, os.getcwd())
1676
    if folder is not None and not os.path.exists(folder):
1677
        os.makedirs(folder)
1678
    return folder
1679
1680
1681
def info_header(afile, osver, radio=None, software=None, device=None):
1682 5
    """
1683 5
    Write header for info file.
1684 5
1685
    :param afile: Open file to write to.
1686 5
    :type afile: File object
1687 5
1688 5
    :param osver: OS version, required for both types.
1689
    :type osver: str
1690
1691 5
    :param radio: Radio version, required for QNX.
1692
    :type radio: str
1693
1694
    :param software: Software release, required for QNX.
1695
    :type software: str
1696
1697
    :param device: Device type, required for Android.
1698
    :type device: str
1699
    """
1700
    afile.write("OS: {0}\n".format(osver))
1701
    if device:
1702
        afile.write("Device: {0}\n".format(enn_ayy(device)))
1703
    else:
1704 5
        afile.write("Radio: {0}\n".format(enn_ayy(radio)))
1705 5
        afile.write("Software: {0}\n".format(enn_ayy(software)))
1706 5
    afile.write("{0}\n".format("~"*40))
1707 5
1708 5
1709
def prep_info(filepath, osver, device=None):
1710
    """
1711 5
    Prepare file list for new-style info file.
1712
1713
    :param filepath: Path to folder to analyze.
1714
    :type filepath: str
1715
1716
    :param osver: OS version, required for both types.
1717
    :type osver: str
1718
1719
    :param device: Device type, required for Android.
1720
    :type device: str
1721
    """
1722
    fileext = ".zip" if device else ".7z"
1723
    files = os.listdir(filepath)
1724
    absfiles = [os.path.join(filepath, x) for x in files if x.endswith((fileext, ".exe"))]
1725
    fname = os.path.join(filepath, "!{0}_OSINFO!.txt".format(osver))
1726
    return fname, absfiles
1727
1728
1729
def make_info(filepath, osver, radio=None, software=None, device=None):
1730 5
    """
1731 5
    Create a new-style info (names, sizes and hashes) file.
1732 5
1733 5
    :param filepath: Path to folder to analyze.
1734 5
    :type filepath: str
1735
1736
    :param osver: OS version, required for both types.
1737 5
    :type osver: str
1738
1739
    :param radio: Radio version, required for QNX.
1740
    :type radio: str
1741
1742
    :param software: Software release, required for QNX.
1743
    :type software: str
1744
1745
    :param device: Device type, required for Android.
1746
    :type device: str
1747
    """
1748
    fname, absfiles = prep_info(filepath, osver, device)
1749
    with open(fname, "w") as afile:
1750
        info_header(afile, osver, radio, software, device)
1751
        for indx, file in enumerate(absfiles):
1752
            write_info(file, indx, len(absfiles), afile)
1753 5
1754 5
1755 5
def write_info(infile, index, filecount, outfile):
1756 5
    """
1757 5
    Write a new-style info (names, sizes and hashes) file.
1758 5
1759 5
    :param infile: Path to file whose name, size and hash are to be written.
1760 5
    :type infile: str
1761 5
1762 5
    :param index: Which file index out of the list of files we're writing.
1763
    :type index: int
1764
1765 5
    :param filecount: Total number of files we're to write; for excluding terminal newline.
1766
    :type filecount: int
1767
1768
    :param outfile: Open (!!!) file handle. Output file.
1769
    :type outfile: str
1770
    """
1771
    fsize = os.stat(infile).st_size
1772
    outfile.write("File: {0}\n".format(os.path.basename(infile)))
1773
    outfile.write("\tSize: {0} ({1})\n".format(fsize, utilities.fsizer(fsize)))
1774
    outfile.write("\tHashes:\n")
1775
    outfile.write("\t\tMD5: {0}\n".format(hashutils.hashlib_hash(infile, hashlib.md5()).upper()))
1776
    outfile.write("\t\tSHA1: {0}\n".format(hashutils.hashlib_hash(infile, hashlib.sha1()).upper()))
1777
    outfile.write("\t\tSHA256: {0}\n".format(hashutils.hashlib_hash(infile, hashlib.sha256()).upper()))
1778
    outfile.write("\t\tSHA512: {0}\n".format(hashutils.hashlib_hash(infile, hashlib.sha512()).upper()))
1779
    if index != filecount - 1:
1780
        outfile.write("\n")
1781
1782
1783
def bulk_info(dirs, osv, compressed=True, deleted=True, radios=True, rad=None, swv=None, dev=None):
1 ignored issue
show
best-practice introduced by
Too many arguments (8/5)
Loading history...
1784
    """
1785
    Generate info files in several folders based on flags.
1786
1787
    :param dirs: Folders: [OS_bars, radio_bars, OS_exes, radio_exes, OS_zips, radio_zips]
1788
    :type dirs: list(str)
1789
1790
    :param osver: OS version, required for both types.
1791
    :type osver: str
1792
1793 5
    :param compressed: Whether to hash compressed files. True by default.
1794 5
    :type compressed: bool
1795 5
1796
    :param deleted: Whether to delete uncompressed files. True by default.
1797
    :type deleted: bool
1798
1799
    :param radios: Whether to hash radio autoloaders. True by default.
1800
    :type radios: bool
1801
1802
    :param rad: Radio version, required for QNX.
1803
    :type rad: str
1804
1805
    :param swv: Software release, required for QNX.
1806
    :type swv: str
1807
1808
    :param dev: Device type, required for Android.
1809
    :type dev: str
1810
    """
1811
    print("GENERATING INFO FILES...")
1812
    restargs = [osv, rad, swv, dev]
1813
    utilities.cond_check(make_info, utilities.def_args(dirs), restargs, radios, compressed, deleted)
1814