Test Failed
Branch master (c56792)
by John
02:11
created

bbarchivist.scriptutils   F

Complexity

Total Complexity 227

Size/Duplication

Total Lines 1814
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 638
dl 0
loc 1814
rs 0.6314
c 0
b 0
f 0
ccs 609
cts 609
cp 1
wmc 227

88 Functions

Rating   Name   Duplication   Size   Complexity  
A check_altsw() 0 14 3
A tcl_findprd_prepd_end() 0 12 3
A check_os_bulk_handle() 0 9 2
A linkgen_sdk_dicter() 0 14 2
A check_radio_bulk_notfound() 0 17 2
A tcl_prep_otaver() 0 14 2
A dpf_flags_folder() 0 19 2
A tcl_findprd() 0 19 2
A bulk_hash() 0 22 1
A tclloader_filename() 0 17 2
B default_parser() 0 25 1
A autolookup_output_sql() 0 20 3
A tcl_findprd_checkfilter() 0 17 3
B linkgen() 0 40 3
A package_blitz() 0 19 2
A tcl_findprd_prepd_middle() 0 10 3
A test_loader_files() 0 18 3
A test_bar_files_individual() 0 22 2
A tcl_delta_remote() 0 13 2
A bar_broken_individual() 0 19 4
B check_radio_single() 0 24 4
B export_cchecker() 0 34 2
A tcl_findprd_safehandle() 0 14 1
A get_baseurls() 0 13 2
A check_radio_bulk_go() 0 14 2
B standard_preamble() 0 25 2
A check_radio_bulk() 0 18 4
A generate_workfolder() 0 11 3
A return_radio_sw_checked() 0 18 2
A dpf_flags_osr() 0 21 2
A shortversion() 0 11 3
A check_sw_handle() 0 12 2
B verify_gpg_credentials() 0 16 5
A tclloader_prep() 0 13 2
A clean_barlist() 0 12 4
B tcl_prd_print() 0 27 3
A arg_verify_none() 0 12 2
A tcl_mainscan_preamble() 0 10 2
A return_radio_version() 0 13 2
A check_os_single() 0 20 3
B generic_windows_shim() 0 24 2
B bulk_info() 0 31 1
A check_radio_sw() 0 14 1
A kernchecker_dict() 0 14 3
B test_signed_files() 0 18 5
A comp_joiner() 0 15 2
B tcl_findprd_centerscan() 0 26 4
B tcl_prd_scan() 0 39 4
A autolookup_logger() 0 12 2
A enn_ayy() 0 8 2
A check_os_bulk() 0 14 4
A check_sw_actual() 0 15 2
B autolookup_output() 0 24 1
B generate_blitz_links() 0 30 1
B info_header() 0 26 2
A test_bar_files() 0 20 3
A check_radio_bulk_stop() 0 8 2
A bulk_avail() 0 10 3
A return_sw_checked() 0 17 2
A tcl_mainscan_printer() 0 17 2
B tcl_download() 0 32 3
B prod_avail_mailprep() 0 25 4
A sw_check_contingency() 0 19 3
A linkgen_sdk() 0 19 2
A prep_info() 0 18 4
A longversion() 0 11 3
B clean_swrel() 0 14 5
A tcl_findprd_prepd_start() 0 11 2
A default_parser_vers() 0 10 2
B prod_avail() 0 27 4
B get_sz_executable() 0 26 4
A verify_gpg_key() 0 12 3
A bulk_verify() 0 23 3
A check_sw() 0 22 3
B write_info() 0 26 2
B prep_export_cchecker() 0 37 2
A verify_gpg_pass() 0 13 2
A slim_preamble() 0 8 1
A test_single_loader() 0 16 3
A default_parser_flags() 0 14 2
A tcl_delta_filename() 0 23 2
A kernchecker_prep() 0 11 3
B make_info() 0 24 3
B autolookup_printer() 0 26 4
B tcl_findprd_safescan() 0 24 3
A questionnaire_device() 0 12 4
A external_version() 0 13 3
A tcl_findprd_prepdict() 0 11 1

How to fix   Complexity   

Complexity

Complex classes like bbarchivist.scriptutils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1813/1000)
Loading history...
2 5
"""This module contains various utilities for the scripts folder."""
3
4 5
import argparse  # generic parser
5 5
import collections  # defaultdict
6 5
import getpass  # invisible password
7 5
import glob  # file lookup
8 5
import hashlib  # hashes
9 5
import os  # path work
10 5
import shutil  # folder removal
11 5
import subprocess  # running cfp/cap
12 5
import sys  # getattr
13 5
import threading  # run stuff in background
14
15 5
import requests  # session
16 5
from bbarchivist import archiveutils  # archive support
17 5
from bbarchivist import barutils  # file system work
18 5
from bbarchivist import bbconstants  # constants
19 5
from bbarchivist import decorators  # decorating functions
20 5
from bbarchivist import gpgutils  # gpg
21 5
from bbarchivist import hashutils  # file hashes
22 5
from bbarchivist import networkutils  # network tools
23 5
from bbarchivist import smtputils  # email
24 5
from bbarchivist import sqlutils  # sql
25 5
from bbarchivist import textgenerator  # writing text to file
26 5
from bbarchivist import utilities  # little things
27
28 5
__author__ = "Thurask"
29 5
__license__ = "WTFPL v2"
30 5
__copyright__ = "2015-2018 Thurask"
31
32
33 5
def shortversion():
34
    """
35
    Get short app version (Git tag).
36
    """
37 5
    if not getattr(sys, 'frozen', False):
38 5
        ver = bbconstants.VERSION
39
    else:
40 5
        verfile = glob.glob(os.path.join(os.getcwd(), "version.txt"))[0]
41 5
        with open(verfile) as afile:
42 5
            ver = afile.read()
43 5
    return ver
44
45
46 5
def longversion():
47
    """
48
    Get long app version (Git tag + commits + hash).
49
    """
50 5
    if not getattr(sys, 'frozen', False):
51 5
        ver = (bbconstants.LONGVERSION, bbconstants.COMMITDATE)
52
    else:
53 5
        verfile = glob.glob(os.path.join(os.getcwd(), "longversion.txt"))[0]
54 5
        with open(verfile) as afile:
55 5
            ver = afile.read().split("\n")
56 5
    return ver
57
58
59 5
def default_parser_vers(vers=None):
60
    """
61
    Prepare version for default parser.
62
63
    :param vers: Versions: [git commit hash, git commit date]
64
    :param vers: list(str)
65
    """
66 5
    if vers is None:
67 5
        vers = longversion()
68 5
    return vers
69
70
71 5
def default_parser_flags(parser, flags=None):
72
    """
73
    Handle flags for default parser.
74
75
    :param parser: Parser to modify.
76
    :type parser: argparse.ArgumentParser
77
78
    :param flags: Tuple of sections to add.
79
    :type flags: tuple(str)
80
    """
81 5
    if flags is not None:
82 5
        parser = dpf_flags_folder(parser, flags)
83 5
        parser = dpf_flags_osr(parser, flags)
84 5
    return parser
85
86
87 5
def dpf_flags_folder(parser, flags=None):
88
    """
89
    Add generic folder flag to parser.
90
91
    :param parser: Parser to modify.
92
    :type parser: argparse.ArgumentParser
93
94
    :param flags: Tuple of sections to add.
95
    :type flags: tuple(str)
96
    """
97 5
    if "folder" in flags:
98 5
        parser.add_argument("-f",
99 5
                            "--folder",
100
                            dest="folder",
101
                            help="Working folder",
102 5
                            default=None,
103
                            metavar="DIR",
104
                            type=utilities.file_exists)
105
    return parser
106
107
108
def dpf_flags_osr(parser, flags=None):
109
    """
110
    Add generic OS/radio/software flags to parser.
111
112 5
    :param parser: Parser to modify.
113 5
    :type parser: argparse.ArgumentParser
114 5
115 5
    :param flags: Tuple of sections to add.
116 5
    :type flags: tuple(str)
117
    """
118
    if "osr" in flags:
119 5
        parser.add_argument("os", help="OS version")
120
        parser.add_argument("radio",
121
                            help="Radio version, 10.x.y.zzzz",
122
                            nargs="?",
123
                            default=None)
124
        parser.add_argument("swrelease",
125
                            help="Software version, 10.x.y.zzzz",
126
                            nargs="?",
127
                            default=None)
128
    return parser
129
130
131
def default_parser(name=None, desc=None, flags=None, vers=None):
132
    """
133
    A generic form of argparse's ArgumentParser.
134
135 5
    :param name: App name.
136 5
    :type name: str
137 5
138 5
    :param desc: App description.
139 5
    :type desc: str
140
141
    :param flags: Tuple of sections to add.
142 5
    :type flags: tuple(str)
143
144
    :param vers: Versions: [git commit hash, git commit date]
145
    :param vers: list(str)
146
    """
147
    vers = default_parser_vers(vers)
148
    homeurl = "https://github.com/thurask/bbarchivist"
149
    parser = argparse.ArgumentParser(prog=name, description=desc, epilog=homeurl)
150
    parser.add_argument("-v",
151
                        "--version",
152
                        action="version",
153
                        version="{0} {1} committed {2}".format(parser.prog, vers[0], vers[1]))
154
    parser = default_parser_flags(parser, flags)
155
    return parser
156
157
158 5
def generic_windows_shim(scriptname, scriptdesc, target, version):
159 5
    """
160 5
    Generic CFP/CAP runner; Windows only.
161 5
162 5
    :param scriptname: Script name, 'bb-something'.
163 5
    :type scriptname: str
164
165 5
    :param scriptdesc: Script description, i.e. scriptname -h.
166
    :type scriptdesc: str
167
168 5
    :param target: Path to file to execute.
169
    :type target: str
170
171
    :param version: Version of target.
172
    :type version: str
173
    """
174
    parser = default_parser(scriptname, scriptdesc)
175
    capver = "|{0}".format(version)
176
    parser = external_version(parser, capver)
177
    parser.parse_known_args(sys.argv[1:])
178 5
    if utilities.is_windows():
179 5
        subprocess.call([target] + sys.argv[1:])
180
    else:
181
        print("Sorry, Windows only.")
182 5
183
184
def arg_verify_none(argval, message):
185
    """
186
    Check if an argument is None, error out if it is.
187
188
    :param argval: Argument to check.
189
    :type argval: str
190
191
    :param message: Error message to print.
192 5
    :type message: str
193 5
    """
194 5
    if argval is None:
195
        raise argparse.ArgumentError(argument=None, message=message)
196
197 5
198
def external_version(parser, addition):
199
    """
200
    Modify the version string of argparse.ArgumentParser, adding something.
201
202
    :param parser: Parser to modify.
203
    :type parser: argparse.ArgumentParser
204
205
    :param addition: What to add.
206
    :type addition: str
207 5
    """
208 5
    verarg = [arg for arg in parser._actions if isinstance(arg, argparse._VersionAction)][0]
209 5
    verarg.version = "{1}{0}".format(addition, verarg.version)
210
    return parser
211
212 5
213
def return_radio_version(osversion, radioversion=None):
214
    """
215
    Increment radio version, if need be.
216
217
    :param osversion: OS version.
218
    :type osversion: str
219 5
220 5
    :param radioversion: Radio version, None if incremented.
221 5
    :type radioversion: str
222 5
    """
223 5
    if radioversion is None:
224 5
        radioversion = utilities.increment(osversion, 1)
225
    return radioversion
226 5
227 5
228
def sw_check_contingency(softwareversion):
229 5
    """
230 5
    Ask in the event software release isn't found.
231
232
    :param softwareversion: Software release version.
233 5
    :type softwareversion: str
234
    """
235
    if softwareversion == "SR not in system":
236
        print("SOFTWARE RELEASE NOT FOUND")
237
        cont = utilities.i2b("INPUT MANUALLY? Y/N: ")
238
        if cont:
239
            softwareversion = input("SOFTWARE RELEASE: ")
240
            swchecked = False
241
        else:
242
            print("\nEXITING...")
243 5
            raise SystemExit  # bye bye
244 5
    else:
245 5
        swchecked = True
246 5
    return softwareversion, swchecked
247
248 5
249 5
def return_sw_checked(softwareversion, osversion):
250
    """
251
    Check software existence, return boolean.
252 5
253
    :param softwareversion: Software release version.
254
    :type softwareversion: str
255
256
    :param osversion: OS version.
257
    :type osversion: str
258
    """
259
    if softwareversion is None:
260
        serv = bbconstants.SERVERS["p"]
261
        softwareversion = networkutils.sr_lookup(osversion, serv)
262 5
        softwareversion, swchecked = sw_check_contingency(softwareversion)
263 5
    else:
264 5
        swchecked = True
265 5
    return softwareversion, swchecked
266 5
267
268 5
def return_radio_sw_checked(altsw, radioversion):
269 5
    """
270
    Check radio software existence, return boolean.
271
272 5
    :param altsw: Software release version.
273
    :type altsw: str
274
275
    :param radioversion: Radio version.
276
    :type radioversion: str
277
    """
278
    if altsw == "checkme":
279
        serv = bbconstants.SERVERS["p"]
280
        testos = utilities.increment(radioversion, -1)
281
        altsw = networkutils.sr_lookup(testos, serv)
282
        altsw, altchecked = sw_check_contingency(altsw)
283
    else:
284
        altchecked = True
285
    return altsw, altchecked
286
287
288 5
def check_sw(baseurl, softwareversion, swchecked, altsw=False):
289 5
    """
290 5
    Check existence of software release.
291 5
292
    :param baseurl: Base URL (from http to hashed SW release).
293 5
    :type baseurl: str
294
295
    :param softwareversion: Software release.
296 5
    :type softwareversion: str
297
298
    :param swchecked: If we checked the sw release already.
299
    :type swchecked: bool
300
301
    :param altsw: If this is the radio-only release. Default is false.
302
    :type altsw: bool
303
    """
304
    message = "CHECKING RADIO SOFTWARE RELEASE..." if altsw else "CHECKING SOFTWARE RELEASE..."
305
    print(message)
306 5
    if not swchecked:
307 5
        check_sw_actual(baseurl, softwareversion)
308 5
    else:
309
        print("SOFTWARE RELEASE {0} EXISTS".format(softwareversion))
310 5
311
312
def check_sw_actual(baseurl, softwareversion):
313 5
    """
314
    Get the status of a software release.
315
316
    :param baseurl: Base URL (from http to hashed SW release).
317
    :type baseurl: str
318
319
    :param softwareversion: Software release.
320 5
    :type softwareversion: str
321 5
    """
322 5
    avlty = networkutils.availability(baseurl)
323 5
    if avlty:
324 5
        print("SOFTWARE RELEASE {0} EXISTS".format(softwareversion))
325
    else:
326
        check_sw_handle(softwareversion)
327 5
328
329
def check_sw_handle(softwareversion):
330
    """
331
    Handle non-existent software release.
332
333
    :param softwareversion: Software release.
334
    :type softwareversion: str
335
    """
336
    print("SOFTWARE RELEASE {0} NOT FOUND".format(softwareversion))
337
    cont = utilities.i2b("CONTINUE? Y/N: ")
338
    if not cont:
339
        print("\nEXITING...")
340 5
        raise SystemExit
341
342
343 5
def check_radio_sw(alturl, altsw, altchecked):
344
    """
345
    Check existence of radio software release.
346
347
    :param alturl: Radio base URL (from http to hashed SW release).
348
    :type alturl: str
349
350 5
    :param altsw: Radio software release.
351 5
    :type altsw: str
352 5
353 5
    :param altchecked: If we checked the sw release already.
354
    :type altchecked: bool
355 5
    """
356 5
    return check_sw(alturl, altsw, altchecked, True)
357
358
359 5
def check_altsw(altcheck=False):
360
    """
361
    Ask for and return alternate software release, if needed.
362
363
    :param altcheck: If we're using an alternate software release.
364
    :type altcheck: bool
365
    """
366
    if altcheck:
367
        altsw = input("RADIO SOFTWARE RELEASE (PRESS ENTER TO GUESS): ")
368
        if not altsw:
369
            altsw = "checkme"
370
    else:
371
        altsw = None
372 5
    return altsw
373 5
374 5
375 5
def check_os_single(osurl, osversion, device):
376 5
    """
377 5
    Check existence of single OS link.
378 5
379
    :param radiourl: Radio URL to check.
380
    :type radiourl: str
381 5
382
    :param radioversion: Radio version.
383
    :type radioversion: str
384
385
    :param device: Device family.
386
    :type device: int
387
    """
388 5
    osav = networkutils.availability(osurl)
389 5
    if not osav:
390 5
        print("{0} NOT AVAILABLE FOR {1}".format(osversion, bbconstants.DEVICES[device]))
391 5
        cont = utilities.i2b("CONTINUE? Y/N: ")
392 5
        if not cont:
393
            print("\nEXITING...")
394 5
            raise SystemExit
395
396
397 5
def check_os_bulk(osurls):
398
    """
399
    Check existence of list of OS links.
400
401 5
    :param osurls: OS URLs to check.
402 5
    :type osurls: list(str)
403 5
    """
404 5
    sess = requests.Session()
405 5
    for url in osurls:
406
        osav = networkutils.availability(url, sess)
407
        if osav:
408 5
            break
409
    else:
410
        check_os_bulk_handle()
411
412
413
def check_os_bulk_handle():
414
    """
415
    Handle no existing OS links.
416
    """
417
    print("OS VERSION NOT FOUND")
418 5
    cont = utilities.i2b("CONTINUE? Y/N: ")
419 5
    if not cont:
420 5
        print("\nEXITING...")
421 5
        raise SystemExit
422 5
423 5
424 5
def check_radio_single(radiourl, radioversion):
425 5
    """
426
    Check existence of single radio link.
427 5
428 5
    :param radiourl: Radio URL to check.
429 5
    :type radiourl: str
430 5
431 5
    :param radioversion: Radio version.
432
    :type radioversion: str
433
    """
434 5
    radav = networkutils.availability(radiourl)
435
    if not radav:
436
        print("RADIO VERSION NOT FOUND")
437
        cont = utilities.i2b("INPUT MANUALLY? Y/N: ")
438
        if cont:
439
            rad2 = input("RADIO VERSION: ")
440
            radiourl = radiourl.replace(radioversion, rad2)
441
            radioversion = rad2
442
        else:
443
            going = utilities.i2b("KEEP GOING? Y/N: ")
444 5
            if not going:
445 5
                print("\nEXITING...")
446 5
                raise SystemExit
447 5
    return radiourl, radioversion
448 5
449
450 5
def check_radio_bulk(radiourls, radioversion):
451 5
    """
452
    Check existence of list of radio links.
453
454 5
    :param radiourls: Radio URLs to check.
455
    :type radiourls: list(str)
456
457
    :param radioversion: Radio version.
458
    :type radioversion: str
459
    """
460
    sess = requests.Session()
461
    for url in radiourls:
462
        radav = networkutils.availability(url, sess)
463
        if radav:
464 5
            break
465 5
    else:
466 5
        radiourls, radioversion = check_radio_bulk_notfound(radiourls, radioversion)
467 5
    return radiourls, radioversion
468
469 5
470 5
def check_radio_bulk_notfound(radiourls, radioversion):
471
    """
472
    What to do if radio links aren't found.
473 5
474
    :param radiourls: Radio URLs to check.
475
    :type radiourls: list(str)
476
477
    :param radioversion: Radio version.
478
    :type radioversion: str
479
    """
480
    print("RADIO VERSION NOT FOUND")
481
    cont = utilities.i2b("INPUT MANUALLY? Y/N: ")
482
    if cont:
483 5
        radiourls, radioversion = check_radio_bulk_go(radiourls, radioversion)
484 5
    else:
485 5
        check_radio_bulk_stop()
486 5
    return radiourls, radioversion
487
488
489 5
def check_radio_bulk_go(radiourls, radioversion):
490
    """
491
    Replace radio version and URLs, and keep going.
492
493 5
    :param radiourls: Radio URLs to check.
494 5
    :type radiourls: list(str)
495 5
496 5
    :param radioversion: Radio version.
497
    :type radioversion: str
498
    """
499 5
    rad2 = input("RADIO VERSION: ")
500
    radiourls = [url.replace(radioversion, rad2) for url in radiourls]
501
    radioversion = rad2
502
    return radiourls, radioversion
503
504
505
def check_radio_bulk_stop():
506 5
    """
507 5
    Ask if we should keep going once no radio has been found.
508 5
    """
509
    going = utilities.i2b("KEEP GOING? Y/N: ")
510
    if not going:
511 5
        print("\nEXITING...")
512
        raise SystemExit
513
514
515
def bulk_avail(urllist):
516
    """
517
    Filter 404 links out of URL list.
518
519
    :param urllist: URLs to check.
520
    :type urllist: list(str)
521 5
    """
522 5
    sess = requests.Session()
523 5
    url2 = [x for x in urllist if networkutils.availability(x, sess)]
524
    return url2
525
526 5
527
def get_baseurls(softwareversion, altsw=None):
528
    """
529
    Generate base URLs for bar links.
530
531
    :param softwareversion: Software version.
532
    :type softwareversion: str
533 5
534 5
    :param altsw: Radio software version, if necessary.
535
    :type altsw: str
536 5
    """
537 5
    baseurl = utilities.create_base_url(softwareversion)
538 5
    alturl = utilities.create_base_url(altsw) if altsw else None
539 5
    return baseurl, alturl
540 5
541
542 5
def get_sz_executable(compmethod):
543 5
    """
544 5
    Get 7z executable.
545 5
546 5
    :param compmethod: Compression method.
547 5
    :type compmethod: str
548
    """
549 5
    if compmethod != "7z":
550 5
        szexe = ""
551 5
    else:
552
        print("CHECKING PRESENCE OF 7ZIP...")
553
        psz = utilities.prep_seven_zip(True)
554 5
        if psz:
555
            print("7ZIP OK")
556
            szexe = utilities.get_seven_zip(False)
557
        else:
558
            szexe = ""
559
            print("7ZIP NOT FOUND")
560
            cont = utilities.i2b("CONTINUE? Y/N ")
561
            if cont:
562
                print("FALLING BACK TO ZIP...")
563
                compmethod = "zip"
564 5
            else:
565 5
                print("\nEXITING...")
566 5
                raise SystemExit  # bye bye
567 5
    return compmethod, szexe
568 5
569 5
570 5
def test_bar_files(localdir, urllist):
571 5
    """
572
    Test bar files after download.
573 5
574
    :param localdir: Directory.
575
    :type localdir: str
576 5
577
    :param urllist: List of URLs to check.
578
    :type urllist: list(str)
579
    """
580
    print("TESTING BAR FILES...")
581
    brokenlist = []
582
    for file in os.listdir(localdir):
583
        brokenlist = test_bar_files_individual(file, localdir, urllist, brokenlist)
584
    if brokenlist:
585
        print("SOME FILES ARE BROKEN!")
586
        utilities.lprint(brokenlist)
587
        raise SystemExit
588
    else:
589
        print("BAR FILES DOWNLOADED OK")
590
591
592 5
def test_bar_files_individual(file, localdir, urllist, brokenlist):
593 5
    """
594 5
    Test bar file after download.
595 5
596 5
    :param file: Bar file to check.
597 5
    :type file: str
598
599
    :param localdir: Directory.
600 5
    :type localdir: str
601
602
    :param urllist: List of URLs to check.
603
    :type urllist: list(str)
604
605
    :param brokenlist: List of URLs to download later.
606
    :type brokenlist: list(str)
607
    """
608
    if file.endswith(".bar"):
609
        print("TESTING: {0}".format(file))
610
        thepath = os.path.abspath(os.path.join(localdir, file))
611
        brokens = barutils.bar_tester(thepath)
612
        brokenlist = bar_broken_individual(brokens, urllist, brokenlist)
613 5
    return brokenlist
614 5
615 5
616 5
def bar_broken_individual(brokens, urllist, brokenlist):
617 5
    """
618 5
    What to do if a downloaded bar file is broken.
619
620
    :param brokens: None if bar is OK, filename if it is not.
621 5
    :type brokens: str
622
623
    :param urllist: List of URLs to check.
624
    :type urllist: list(str)
625
626
    :param brokenlist: List of URLs to download later.
627
    :type brokenlist: list(str)
628 5
    """
629 5
    if brokens is not None:
630 5
        os.remove(brokens)
631 5
        for url in urllist:
632 5
            if brokens in url:
633 5
                brokenlist.append(url)
634 5
    return brokenlist
635 5
636 5
637
def test_signed_files(localdir):
638 5
    """
639
    Test signed files after extract.
640
641 5
    :param localdir: Directory.
642
    :type localdir: str
643
    """
644
    print("TESTING SIGNED FILES...")
645
    for file in os.listdir(localdir):
646
        if file.endswith(".bar"):
647
            print("TESTING: {0}".format(file))
648 5
            signname, signhash = barutils.retrieve_sha512(os.path.join(localdir, file))
649 5
            sha512ver = barutils.verify_sha512(os.path.join(localdir, signname.decode("utf-8")), signhash)
650
            if not sha512ver:
651 5
                print("{0} IS BROKEN".format((file)))
652 5
                break
653 5
    else:
654 5
        print("ALL FILES EXTRACTED OK")
655 5
656 5
657
def test_loader_files(localdir):
658 5
    """
659
    Test loader files after creation.
660
661 5
    :param localdir: Directory.
662
    :type localdir: str
663
    """
664
    if not utilities.is_windows():
665
        pass
666
    else:
667
        print("TESTING LOADER FILES...")
668 5
        brokens = utilities.verify_bulk_loaders(localdir)
669 5
        if brokens:
670
            print("BROKEN FILES:")
671 5
            utilities.lprint(brokens)
672 5
            raise SystemExit
673 5
        else:
674 5
            print("ALL FILES CREATED OK")
675
676 5
677
def test_single_loader(loaderfile):
678
    """
679 5
    Test single loader file after creation.
680
681
    :param loaderfile: File to check.
682
    :type loaderfile: str
683
    """
684
    if not utilities.is_windows():
685
        pass
686
    else:
687
        print("TESTING LOADER...")
688
        if not utilities.verify_loader_integrity(loaderfile):
689
            print("{0} IS BROKEN!".format(os.path.basename(loaderfile)))
690
            raise SystemExit
691
        else:
692
            print("LOADER CREATED OK")
693
694
695 5
def prod_avail(results, mailer=False, osversion=None, password=None):
696 5
    """
697 5
    Clean availability for production lookups for autolookup script.
698 5
699 5
    :param results: Result dict.
700 5
    :type results: dict(str: str)
701 5
702
    :param mailer: If we're mailing links. Default is false.
703 5
    :type mailer: bool
704 5
705 5
    :param osversion: OS version.
706
    :type osversion: str
707
708 5
    :param password: Email password.
709
    :type password: str
710
    """
711
    prel = results['p']
712
    if prel != "SR not in system" and prel is not None:
713
        pav = "PD"
714
        baseurl = utilities.create_base_url(prel)
715
        avail = networkutils.availability(baseurl)
716
        is_avail = "Available" if avail else "Unavailable"
717
        prod_avail_mailprep(prel, avail, osversion, mailer, password)
718
    else:
719
        pav = "  "
720
        is_avail = "Unavailable"
721
    return prel, pav, is_avail
722
723
724
def prod_avail_mailprep(prel, avail, osversion=None, mailer=False, password=None):
725
    """
726
    Do SQL/SMTP prep work after a good production lookup hit.
727 5
728 5
    :param prel: Software lookup result.
729 5
    :type prel: str
730 5
731 5
    :param avail: If software lookup result is available for download.
732 5
    :type avail: bool
733
734
    :param osversion: OS version.
735 5
    :type osversion: str
736
737
    :param mailer: If we're mailing links. Default is false.
738
    :type mailer: bool
739
740
    :param password: Email password.
741
    :type password: str
742
    """
743
    if avail and mailer:
744
        sqlutils.prepare_sw_db()
745
        if not sqlutils.check_exists(osversion, prel):
746
            rad = utilities.increment(osversion, 1)
747
            linkgen(osversion, rad, prel, temp=True)
748 5
            smtputils.prep_email(osversion, prel, password)
749 5
750
751
def comp_joiner(rootdir, localdir, filelist):
752 5
    """
753
    Join rootdir, localdir to every file in filelist.
754
755
    :param rootdir: Root directory.
756
    :type rootdir: str
757
758
    :param localdir: Subfolder inside rootdir.
759 5
    :type localdir: str
760 5
761 5
    :param filelist: List of files to return this path for.
762 5
    :type filelist: list(str)
763
    """
764
    joinedfiles = [os.path.join(rootdir, localdir, os.path.basename(x)) for x in filelist]
765 5
    return joinedfiles
766
767
768
def kernchecker_prep(kernlist):
769
    """
770
    Prepare output from kernel list.
771
772
    :param kernlist: List of kernel branches.
773
    :type kernlist: list(str)
774
    """
775 5
    splitkerns = [x.split("/") for x in kernlist]
776 5
    platforms = list({x[0] for x in splitkerns})
777 5
    kerndict = kernchecker_dict(splitkerns, platforms)
778 5
    return kerndict
779
780
781 5
def kernchecker_dict(splitkerns, platforms):
782
    """
783
    Prepare results dictionary.
784
785
    :param splitkerns: Split kernel branches.
786
    :type splitkerns: list(str)
787
788
    :param platforms: List of platform dicts.
789
    :type platforms: list(dict)
790
    """
791 5
    kerndict = {x: [] for x in platforms}
792 5
    for kernel in splitkerns:
793 5
        kerndict[kernel[0]].append("\t{0}".format(kernel[1]))
794
    return kerndict
795
796 5
797
def tclloader_prep(loaderfile, directory=False):
798
    """
799
    Prepare directory name and OS version.
800
801
    :param loaderfile: Path to input file/folder.
802
    :type loaderfile: str
803
804
    :param directory: If the input file is a folder. Default is False.
805
    :type directory: bool
806
    """
807
    loaderdir = loaderfile if directory else loaderfile.replace(".zip", "")
808
    osver = loaderdir.split("-")[-1]
809 5
    return loaderdir, osver
810 5
811 5
812 5
def tclloader_filename(loaderdir, osver, loadername=None):
813
    """
814
    Prepare platform and filename.
815 5
816
    :param loaderdir: Path to input folder.
817
    :type loaderdir: str
818
819
    :param osver: OS version.
820
    :type osver: str
821
822
    :param loadername: Name of final autoloader. Default is auto-generated.
823
    :type loadername: str
824
    """
825
    platform = os.listdir(os.path.join(loaderdir, "target", "product"))[0]
826
    if loadername is None:
827
        loadername = "{0}_autoloader_user-all-{1}".format(platform, osver)
828
    return loadername, platform
829
830
831
def tcl_download(downloadurl, filename, filesize, filehash, verify=True):
832
    """
833
    Download autoloader file, rename, and verify.
834 5
835 5
    :param downloadurl: Download URL.
836 5
    :type downloadurl: str
837 5
838 5
    :param filename: Name of autoloader file.
839 5
    :type filename: str
840 5
841 5
    :param filesize: Size of autoloader file.
842 5
    :type filesize: str
843 5
844
    :param filehash: SHA-1 hash of autoloader file.
845 5
    :type filehash: str
846 5
847
    :param verify: Whether to verify the file after downloading. Default is True.
848
    :type verify: bool
849 5
    """
850
    print("FILENAME: {0}".format(filename))
851
    print("LENGTH: {0}".format(utilities.fsizer(filesize)))
852
    networkutils.download(downloadurl)
853
    print("DOWNLOAD COMPLETE")
854
    os.rename(downloadurl.split("/")[-1], filename)
855
    if verify:
856
        method = hashutils.get_engine("sha1")
857
        shahash = hashutils.hashlib_hash(filename, method)
858
        if shahash == filehash:
859
            print("HASH CHECK OK")
860
        else:
861
            print(shahash)
862
            print("HASH FAILED!")
863
864
865
def tcl_prd_scan(curef, download=False, mode=4, fvver="AAA000", original=True, export=False, verify=True):
2 ignored issues
show
best-practice introduced by
Too many arguments (7/5)
Loading history...
Comprehensibility introduced by
This function exceeds the maximum number of variables (20/15).
Loading history...
866
    """
867
    Scan one PRD and produce download URL and filename.
868
869
    :param curef: PRD of the phone variant to check.
870
    :type curef: str
871
872
    :param download: If we'll download the file that this returns. Default is False.
873
    :type download: bool
874 5
875 5
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
876 5
    :type mode: int
877 5
878 5
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
879 5
    :type fvver: str
880 5
881 5
    :param original: If we'll download the file with its original filename. Default is True.
882 5
    :type original: bool
883 5
884 5
    :param export: Whether to export XML response to file. Default is False.
885 5
    :type export: bool
886 5
887 5
    :param verify: Whether to verify the file after downloading. Default is True.
888
    :type verify: bool
889
    """
890 5
    sess = requests.Session()
891
    ctext = networkutils.tcl_check(curef, sess, mode, fvver, export)
892
    if ctext is None:
893
        raise SystemExit
894
    tvver, firmwareid, filename, filesize, filehash = networkutils.parse_tcl_check(ctext)
895
    salt = networkutils.tcl_salt()
896
    vkhsh = networkutils.vkhash(curef, tvver, firmwareid, salt, mode, fvver)
897
    updatetext = networkutils.tcl_download_request(curef, tvver, firmwareid, salt, vkhsh, sess, mode, fvver, export)
898
    downloadurl, encslave = networkutils.parse_tcl_download_request(updatetext)
899
    statcode = networkutils.getcode(downloadurl, sess)
900
    filename = tcl_delta_filename(curef, fvver, tvver, filename, original)
901
    tcl_prd_print(downloadurl, filename, statcode, encslave, sess)
902
    if statcode == 200 and download:
903
        tcl_download(downloadurl, filename, filesize, filehash, verify)
904
905
906
def tcl_delta_filename(curef, fvver, tvver, filename, original=True):
907
    """
908
    Generate compatible filenames for deltas, if needed.
909 5
910 5
    :param curef: PRD of the phone variant to check.
911 5
    :type curef: str
912 5
913
    :param fvver: Initial software version.
914
    :type fvver: str
915 5
916
    :param tvver: Target software version.
917
    :type tvver: str
918
919
    :param filename: File name from download URL, passed through if not changing filename.
920
    :type filename: str
921
922
    :param original: If we'll download the file with its original filename. Default is True.
923
    :type original: bool
924
    """
925
    if not original:
926
        prdver = curef.split("-")[1]
927
        filename = "JSU_PRD-{0}-{1}to{2}.zip".format(prdver, fvver, tvver)
928
    return filename
929
930
931
def tcl_prd_print(downloadurl, filename, statcode, encslave, session):
932
    """
933
    Print output from PRD scanning.
934 5
935 5
    :param downloadurl: File to download.
936 5
    :type downloadurl: str
937 5
938 5
    :param filename: File name from download URL.
939 5
    :type filename: str
940 5
941 5
    :param statcode: Status code of download URL.
942
    :type statcode: int
943
944 5
    :param encslave: Server hosting header script.
945
    :type encslave: str
946
947
    :param session: Session object.
948
    :type session: requests.Session
949
    """
950
    print("{0}: HTTP {1}".format(filename, statcode))
951 5
    print(downloadurl)
952 5
    if encslave is not None:
953 5
        address = "/{0}".format(downloadurl.split("/", 3)[3:][0])
954
        print("CHECKING HEADER...")
955 5
        sentinel = networkutils.encrypt_header(address, encslave, session)
956 5
        if sentinel is not None:
957 5
            print(sentinel)
958
959
960 5
def tcl_prep_otaver(ota=None):
961
    """
962
    Prepare variables for OTA versus full check.
963
964
    :param ota: The starting version if OTA, None if not. Default is None.
965
    :type ota: str
966
    """
967 5
    if ota is not None:
968 5
        mode = 2
969 5
        fvver = ota
970 5
    else:
971 5
        mode = 4
972 5
        fvver = "AAA000"
973
    return mode, fvver
974
975 5
976
def tcl_delta_remote(curef):
977
    """
978
    Prepare remote version for delta scanning.
979
980
    :param curef: PRD of the phone variant to check.
981
    :type curef: str
982 5
    """
983 5
    remotedict = networkutils.remote_prd_info()
984 5
    fvver = remotedict.get(curef, "AAA000")
985
    if fvver == "AAA000":
986
        print("NO REMOTE VERSION FOUND!")
987 5
        raise SystemExit
988
    return fvver
989
990
991
def tcl_mainscan_preamble(ota=None):
992
    """
993
    Prepare preamble for TCL scanning.
994
995
    :param ota: The starting version if OTA, None if not. Default is None.
996
    :type ota: str
997
    """
998
    slim_preamble("TCLSCAN")
999
    if ota is not None:
1000 5
        print("PRDs with OTA from OS {0}".format(ota.upper()))
1001 5
1002
1003 5
def tcl_mainscan_printer(curef, tvver, ota=None):
1004
    """
1005
    Print output of TCL scanning.
1006 5
1007
    :param curef: PRD of the phone variant to check.
1008
    :type curef: str
1009
1010
    :param tvver: Target software version.
1011
    :type tvver: str
1012
1013 5
    :param ota: The starting version if OTA, None if not. Default is None.
1014 5
    :type ota: str
1015 5
    """
1016 5
    if ota is not None:
1017
        print("{0}: {2} to {1}".format(curef, tvver, ota.upper()))
1018
    else:
1019 5
        print("{0}: {1}".format(curef, tvver))
1020
1021
1022
def tcl_findprd_prepd_start(prddict):
1023
    """
1024
    Collect list of PRD entries.
1025
1026 5
    :param prddict: Device:PRD dictionary.
1027 5
    :type prddict: dict(str: list)
1028 5
    """
1029
    prda = []
1030
    for item in prddict.values():
1031 5
        prda.extend(item)
1032
    return prda
1033
1034
1035
def tcl_findprd_prepd_middle(prda):
1036
    """
1037
    Convert PRD entries to list of center:end entries.
1038 5
1039 5
    :param prda: List of PRD-xxxxx-yyy entries.
1040 5
    :type prda: list(str)
1041 5
    """
1042 5
    prds = [x.split(" ")[0].replace("PRD-", "").split("-") for x in prda]
1043
    prdx = list({x[0]: x[1]} for x in prds)
1044
    return prdx
1045 5
1046
1047
def tcl_findprd_prepd_end(prdx):
1048
    """
1049
    Convert list of center:end entries to final center:[ends] dict.
1050
1051
    :param prdx: List of center:end dict entries.
1052 5
    :type prdx: list(dict(str: str))
1053 5
    """
1054 5
    prdf = collections.defaultdict(list)
1055 5
    for prdc in prdx:
1056
        for key, value in prdc.items():
1057
            prdf[key].append(value)
1058 5
    return prdf
1059
1060
1061
def tcl_findprd_prepdict(prddict):
1062
    """
1063
    Prepare dict of center:[ends] entries.
1064
1065
    :param prddict: Device:PRD dictionary.
1066
    :type prddict: dict(str: list)
1067
    """
1068 5
    prda = tcl_findprd_prepd_start(prddict)
1069 5
    prdx = tcl_findprd_prepd_middle(prda)
1070 5
    prdfinal = tcl_findprd_prepd_end(prdx)
1071 5
    return prdfinal
1072 5
1073 5
1074 5
def tcl_findprd_checkfilter(prddict, tocheck=None):
1075
    """
1076
    Filter PRD dict if needed.
1077 5
1078
    :param prddict: PRD center:[ends] dictionary.
1079
    :type prddict: collections.defaultdict(str: list)
1080
1081
    :param tocheck: Specific PRD(s) to check, None if all will be checked. Default is None.
1082
    :type tocheck: list(str)
1083
    """
1084
    prddict2 = prddict
1085
    if tocheck is not None:
1086
        prddict2 = collections.defaultdict(list)
1087
        for toch in tocheck:
1088
            toch = toch.replace("PRD-", "")
1089
            prddict2[toch] = prddict[toch]
1090
    return prddict2
1091
1092
1093
def tcl_findprd_centerscan(center, prddict, session, floor=0, ceiling=999, export=False):
1 ignored issue
show
best-practice introduced by
Too many arguments (6/5)
Loading history...
1094
    """
1095
    Individual scanning for the center of a PRD.
1096
1097
    :param center: PRD-center-end.
1098
    :type center: str
1099 5
1100 5
    :param prddict: PRD center:[ends] dictionary.
1101 5
    :type prddict: collections.defaultdict(str: list)
1102 5
1103
    :param session: Session object.
1104
    :type session: requests.Session
1105 5
1106
    :param floor: When to start. Default is 0.
1107
    :type floor: int
1108
1109
    :param ceiling: When to stop. Default is 999.
1110
    :type ceiling: int
1111
1112
    :param export: Whether to export XML response to file. Default is False.
1113
    :type export: bool
1114
    """
1115
    tails = [int(i) for i in prddict[center]]
1116
    safes = [g for g in range(floor, ceiling) if g not in tails]
1117
    print("SCANNING ROOT: {0}{1}".format(center, " "*8))
1118
    tcl_findprd_safescan(safes, center, session, export)
1119
1120
1121 5
def tcl_findprd_safescan(safes, center, session, export=False):
1122 5
    """
1123 5
    Scan for PRDs known not to be in database.
1124 5
1125 5
    :param safes: List of ends within given range that aren't in database.
1126 5
    :type safes: list(int)
1127
1128 5
    :param center: PRD-center-end.
1129
    :type center: str
1130
1131 5
    :param session: Session object.
1132
    :type session: requests.Session
1133
1134
    :param export: Whether to export XML response to file. Default is False.
1135
    :type export: bool
1136
    """
1137
    for j in safes:
1138
        curef = "PRD-{}-{:03}".format(center, j)
1139
        print("NOW SCANNING: {0}".format(curef), end="\r")
1140
        checktext = networkutils.tcl_check(curef, session, export)
1141 5
        if checktext is None:
1142 5
            continue
1143 5
        else:
1144 5
            tcl_findprd_safehandle(curef, checktext)
1145
1146
1147 5
def tcl_findprd_safehandle(curef, checktext):
1148
    """
1149
    Parse API output and print the relevant bits.
1150
1151
    :param curef: PRD of the phone variant to check.
1152
    :type curef: str
1153
1154
    :param checktext: The XML formatted data returned from the first stage API check.
1155
    :type checktext: str
1156
    """
1157
    tvver, firmwareid, filename, fsize, fhash = networkutils.parse_tcl_check(checktext)
1158
    del firmwareid, filename, fsize, fhash
1159
    tvver2 = "{0}{1}".format(tvver, " "*8)
1160
    tcl_mainscan_printer(curef, tvver2)
1161
1162
1163 5
def tcl_findprd(prddict, floor=0, ceiling=999, export=False):
1164 5
    """
1165 5
    Check for new PRDs based on PRD database.
1166
1167
    :param prddict: PRD center:[ends] dictionary.
1168 5
    :type prddict: collections.defaultdict(str: list)
1169
1170
    :param floor: When to start. Default is 0.
1171
    :type floor: int
1172
1173
    :param ceiling: When to stop. Default is 999.
1174
    :type ceiling: int
1175
1176
    :param export: Whether to export XML response to file. Default is False.
1177
    :type export: bool
1178
    """
1179
    sess = requests.Session()
1180
    for center in sorted(prddict.keys()):
1181 5
        tcl_findprd_centerscan(center, prddict, sess, floor, ceiling, export)
1182
1183
1184 5
def linkgen_sdk_dicter(indict, origtext, newtext):
1185
    """
1186
    Prepare SDK radio/OS dictionaries.
1187
1188
    :param indict: Dictionary of radio and OS pairs.
1189
    :type: dict(str:str)
1190
1191
    :param origtext: String in indict's values that must be replaced.
1192
    :type origtext: str
1193
1194
    :param newtext: What to replace origtext with.
1195
    :type newtext: str
1196
    """
1197 5
    return {key: val.replace(origtext, newtext) for key, val in indict.items()}
1198 5
1199 5
1200 5
def linkgen_sdk(sdk, oses, cores):
1201 5
    """
1202 5
    Generate SDK debrick/core images.
1203
1204
    :param sdk: If we specifically want SDK images. Default is False.
1205 5
    :type sdk: bool
1206
1207
    :param oses: Dictionary of radio and debrick pairs.
1208
    :type oses: dict(str:str)
1209
1210
    :param cores: Dictionary of radio and core pairs.
1211
    :type cores: dict(str:str)
1212
    """
1213
    if sdk:
1214
        oses2 = linkgen_sdk_dicter(oses, "factory_sfi", "sdk")
1215
        cores2 = linkgen_sdk_dicter(cores, "factory_sfi", "sdk")
1216
        oses = linkgen_sdk_dicter(oses2, "verizon_sfi", "sdk")
1217
        cores = linkgen_sdk_dicter(cores2, "verizon_sfi", "sdk")
1218
    return oses, cores
1219
1220
1221
def linkgen(osversion, radioversion=None, softwareversion=None, altsw=None, temp=False, sdk=False):
2 ignored issues
show
best-practice introduced by
Too many arguments (6/5)
Loading history...
Comprehensibility introduced by
This function exceeds the maximum number of variables (17/15).
Loading history...
1222
    """
1223
    Generate debrick/core/radio links for given OS, radio, software release.
1224
1225
    :param osversion: OS version, 10.x.y.zzzz.
1226
    :type osversion: str
1227 5
1228 5
    :param radioversion: Radio version, 10.x.y.zzzz. Can be guessed.
1229 5
    :type radioversion: str
1230 5
1231 5
    :param softwareversion: Software version, 10.x.y.zzzz. Can be guessed.
1232 5
    :type softwareversion: str
1233 5
1234 5
    :param altsw: Radio software release, if not the same as OS.
1235 5
    :type altsw: str
1236 5
1237 5
    :param temp: If file we write to is temporary. Default is False.
1238 5
    :type temp: bool
1239 5
1240 5
    :param sdk: If we specifically want SDK images. Default is False.
1241 5
    :type sdk: bool
1242 5
    """
1243 5
    radioversion = return_radio_version(osversion, radioversion)
1244 5
    softwareversion, swc = return_sw_checked(softwareversion, osversion)
1245
    del swc
1246
    if altsw is not None:
1247 5
        altsw, aswc = return_radio_sw_checked(altsw, radioversion)
1248
        del aswc
1249
    baseurl = utilities.create_base_url(softwareversion)
1250
    oses, cores, radios = textgenerator.url_gen(osversion, radioversion, softwareversion)
1251
    if altsw is not None:
1252
        del radios
1253
        dbks, cors, radios = textgenerator.url_gen(osversion, radioversion, altsw)
1254 5
        del dbks
1255 5
        del cors
1256 5
    avlty = networkutils.availability(baseurl)
1257 5
    oses, cores = linkgen_sdk(sdk, oses, cores)
1258
    prargs = (softwareversion, osversion, radioversion, oses, cores, radios, avlty, False, None, temp, altsw)
1259 5
    lthr = threading.Thread(target=textgenerator.write_links, args=prargs)
1260 5
    lthr.start()
1261
1262
1263 5
def clean_swrel(swrelset):
1264
    """
1265
    Clean a list of software release lookups.
1266
1267
    :param swrelset: List of software releases.
1268
    :type swrelset: set(str)
1269
    """
1270
    for i in swrelset:
1271
        if i != "SR not in system" and i is not None:
1272
            swrelease = i
1273 5
            break
1274 5
    else:
1275
        swrelease = ""
1276
    return swrelease
1277 5
1278
1279
def autolookup_logger(record, out):
1280
    """
1281
    Write autolookup results to file.
1282
1283
    :param record: The file to log to.
1284
    :type record: str
1285
1286
    :param out: Output block.
1287
    :type out: str
1288
    """
1289
    with open(record, "a") as rec:
1290
        rec.write("{0}\n".format(out))
1291
1292
1293
def autolookup_printer(out, avail, log=False, quiet=False, record=None):
1294
    """
1295
    Print autolookup results, logging if specified.
1296 5
1297 5
    :param out: Output block.
1298 5
    :type out: str
1299 5
1300 5
    :param avail: Availability. Can be "Available" or "Unavailable".
1301 5
    :type avail: str
1302 5
1303
    :param log: If we're logging to file.
1304
    :type log: bool
1305 5
1306
    :param quiet: If we only note available entries.
1307
    :type quiet: bool
1308
1309
    :param record: If we're logging, the file to log to.
1310
    :type record: str
1311
    """
1312
    if not quiet:
1313
        avail = "Available"  # force things
1314
    if avail.lower() == "available":
1315
        if log:
1316
            lthr = threading.Thread(target=autolookup_logger, args=(record, out))
1317
            lthr.start()
1318
        print(out)
1319
1320
1321 5
def autolookup_output_sql(osversion, swrelease, avail, sql=False):
1322 5
    """
1323 5
    Add OS to SQL database.
1324 5
1325
    :param osversion: OS version.
1326
    :type osversion: str
1327 5
1328
    :param swrelease: Software release.
1329
    :type swrelease: str
1330
1331
    :param avail: "Unavailable" or "Available".
1332
    :type avail: str
1333
1334
    :param sql: If we're adding this to our SQL database.
1335
    :type sql: bool
1336
    """
1337
    if sql:
1338
        sqlutils.prepare_sw_db()
1339
        if not sqlutils.check_exists(osversion, swrelease):
1340
            sqlutils.insert(osversion, swrelease, avail.lower())
1341
1342
1343
def autolookup_output(osversion, swrelease, avail, avpack, sql=False):
1344
    """
1345
    Prepare autolookup block, and add to SQL database.
1346 5
1347 5
    :param osversion: OS version.
1348 5
    :type osversion: str
1349 5
1350 5
    :param swrelease: Software release.
1351
    :type swrelease: str
1352
1353 5
    :param avail: "Unavailable" or "Available".
1354
    :type avail: str
1355
1356
    :param avpack: Availabilities: alpha 1 and 2, beta 1 and 2, production.
1357
    :type avpack: list(str)
1358
1359
    :param sql: If we're adding this to our SQL database.
1360
    :type sql: bool
1361
    """
1362
    othr = threading.Thread(target=autolookup_output_sql, args=(osversion, swrelease, avail, sql))
1363 5
    othr.start()
1364 5
    avblok = "[{0}|{1}|{2}|{3}|{4}]".format(*avpack)
1365
    out = "OS {0} - SR {1} - {2} - {3}".format(osversion, swrelease, avblok, avail)
1366
    return out
1367 5
1368
1369
def clean_barlist(cleanfiles, stoppers):
1370
    """
1371
    Remove certain bars from barlist based on keywords.
1372
1373
    :param cleanfiles: List of files to clean.
1374
    :type cleanfiles: list(str)
1375
1376
    :param stoppers: List of keywords (i.e. bar names) to exclude.
1377
    :type stoppers: list(str)
1378
    """
1379
    finals = [link for link in cleanfiles if all(word not in link for word in stoppers)]
1380
    return finals
1381
1382
1383
def prep_export_cchecker(files, npc, hwid, osv, radv, swv, upgrade=False, forced=None):
1 ignored issue
show
best-practice introduced by
Too many arguments (8/5)
Loading history...
1384
    """
1385
    Prepare carrierchecker lookup links to write to file.
1386
1387
    :param files: List of file URLs.
1388
    :type files: list(str)
1389
1390
    :param npc: MCC + MNC (ex. 302220).
1391
    :type npc: int
1392
1393
    :param hwid: Device hardware ID.
1394
    :type hwid: str
1395 5
1396 5
    :param osv: OS version.
1397 5
    :type osv: str
1398
1399 5
    :param radv: Radio version.
1400 5
    :type radv: str
1401 5
1402 5
    :param swv: Software release.
1403 5
    :type swv: str
1404
1405
    :param upgrade: Whether or not to use upgrade files. Default is false.
1406 5
    :type upgrade: bool
1407
1408
    :param forced: Force a software release. None to go for latest.
1409
    :type forced: str
1410
    """
1411
    if not upgrade:
1412
        newfiles = networkutils.carrier_query(npc, hwid, True, False, forced)
1413
        cleanfiles = newfiles[3]
1414
    else:
1415
        cleanfiles = files
1416
    osurls, coreurls, radiourls = textgenerator.url_gen(osv, radv, swv)
1417
    stoppers = ["8960", "8930", "8974", "m5730", "winchester"]
1418
    finals = clean_barlist(cleanfiles, stoppers)
1419
    return osurls, coreurls, radiourls, finals
1420
1421
1422
def export_cchecker(files, npc, hwid, osv, radv, swv, upgrade=False, forced=None):
1 ignored issue
show
best-practice introduced by
Too many arguments (8/5)
Loading history...
1423
    """
1424
    Write carrierchecker lookup links to file.
1425
1426
    :param files: List of file URLs.
1427
    :type files: list(str)
1428
1429
    :param npc: MCC + MNC (ex. 302220).
1430
    :type npc: int
1431
1432
    :param hwid: Device hardware ID.
1433
    :type hwid: str
1434 5
1435 5
    :param osv: OS version.
1436 5
    :type osv: str
1437 5
1438
    :param radv: Radio version.
1439 5
    :type radv: str
1440
1441
    :param swv: Software release.
1442 5
    :type swv: str
1443
1444
    :param upgrade: Whether or not to use upgrade files. Default is false.
1445
    :type upgrade: bool
1446
1447
    :param forced: Force a software release. None to go for latest.
1448
    :type forced: str
1449
    """
1450
    if files:
1451
        osurls, coreurls, radiourls, finals = prep_export_cchecker(files, npc, hwid, osv, radv, swv, upgrade, forced)
1452
        textgenerator.write_links(swv, osv, radv, osurls, coreurls, radiourls, True, True, finals)
1453
        print("\nFINISHED!!!")
1454
    else:
1455
        print("CANNOT EXPORT, NO SOFTWARE RELEASE")
1456
1457 5
1458
def generate_blitz_links(files, osv, radv, swv):
1459
    """
1460
    Generate blitz URLs (i.e. all OS and radio links).
1461
    :param files: List of file URLs.
1462
    :type files: list(str)
1463 5
1464
    :param osv: OS version.
1465
    :type osv: str
1466
1467
    :param radv: Radio version.
1468
    :type radv: str
1469
1470
    :param swv: Software release.
1471 5
    :type swv: str
1472
    """
1473
    coreurls = [
1474 5
        utilities.create_bar_url(swv, "winchester.factory_sfi", osv),
1475
        utilities.create_bar_url(swv, "qc8960.factory_sfi", osv),
1476
        utilities.create_bar_url(swv, "qc8960.factory_sfi", osv),
1477
        utilities.create_bar_url(swv, "qc8960.factory_sfi_hybrid_qc8974", osv)
1478
    ]
1479
    radiourls = [
1480
        utilities.create_bar_url(swv, "m5730", radv),
1481
        utilities.create_bar_url(swv, "qc8960", radv),
1482
        utilities.create_bar_url(swv, "qc8960.wtr", radv),
1483
        utilities.create_bar_url(swv, "qc8960.wtr5", radv),
1484 5
        utilities.create_bar_url(swv, "qc8930.wtr5", radv),
1485 5
        utilities.create_bar_url(swv, "qc8974.wtr2", radv)
1486 5
    ]
1487 5
    return files + coreurls + radiourls
1488 5
1489 5
1490 5
def package_blitz(bardir, swv):
1491
    """
1492 5
    Package and verify a blitz package.
1493
1494
    :param bardir: Path to folder containing bar files.
1495 5
    :type bardir: str
1496
1497
    :param swv: Software version.
1498
    :type swv: str
1499
    """
1500
    print("\nCREATING BLITZ...")
1501
    barutils.create_blitz(bardir, swv)
1502 5
    print("\nTESTING BLITZ...")
1503
    zipver = archiveutils.zip_verify("Blitz-{0}.zip".format(swv))
1504
    if not zipver:
1505 5
        print("BLITZ FILE IS BROKEN")
1506
        raise SystemExit
1507
    else:
1508
        shutil.rmtree(bardir)
1509
1510
1511
def slim_preamble(appname):
1512
    """
1513
    Standard app name header.
1514
1515
    :param appname: Name of app.
1516
    :type appname: str
1517
    """
1518
    print("~~~{0} VERSION {1}~~~".format(appname.upper(), shortversion()))
1519
1520
1521
def standard_preamble(appname, osversion, softwareversion, radioversion, altsw=None):
1522
    """
1523
    Standard app name, OS, radio and software (plus optional radio software) print block.
1524 5
1525 5
    :param appname: Name of app.
1526 5
    :type appname: str
1527 5
1528 5
    :param osversion: OS version, 10.x.y.zzzz. Required.
1529 5
    :type osversion: str
1530
1531
    :param radioversion: Radio version, 10.x.y.zzzz. Can be guessed.
1532 5
    :type radioversion: str
1533
1534
    :param softwareversion: Software release, 10.x.y.zzzz. Can be guessed.
1535
    :type softwareversion: str
1536 5
1537 5
    :param altsw: Radio software release, if not the same as OS.
1538 5
    :type altsw: str
1539 5
    """
1540 5
    slim_preamble(appname)
1541 5
    print("OS VERSION: {0}".format(osversion))
1542 5
    print("OS SOFTWARE VERSION: {0}".format(softwareversion))
1543 5
    print("RADIO VERSION: {0}".format(radioversion))
1544
    if altsw is not None:
1545
        print("RADIO SOFTWARE VERSION: {0}".format(altsw))
1546 5
1547
1548
def questionnaire_device(message=None):
1549
    """
1550 5
    Get device from questionnaire.
1551 5
    """
1552 5
    message = "DEVICE (XXX100-#): " if message is None else message
1553 5
    device = input(message)
1554 5
    if not device:
1555 5
        print("NO DEVICE SPECIFIED!")
1556 5
        decorators.enter_to_exit(True)
1557 5
        if not getattr(sys, 'frozen', False):
1558 5
            raise SystemExit
1559
    return device
1560 5
1561 5
1562
def verify_gpg_credentials():
1563
    """
1564 5
    Read GPG key/pass from file, verify if incomplete.
1565
    """
1566
    gpgkey, gpgpass = gpgutils.gpg_config_loader()
1567
    if gpgkey is None or gpgpass is None:
1568
        print("NO PGP KEY/PASS FOUND")
1569
        cont = utilities.i2b("CONTINUE (Y/N)?: ")
1570
        if cont:
1571 5
            gpgkey = verify_gpg_key(gpgkey)
1572 5
            gpgpass, writebool = verify_gpg_pass(gpgpass)
1573 5
            gpgpass2 = gpgpass if writebool else None
1574 5
            gpgutils.gpg_config_writer(gpgkey, gpgpass2)
1575 5
        else:
1576
            gpgkey = None
1577
    return gpgkey, gpgpass
1578 5
1579
1580
def verify_gpg_key(gpgkey=None):
1581
    """
1582
    Verify GPG key.
1583
1584
    :param gpgkey: Key, use None to take from input.
1585 5
    :type gpgkey: str
1586 5
    """
1587 5
    if gpgkey is None:
1588
        gpgkey = input("PGP KEY (0x12345678): ")
1589 5
        if not gpgkey.startswith("0x"):
1590 5
            gpgkey = "0x{0}".format(gpgkey)   # add preceding 0x
1591
    return gpgkey
1592
1593 5
1594
def verify_gpg_pass(gpgpass=None):
1595
    """
1596
    Verify GPG passphrase.
1597
1598
    :param gpgpass: Passphrase, use None to take from input.
1599
    :type gpgpass: str
1600
    """
1601
    if gpgpass is None:
1602
        gpgpass = getpass.getpass(prompt="PGP PASSPHRASE: ")
1603
        writebool = utilities.i2b("SAVE PASSPHRASE (Y/N)?:")
1604
    else:
1605
        writebool = False
1606
    return gpgpass, writebool
1607
1608
1609
def bulk_hash(dirs, compressed=True, deleted=True, radios=True, hashdict=None):
1610
    """
1611
    Hash files in several folders based on flags.
1612 5
1613 5
    :param dirs: Folders: [OS_bars, radio_bars, OS_exes, radio_exes, OS_zips, radio_zips]
1614
    :type dirs: list(str)
1615
1616 5
    :param compressed: Whether to hash compressed files. True by default.
1617
    :type compressed: bool
1618
1619
    :param deleted: Whether to delete uncompressed files. True by default.
1620
    :type deleted: bool
1621
1622
    :param radios: Whether to hash radio autoloaders. True by default.
1623
    :type radios: bool
1624
1625
    :param hashdict: Dictionary of hash rules, in ~\bbarchivist.ini.
1626
    :type hashdict: dict({str: bool})
1627
    """
1628
    print("HASHING LOADERS...")
1629
    defargs = utilities.def_args(dirs)
1630
    utilities.cond_check(hashutils.verifier, defargs, [hashdict], radios, compressed, deleted)
1631
1632 5
1633 5
def bulk_verify(dirs, compressed=True, deleted=True, radios=True):
1634 5
    """
1635 5
    Verify files in several folders based on flags.
1636 5
1637 5
    :param dirs: Folders: [OS_bars, radio_bars, OS_exes, radio_exes, OS_zips, radio_zips]
1638
    :type dirs: list(str)
1639
1640 5
    :param compressed: Whether to hash compressed files. True by default.
1641
    :type compressed: bool
1642
1643
    :param deleted: Whether to delete uncompressed files. True by default.
1644
    :type deleted: bool
1645
1646
    :param radios: Whether to hash radio autoloaders. True by default.
1647 5
    :type radios: bool
1648
    """
1649
    gpgkey, gpgpass = verify_gpg_credentials()
1650 5
    if gpgpass is not None and gpgkey is not None:
1651
        print("VERIFYING LOADERS...")
1652
        print("KEY: {0}".format(gpgkey))
1653
        restargs = [gpgkey, gpgpass, True]
1654
        defargs = utilities.def_args(dirs)
1655
        utilities.cond_check(gpgutils.gpgrunner, defargs, restargs, radios, compressed, deleted)
1656
1657 5
1658 5
def enn_ayy(quant):
1659 5
    """
1660 5
    Cheeky way to put a N/A placeholder for a string.
1661
1662
    :param quant: What to check if it's None.
1663 5
    :type quant: str
1664
    """
1665
    return "N/A" if quant is None else quant
1666
1667
1668
def generate_workfolder(folder=None):
1669
    """
1670
    Check if a folder exists, make it if it doesn't, set it to home if None.
1671
1672
    :param folder: Folder to check.
1673
    :type folder: str
1674
    """
1675
    folder = utilities.dirhandler(folder, os.getcwd())
1676
    if folder is not None and not os.path.exists(folder):
1677
        os.makedirs(folder)
1678
    return folder
1679
1680
1681
def info_header(afile, osver, radio=None, software=None, device=None):
1682 5
    """
1683 5
    Write header for info file.
1684 5
1685
    :param afile: Open file to write to.
1686 5
    :type afile: File object
1687 5
1688 5
    :param osver: OS version, required for both types.
1689
    :type osver: str
1690
1691 5
    :param radio: Radio version, required for QNX.
1692
    :type radio: str
1693
1694
    :param software: Software release, required for QNX.
1695
    :type software: str
1696
1697
    :param device: Device type, required for Android.
1698
    :type device: str
1699
    """
1700
    afile.write("OS: {0}\n".format(osver))
1701
    if device:
1702
        afile.write("Device: {0}\n".format(enn_ayy(device)))
1703
    else:
1704 5
        afile.write("Radio: {0}\n".format(enn_ayy(radio)))
1705 5
        afile.write("Software: {0}\n".format(enn_ayy(software)))
1706 5
    afile.write("{0}\n".format("~"*40))
1707 5
1708 5
1709
def prep_info(filepath, osver, device=None):
1710
    """
1711 5
    Prepare file list for new-style info file.
1712
1713
    :param filepath: Path to folder to analyze.
1714
    :type filepath: str
1715
1716
    :param osver: OS version, required for both types.
1717
    :type osver: str
1718
1719
    :param device: Device type, required for Android.
1720
    :type device: str
1721
    """
1722
    fileext = ".zip" if device else ".7z"
1723
    files = os.listdir(filepath)
1724
    absfiles = [os.path.join(filepath, x) for x in files if x.endswith((fileext, ".exe"))]
1725
    fname = os.path.join(filepath, "!{0}_OSINFO!.txt".format(osver))
1726
    return fname, absfiles
1727
1728
1729
def make_info(filepath, osver, radio=None, software=None, device=None):
1730 5
    """
1731 5
    Create a new-style info (names, sizes and hashes) file.
1732 5
1733 5
    :param filepath: Path to folder to analyze.
1734 5
    :type filepath: str
1735
1736
    :param osver: OS version, required for both types.
1737 5
    :type osver: str
1738
1739
    :param radio: Radio version, required for QNX.
1740
    :type radio: str
1741
1742
    :param software: Software release, required for QNX.
1743
    :type software: str
1744
1745
    :param device: Device type, required for Android.
1746
    :type device: str
1747
    """
1748
    fname, absfiles = prep_info(filepath, osver, device)
1749
    with open(fname, "w") as afile:
1750
        info_header(afile, osver, radio, software, device)
1751
        for indx, file in enumerate(absfiles):
1752
            write_info(file, indx, len(absfiles), afile)
1753 5
1754 5
1755 5
def write_info(infile, index, filecount, outfile):
1756 5
    """
1757 5
    Write a new-style info (names, sizes and hashes) file.
1758 5
1759 5
    :param infile: Path to file whose name, size and hash are to be written.
1760 5
    :type infile: str
1761 5
1762 5
    :param index: Which file index out of the list of files we're writing.
1763
    :type index: int
1764
1765 5
    :param filecount: Total number of files we're to write; for excluding terminal newline.
1766
    :type filecount: int
1767
1768
    :param outfile: Open (!!!) file handle. Output file.
1769
    :type outfile: str
1770
    """
1771
    fsize = os.stat(infile).st_size
1772
    outfile.write("File: {0}\n".format(os.path.basename(infile)))
1773
    outfile.write("\tSize: {0} ({1})\n".format(fsize, utilities.fsizer(fsize)))
1774
    outfile.write("\tHashes:\n")
1775
    outfile.write("\t\tMD5: {0}\n".format(hashutils.hashlib_hash(infile, hashlib.md5()).upper()))
1776
    outfile.write("\t\tSHA1: {0}\n".format(hashutils.hashlib_hash(infile, hashlib.sha1()).upper()))
1777
    outfile.write("\t\tSHA256: {0}\n".format(hashutils.hashlib_hash(infile, hashlib.sha256()).upper()))
1778
    outfile.write("\t\tSHA512: {0}\n".format(hashutils.hashlib_hash(infile, hashlib.sha512()).upper()))
1779
    if index != filecount - 1:
1780
        outfile.write("\n")
1781
1782
1783
def bulk_info(dirs, osv, compressed=True, deleted=True, radios=True, rad=None, swv=None, dev=None):
1 ignored issue
show
best-practice introduced by
Too many arguments (8/5)
Loading history...
1784
    """
1785
    Generate info files in several folders based on flags.
1786
1787
    :param dirs: Folders: [OS_bars, radio_bars, OS_exes, radio_exes, OS_zips, radio_zips]
1788
    :type dirs: list(str)
1789
1790
    :param osver: OS version, required for both types.
1791
    :type osver: str
1792
1793 5
    :param compressed: Whether to hash compressed files. True by default.
1794 5
    :type compressed: bool
1795 5
1796
    :param deleted: Whether to delete uncompressed files. True by default.
1797
    :type deleted: bool
1798
1799
    :param radios: Whether to hash radio autoloaders. True by default.
1800
    :type radios: bool
1801
1802
    :param rad: Radio version, required for QNX.
1803
    :type rad: str
1804
1805
    :param swv: Software release, required for QNX.
1806
    :type swv: str
1807
1808
    :param dev: Device type, required for Android.
1809
    :type dev: str
1810
    """
1811
    print("GENERATING INFO FILES...")
1812
    restargs = [osv, rad, swv, dev]
1813
    utilities.cond_check(make_info, utilities.def_args(dirs), restargs, radios, compressed, deleted)
1814