Completed
Push — master ( 8ea8b2...72dabb )
by John
03:38
created

encrypt_header()   B

Complexity

Conditions 4

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 26
ccs 13
cts 13
cp 1
crap 4
rs 8.5806
c 0
b 0
f 0
1
#!/usr/bin/env python3
2 5
"""This module is used for network connections; APIs, downloading, etc."""
3
4 5
import os  # filesystem read
5 5
try:
6 5
    from defusedxml import ElementTree  # safer XML parsing
7 1
except (ImportError, AttributeError):
8 1
    from xml.etree import ElementTree  # XML parsing
9 5
import re  # regexes
10 5
import concurrent.futures  # multiprocessing/threading
11 5
import glob  # pem file lookup
12 5
import hashlib  # salt
13 5
import time  # salt
14 5
import random  # salt
15 5
import base64  # encoding
16 5
import requests  # downloading
17 5
from bs4 import BeautifulSoup  # scraping
18 5
from bbarchivist import utilities  # parse filesize
19 5
from bbarchivist.bbconstants import SERVERS  # lookup servers
20
21 5
__author__ = "Thurask"
22 5
__license__ = "WTFPL v2"
23 5
__copyright__ = "2015-2017 Thurask"
24
25
26 5
def grab_pem():
    """
    Work with either local cacerts or system cacerts.

    A file named "cacert.pem" in the current working directory takes
    priority; otherwise the bundle reported by Requests is used.

    :returns: Path of the certificate bundle to use.
    :rtype: str
    """
    # Test the literal path rather than globbing it: glob would treat
    # characters such as "[" or "*" in the working directory name as pattern
    # syntax and then miss a cacert.pem that is really there.
    pemfile = os.path.join(os.getcwd(), "cacert.pem")
    if os.path.exists(pemfile):
        return os.path.abspath(pemfile)  # local cacerts
    return requests.certs.where()  # no local cacerts
36
37
38 5
def pem_wrapper(method):
    """
    Decorator to set REQUESTS_CA_BUNDLE.

    :param method: Method to use.
    :type method: function

    :returns: Wrapped function that sets the environment variable, then
              calls the original with the same arguments.
    :rtype: function
    """
    import functools  # local import so the block does not rely on a new file-level import

    @functools.wraps(method)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        """
        Set REQUESTS_CA_BUNDLE before doing function.
        """
        # Re-evaluated on every call, so a cacert.pem that appears later in
        # the working directory is picked up.
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
        return method(*args, **kwargs)
    return wrapper
52
53
54 5
def generic_session(session=None):
    """
    Hand back the given Requests session, or make a new one if none was given.

    :param session: Requests session object, created if this is None.
    :type session: requests.Session()
    """
    if session is not None:
        return session
    return requests.Session()
63
64
65 5
def generic_soup_parser(url, session=None):
    """
    Fetch a URL and return its body parsed by BeautifulSoup's "html.parser".

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    response = generic_session(session).get(url)
    return BeautifulSoup(response.content, "html.parser")
79
80
81 5
@pem_wrapper
def get_length(url, session=None):
    """
    Get content-length header from some URL.

    Returns 0 when no URL is given, when the connection fails, or when the
    response carries no usable content-length header.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if url is None:
        return 0
    session = generic_session(session)
    try:
        heads = session.head(url)
    except requests.ConnectionError:
        return 0
    try:
        return int(heads.headers['content-length'])
    except (KeyError, ValueError):
        # The header may be absent or malformed; treat that as unknown size
        # instead of crashing the caller.
        return 0
101
102
103 5
@pem_wrapper
def download(url, output_directory=None, session=None):
    """
    Download the file behind a URL into a folder.

    The local filename is whatever follows the last "/" of the URL. A file
    that ends up empty is removed again.

    :param url: URL to download from.
    :type url: str

    :param output_directory: Download folder. Default is local.
    :type output_directory: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    target_dir = utilities.dirhandler(output_directory, os.getcwd())
    long_name = url.rsplit('/', 1)[-1]
    short_name = utilities.stripper(long_name)
    destination = os.path.join(target_dir, long_name)
    download_writer(url, destination, long_name, short_name, sess)
    remove_empty_download(destination)
124
125
126 5
def remove_empty_download(fname):
    """
    Delete the file at the given path when its size is zero bytes.

    :param fname: File path.
    :type fname: str
    """
    if os.path.getsize(fname) == 0:
        os.remove(fname)
135
136
137 5
def download_writer(url, fname, lfname, sname, session=None):
    """
    Download file and write to disk.

    The target file is opened before the request is made, so a failed
    download leaves an empty file behind.

    :param url: URL to download from.
    :type url: str

    :param fname: File path.
    :type fname: str

    :param lfname: Long filename.
    :type lfname: str

    :param sname: Short name, for printing to screen.
    :type sname: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # Honour the documented default instead of failing on None.get().
    session = generic_session(session)
    with open(fname, "wb") as file:
        req = session.get(url, stream=True)
        try:
            if req.status_code == 200:  # 200 OK
                # Only look at the size once we know the request succeeded,
                # and tolerate servers that do not send the header.
                clength = req.headers.get('content-length')
                fsize = utilities.fsizer(clength) if clength is not None else "unknown size"
                print("DOWNLOADING {0} [{1}]".format(sname, fsize))
                for chunk in req.iter_content(chunk_size=1024):
                    file.write(chunk)
            else:
                print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
        finally:
            # A streamed response keeps its connection until it is closed.
            req.close()
166
167
168 5
def download_bootstrap(urls, outdir=None, workers=5, session=None):
    """
    Run downloaders for each file in given URL iterable.

    Does nothing when there are no URLs.

    :param urls: URLs to download.
    :type urls: list

    :param outdir: Download folder. Default is handled in :func:`download`.
    :type outdir: str

    :param workers: Maximum number of worker threads. Default is 5.
    :type workers: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    urls = list(urls)  # accept any iterable, not just sized containers
    if not urls:
        return  # a pool with zero workers would raise ValueError
    # Never more threads than downloads, never fewer than one.
    workers = max(1, min(len(urls), workers))
    spinman = utilities.SpinManager()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
        try:
            spinman.start()
            for url in urls:
                xec.submit(download, url, outdir, session)
        except (KeyboardInterrupt, SystemExit):
            xec.shutdown()
            spinman.stop()
    # Leaving the with-block waits for the submitted downloads to finish.
    spinman.stop()
    utilities.spinner_clear()
    utilities.line_begin()
197
198
199 5
def download_android_tools(downloaddir=None):
    """
    Download Android SDK platform tools.

    Fetches the "latest" platform-tools zip for windows, linux and darwin
    into the chosen directory, creating it if needed.

    :param downloaddir: Directory name, default is "plattools".
    :type downloaddir: str
    """
    if downloaddir is None:
        downloaddir = "plattools"
    # Reuse an existing directory: os.removedirs only works on empty
    # directories and also prunes empty parents, so remove-then-mkdir
    # could fail outright.
    os.makedirs(downloaddir, exist_ok=True)
    platforms = ("windows", "linux", "darwin")
    dlurls = ["https://dl.google.com/android/repository/platform-tools-latest-{0}.zip".format(plat) for plat in platforms]
    sess = generic_session()
    # Download into the directory that was prepared above, not a fixed name.
    download_bootstrap(dlurls, outdir=downloaddir, session=sess)
215
216
217 5
@pem_wrapper
def getcode(url, session=None):
    """
    Issue a HEAD request and report the HTTP status code as an integer.

    A connection error is reported as 404.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    try:
        response = sess.head(url)
    except requests.ConnectionError:
        return 404
    return int(response.status_code)
235
236
237 5
@pem_wrapper
def availability(url, session=None):
    """
    Tell whether a URL answers with an acceptable HTTP status code.

    200 or 301-308 is OK, else is not.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    code = getcode(url, session)
    if code == 200:
        return True
    return 300 < code <= 308
251
252
253 5
def clean_availability(results, server):
    """
    Pick one server's lookup result and derive its availability marker.

    The marker is two spaces when the result is None or "SR not in system";
    otherwise it is "PD" for server "p" and the upper-cased server name for
    anything else.

    :param results: Result dict.
    :type results: dict(str: str)

    :param server: Server, key for result dict.
    :type server: str
    """
    rel = results[server.lower()]
    if rel is None or rel == "SR not in system":
        return rel, "  "
    if server == "p":
        return rel, "PD"
    return rel, server.upper()
267
268
269 5
@pem_wrapper
def tcl_check(curef, session=None, mode=4, fvver="AAM481"):
    """
    Ask the TCL check endpoint for updates.

    Returns the response text on HTTP 200, otherwise None.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str
    """
    sess = generic_session(session)
    geturl = "http://g2master-us-east.tctmobile.com/check.php"
    params = {
        "id": "543212345000000",
        "curef": curef,
        "fv": fvver,
        "mode": mode,
        "type": "Firmware",
        "cltp": 2010,
        "cktp": 2,
        "rtd": 1,
        "chnl": 2,
    }
    response = sess.get(geturl, params=params)
    return response.text if response.status_code == 200 else None
294
295
296 5
def parse_tcl_check(data):
    """
    Pull version and file details out of a TCL check response.

    Returns a tuple of (target version, firmware ID, filename, file size,
    checksum), all taken as text from the XML.

    :param data: The data to parse.
    :type data: str
    """
    root = ElementTree.fromstring(data)
    tvver = root.find("VERSION").find("TV").text
    firmware = root.find("FIRMWARE")
    fwid = firmware.find("FW_ID").text
    fileinfo = firmware.find("FILESET").find("FILE")
    filename, filesize, filehash = (
        fileinfo.find(tag).text for tag in ("FILENAME", "SIZE", "CHECKSUM")
    )
    return tvver, fwid, filename, filesize, filehash
311
312
313 5
def tcl_salt():
    """
    Build a salt string: current time in milliseconds followed by six
    zero-padded random digits.
    """
    millis = round(time.time() * 1000)
    tail = str(random.randint(0, 999999)).zfill(6)
    return str(millis) + tail
320
321
322 5
def vkhash(curef, tvver, fwid, salt, mode=4, fvver="AAM481"):
    """
    Compute the SHA-1 hex digest of the request fields plus a fixed key.

    The fields are joined as a query string in a fixed order and the key is
    appended directly after the last field.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str
    """
    vdkey = "1271941121281905392291845155542171963889169361242115412511417616616958244916823523421516924614377131161951402261451161002051042011757216713912611682532031591181861081836612643016596231212872211620511861302106446924625728571011411121471811641125920123641181975581511602312222261817375462445966911723844130106116313122624220514"
    # Field order is part of the hashed text and must not change.
    fields = (
        ("id", "543212345000000"),
        ("salt", salt),
        ("curef", curef),
        ("fv", fvver),
        ("tv", tvver),
        ("type", "Firmware"),
        ("fw_id", fwid),
        ("mode", mode),
        ("cltp", 2010),
    )
    query = "&".join("{0}={1}".format(key, value) for key, value in fields) + vdkey
    return hashlib.sha1(query.encode("utf-8")).hexdigest()
349
350
351 5
@pem_wrapper
def tcl_download_request(curef, tvver, fwid, salt, vkh, session=None, mode=4, fvver="AAM481"):
    """
    Post a download request to the TCL server.

    Returns the response text on HTTP 200, otherwise None.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param vkh: VDKey-based hash.
    :type vkh: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str
    """
    sess = generic_session(session)
    posturl = "http://g2master-us-east.tctmobile.com/download_request.php"
    params = {
        "id": "543212345000000",
        "curef": curef,
        "fv": fvver,
        "mode": mode,
        "type": "Firmware",
        "tv": tvver,
        "fw_id": fwid,
        "salt": salt,
        "vk": vkh,
        "cltp": 2010,
    }
    if mode == 4:
        params["foot"] = 1  # extra field sent only in mode 4
    response = sess.post(posturl, data=params)
    return response.text if response.status_code == 200 else None
390
391
392 5
def parse_tcl_download_request(body, mode=4):
    """
    Extract the file URL and the encrypt slave from a TCL download response.

    One SLAVE entry is picked at random as the download host. The encrypt
    slave is None in mode 2 or when the response lists none; otherwise one
    ENCRYPT_SLAVE entry is picked at random.

    :param body: The data to parse.
    :type body: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int
    """
    root = ElementTree.fromstring(body)
    slave_list = root.find("SLAVE_LIST")
    slave = random.choice(slave_list.findall("SLAVE")).text
    dlurl = root.find("FILE_LIST").find("FILE").find("DOWNLOAD_URL").text
    candidates = slave_list.findall("ENCRYPT_SLAVE")
    if mode == 2 or not candidates:
        encslave = None
    else:
        encslave = random.choice(candidates).text
    return "http://{0}{1}".format(slave, dlurl), encslave
409
410
411 5
@pem_wrapper
def encrypt_header(address, encslave, session=None):
    """
    Ask the encrypt slave whether a file has an encrypted header.

    Returns "HEADER FOUND" or "NO HEADER FOUND" when the server answers
    HTTP 206, judged by the reported Content-Length; returns None for any
    other status.

    :param address: File URL minus host.
    :type address: str

    :param encslave: Server hosting header script.
    :type encslave: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    # Form field names and values are kept base64-encoded in the source.
    encs = {b"YWNjb3VudA==": b"emhlbmdodWEuZ2Fv", b"cGFzc3dvcmQ=": b"cWFydUQ0b2s="}
    params = {}
    for key, val in encs.items():
        params[base64.b64decode(key)] = base64.b64decode(val)
    params[b"address"] = bytes(address, "utf-8")
    url = "".join(("http://", encslave, "/encrypt_header.php"))
    req = sess.post(url, data=params)
    if req.status_code != 206:  # anything but partial content
        return None
    contentlength = int(req.headers["Content-Length"])
    if contentlength == 4194320:
        return "HEADER FOUND"
    return "NO HEADER FOUND"
437
438
439 5
@pem_wrapper
def carrier_checker(mcc, mnc, session=None):
    """
    Query BlackBerry World to map a MCC and a MNC to a country and carrier.

    Returns a (country, carrier) tuple; either entry is None when the reply
    has no matching element.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    url = "http://appworld.blackberry.com/ClientAPI/checkcarrier?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc)
    user_agent = {'User-agent': 'AppWorld/5.1.0.60'}
    req = session.get(url, headers=user_agent)
    root = ElementTree.fromstring(req.text)
    # Pre-set both names so a reply without these elements gives None
    # instead of an UnboundLocalError at the return.
    country = carrier = None
    for child in root:
        if child.tag == "country":
            country = child.get("name")
        elif child.tag == "carrier":
            carrier = child.get("name")
    return country, carrier
464
465
466 5
def return_npc(mcc, mnc):
    """
    Build an NPC string: MCC and MNC each zero-filled to three characters,
    followed by "30".

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int
    """
    country_part = str(mcc).zfill(3)
    network_part = str(mnc).zfill(3)
    return country_part + network_part + "30"
477
478
479 5
@pem_wrapper
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
    """
    Post an update-details request and return the parsed carrier response.

    The result is whatever :func:`parse_carrier_xml` returns for the reply.

    :param npc: MCC + MNC (see :func:`return_npc`)
    :type npc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param upgrade: Whether to use upgrade files. False by default.
    :type upgrade: bool

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool

    :param forced: Force a software release.
    :type forced: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    upg = "upgrade" if upgrade else "repair"
    if forced is None:
        forced = "latest"
    url = "https://cs.sl.blackberry.com/cse/updateDetails/2.2/"
    # The payload is assembled from fragments in document order.
    fragments = (
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<updateDetailRequest version="2.2.1" authEchoTS="1366644680359">',
        "<clientProperties>",
        "<hardware>",
        "<pin>0x2FFFFFB3</pin><bsn>1128121361</bsn>",
        "<imei>004401139269240</imei>",
        "<id>0x{0}</id>".format(device),
        "</hardware>",
        "<network>",
        "<homeNPC>0x{0}</homeNPC>".format(npc),
        "<iccid>89014104255505565333</iccid>",
        "</network>",
        "<software>",
        "<currentLocale>en_US</currentLocale>",
        "<legalLocale>en_US</legalLocale>",
        "</software>",
        "</clientProperties>",
        "<updateDirectives>",
        '<allowPatching type="REDBEND">true</allowPatching>',
        "<upgradeMode>{0}</upgradeMode>".format(upg),
        "<provideDescriptions>false</provideDescriptions>",
        "<provideFiles>true</provideFiles>",
        "<queryType>NOTIFICATION_CHECK</queryType>",
        "</updateDirectives>",
        "<pollType>manual</pollType>",
        "<resultPackageSetCriteria>",
        '<softwareRelease softwareReleaseVersion="{0}" />'.format(forced),
        "<releaseIndependent>",
        '<packageType operation="include">application</packageType>',
        "</releaseIndependent>",
        "</resultPackageSetCriteria>",
        "</updateDetailRequest>",
    )
    query = "".join(fragments)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    req = session.post(url, headers=header, data=query)
    return parse_carrier_xml(req.text, blitz)
541
542
543 5
def carrier_swver_get(root):
    """
    Get software release from carrier XML.

    Returns the "softwareReleaseVersion" attribute of the last
    softwareReleaseMetadata element found anywhere under root, or None when
    there is no such element.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    # Start from None so a document without release metadata does not end
    # in an UnboundLocalError.
    swver = None
    for child in root.iter("softwareReleaseMetadata"):
        swver = child.get("softwareReleaseVersion")
    return swver
553
554
555 5
def carrier_child_fileappend(child, files, baseurl, blitz=False):
    """
    Add a package's download link to the file list and return the list.

    In blitz mode, packages of type system:radio, system:desktop and
    system:os are left out. The list is modified in place.

    :param child: Child element in use.
    :type child: xml.etree.ElementTree.Element

    :param files: Filelist.
    :type files: list(str)

    :param baseurl: Base URL, URL minus the filename.
    :type baseurl: str

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    excluded = ("system:radio", "system:desktop", "system:os")
    if blitz and child.get("type") in excluded:
        return files
    files.append(baseurl + child.get("path"))
    return files
577
578
579 5
def carrier_child_finder(root, files, baseurl, blitz=False):
    """
    Walk every package element, collecting file links and OS/radio versions.

    Returns (OS version, radio version, file list). Versions that are not
    found stay as empty strings.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree

    :param files: Filelist.
    :type files: list(str)

    :param baseurl: Base URL, URL minus the filename.
    :type baseurl: str

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    osver = ""
    radver = ""
    for package in root.iter("package"):
        files = carrier_child_fileappend(package, files, baseurl, blitz)
        kind = package.get("type")
        if kind == "system:radio":
            radver = package.get("version")
        elif kind in ("system:desktop", "system:os"):
            # both package types carry the OS version
            osver = package.get("version")
    return osver, radver, files
605
606
607 5
def parse_carrier_xml(data, blitz=False):
    """
    Parse a carrier update response into its useful parts.

    Returns (software release, OS version, radio version, file list). The
    software release is "N/A" when the response has no release metadata;
    versions are empty strings and the list is empty when there is no
    file set.

    :param data: The data to parse.
    :type data: xml

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool
    """
    root = ElementTree.fromstring(data)
    if root.find('./data/content/softwareReleaseMetadata') is None:
        swver = "N/A"
    else:
        swver = carrier_swver_get(root)
    osver = radver = ""
    files = []
    fileset = root.find('./data/content/fileSets/fileSet')
    if fileset is not None:
        baseurl = "{0}/".format(fileset.get("url"))
        osver, radver, files = carrier_child_finder(root, files, baseurl, blitz)
    return swver, osver, radver, files
629
630
631 5
@pem_wrapper
def sr_lookup(osver, server, session=None):
    """
    Look up the software release for an OS version on one server.

    Builds the lookup payload, posts it with :func:`sr_lookup_poster` and
    returns what :func:`sr_lookup_xmlparser` makes of the reply.

    :param osver: OS version to lookup, 10.x.y.zzzz.
    :type osver: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    fragments = (
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<srVersionLookupRequest version="2.0.0"',
        ' authEchoTS="1366644680359">',
        '<clientProperties><hardware>',
        '<pin>0x2FFFFFB3</pin><bsn>1140011878</bsn>',
        '<imei>004402242176786</imei><id>0x8D00240A</id>',
        '<isBootROMSecure>true</isBootROMSecure>',
        '</hardware>',
        '<network>',
        '<vendorId>0x0</vendorId><homeNPC>0x60</homeNPC>',
        '<currentNPC>0x60</currentNPC><ecid>0x1</ecid>',
        '</network>',
        '<software><currentLocale>en_US</currentLocale>',
        '<legalLocale>en_US</legalLocale>',
        '<osVersion>{0}</osVersion>'.format(osver),
        '<omadmEnabled>false</omadmEnabled>',
        '</software></clientProperties>',
        '</srVersionLookupRequest>',
    )
    query = "".join(fragments)
    reqtext = sr_lookup_poster(query, server, session)
    return sr_lookup_xmlparser(reqtext)
667
668
669 5
def sr_lookup_poster(query, server, session=None):
    """
    Post a software release lookup payload and return the response text.

    The request uses a one-second timeout; a timeout or connection error
    yields the string "SR not in system" instead of raising.

    :param query: XML payload.
    :type query: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    try:
        req = sess.post(server, headers=header, data=query, timeout=1)
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        return "SR not in system"
    return req.text
691
692
693 5
def sr_lookup_xmlparser(reqtext):
    """
    Parse a software lookup response as XML and extract the release.

    Text that does not parse as XML yields "SR not in system".

    :param reqtext: Response text, hopefully XML formatted.
    :type reqtext: str
    """
    try:
        root = ElementTree.fromstring(reqtext)
    except ElementTree.ParseError:
        return "SR not in system"
    return sr_lookup_extractor(root)
707
708
709 5
def sr_lookup_extractor(root):
    """
    Read the software release out of a parsed lookup response.

    Looks at the first child of ./data/content that has text: returns that
    text when it starts with a four-part dotted number, otherwise
    "SR not in system". Returns None when no child has text.

    :param root: ElementTree we're barking up.
    :type root: xml.etree.ElementTree.ElementTree
    """
    pattern = re.compile(r"(\d{1,4}\.)(\d{1,4}\.)(\d{1,4}\.)(\d{1,4})")
    for package in root.findall('./data/content/'):
        text = package.text
        if text is None:
            continue
        # only the first element with text is ever considered
        return text if pattern.match(text) else "SR not in system"
    return None
723
724
725 5
def sr_lookup_bootstrap(osv, session=None, no2=False):
    """
    Run lookups for each server for given OS.

    Returns a dict mapping server key ("p", "a1", "a2", "b1", "b2") to the
    result of :func:`sr_lookup`, or None if interrupted from the keyboard.

    :param osv: OS to check.
    :type osv: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
    :type no2: bool
    """
    keys = ["p", "a1", "a2", "b1", "b2"]
    if no2:
        keys = [key for key in keys if key not in ("a2", "b2")]
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            # Submit every lookup before waiting on any result; calling
            # .result() right after each submit would run them one by one.
            pending = {key: xec.submit(sr_lookup, osv, SERVERS[key], session) for key in keys}
            return {key: future.result() for key, future in pending.items()}
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
            return None
755
756
757 5
@pem_wrapper
def available_bundle_lookup(mcc, mnc, device, session=None):
    """
    Check which software releases were ever released for a carrier.

    Returns the "version" attribute of every child of ./data/content in the
    reply, or an empty list when the reply has no such node.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    server = "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/"
    npc = return_npc(mcc, mnc)
    query = '<?xml version="1.0" encoding="UTF-8"?>'
    query += '<availableBundlesRequest version="1.0.0" '
    query += 'authEchoTS="1366644680359">'
    query += '<deviceId><pin>0x2FFFFFB3</pin></deviceId>'
    query += '<clientProperties><hardware><id>0x{0}</id>'.format(device)
    query += '<isBootROMSecure>true</isBootROMSecure></hardware>'
    query += '<network><vendorId>0x0</vendorId><homeNPC>0x{0}</homeNPC>'.format(npc)
    query += '<currentNPC>0x{0}</currentNPC></network><software>'.format(npc)
    query += '<currentLocale>en_US</currentLocale>'
    query += '<legalLocale>en_US</legalLocale>'
    query += '<osVersion>10.0.0.0</osVersion>'
    query += '<radioVersion>10.0.0.0</radioVersion></software>'
    query += '</clientProperties><updateDirectives><bundleVersionFilter>'
    query += '</bundleVersionFilter></updateDirectives>'
    query += '</availableBundlesRequest>'
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    req = session.post(server, headers=header, data=query)
    root = ElementTree.fromstring(req.text)
    package = root.find('./data/content')
    if package is None:
        # No content node in the reply: report "no bundles" rather than
        # failing while iterating over None.
        return []
    bundlelist = [child.attrib["version"] for child in package]
    return bundlelist
798
799
800 5
@pem_wrapper
def ptcrb_scraper(ptcrbid, session=None):
    """
    Scrape the PTCRB page for a device and return its cleaned entries.

    Entries starting with "OS " are collected first; if there are none, the
    page is searched for three-letter, three-digit build codes instead. A
    Referer header is set on the session that is used.

    :param ptcrbid: Numerical ID from PTCRB (end of URL).
    :type ptcrbid: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    baseurl = "https://ptcrb.com/vendor/complete/view_complete_request_guest.cfm?modelid={0}".format(ptcrbid)
    sess = generic_session(session)
    sess.headers.update({"Referer": "https://ptcrb.com/vendor/complete/complete_request.cfm"})
    text = generic_soup_parser(baseurl, sess).get_text().replace("\r\n", " ")
    prelimlist = re.findall("OS .+[^\\n]", text, re.IGNORECASE)
    if not prelimlist:  # Priv
        prelimlist = re.findall(r"[A-Z]{3}[0-9]{3}[\s]", text)
    return [ptcrb_item_cleaner(item) for item in prelimlist if not item.endswith("\r\n")]
826
827
828 5
def space_pad(instring, minlength):
    """
    Right-pad a string with spaces up to a minimum length.

    Strings already at least that long are returned unchanged.

    :param instring: String to pad.
    :type instring: str

    :param minlength: Pad while len(instring) < minlength.
    :type minlength: int
    """
    return instring.ljust(minlength)
841
842
843 5
def ptcrb_item_cleaner(item):
    """
    Cleanup poorly formatted PTCRB entries written by an intern.

    Applies a fixed sequence of text replacements, then pads the second and
    fourth space-separated fields to 11 characters. An entry with a single
    field gets "OS:" put in front of it.

    :param item: The item to clean.
    :type item: str
    """
    # The replacements below depend on their order; do not reshuffle them.
    item = item.replace("<td>", "")
    item = item.replace("</td>", "")
    item = item.replace("\n", "")
    item = item.replace(" (SR", ", SR")
    item = re.sub(r"\s?\((.*)$", "", item)
    item = re.sub(r"\sSV.*$", "", item)
    item = item.replace(")", "")
    item = item.replace(". ", ".")
    item = item.replace(";", "")
    item = item.replace("version", "Version")
    item = item.replace("Verison", "Version")
    if item.count("OS") > 1:
        templist = item.split("OS")
        templist[0] = "OS"
        item = "".join([templist[0], templist[1]])
    item = item.replace("SR", "SW Release")
    item = item.replace(" Version:", ":")
    item = item.replace("Version ", " ")
    item = item.replace(":1", ": 1")
    item = item.replace(", ", " ")
    item = item.replace("Software", "SW")
    item = item.replace("  ", " ")
    item = item.replace("OS ", "OS: ")
    item = item.replace("Radio ", "Radio: ")
    item = item.replace("Release ", "Release: ")
    # Trailing whitespace would turn into empty fields after the split and
    # make a one-field entry look like it had several.
    spaclist = item.rstrip().split(" ")
    if len(spaclist) > 1:
        # Pad only the fields that exist; short entries have no index 3.
        for index in (1, 3):
            if index < len(spaclist):
                spaclist[index] = space_pad(spaclist[index], 11)
    else:
        spaclist.insert(0, "OS:")
    item = " ".join(spaclist)
    item = item.strip()
    return item
884
885
886 5
@pem_wrapper
def kernel_scraper(utils=False, session=None):
    """
    Scrape BlackBerry's GitHub kernel repo for available branches.

    Walks branch listing pages 1 through 9 and stops early at the first page
    that contains a "no-results-message" div.

    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
    :type utils: bool

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    repo = "android-linux-kernel"
    if utils:
        repo = "android-utils"
    sess = generic_session(session)
    branches = []
    for page in range(1, 10):
        url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
        soup = generic_soup_parser(url, sess)
        if soup.find("div", {"class": "no-results-message"}):
            break  # ran out of pages
        # Branch names look like "msm" + 4 digits + "/" + 6 alphanumerics.
        found = re.findall(r"msm[0-9]{4}\/[A-Z0-9]{6}", soup.get_text(), re.IGNORECASE)
        branches.extend(found)
    return branches
909
910
911 5
def root_generator(folder, build, variant="common"):
    """
    Generate roots for the SHAxxx hash lookup URLs.

    :param folder: Dictionary of variant: loader name pairs.
    :type folder: dict(str: str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # Priv: folder depends on the carrier variant
    priv_root = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
    # DTEK50: AAF builds live in a different folder
    if build[:3] == "AAF":
        dtek_root = "bbSupport/DTEK50"
    else:
        dtek_root = "bbfoundation/hashfiles_priv/dtek50"
    # DTEK60 still uses dtek50 folder, for some reason
    return {"Priv": priv_root, "DTEK50": dtek_root, "DTEK60": dtek_root}
933
934
935 5
def make_droid_skeleton_bbm(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL, on the BB Mobile site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # Device name -> identifier used inside the autoloader filename.
    device_codes = {"KEYone": "qc8953"}
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), device_codes[device])
    if method is None:
        # Plain autoloader download.
        return "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(base)
    # Hash file: extension is the lowercased method plus "sum".
    return "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(base, method.lower())
958
959
960 5
def make_droid_skeleton_og(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL, on the original site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # Carrier variant -> hash folder name, consumed by root_generator.
    folder = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
    # Device name -> identifier used inside the autoloader filename.
    device_codes = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
    # Roots are computed up front, even for plain OS links.
    roots = root_generator(folder, build, variant)
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), device_codes[device])
    if method is None:
        # Plain autoloader download.
        return "https://bbapps.download.blackberry.com/Priv/{0}.zip".format(base)
    # Hash file, stored under the device-specific root.
    return "https://ca.blackberry.com/content/dam/{1}/{0}.{2}sum".format(base, roots[device], method.lower())
985
986
987 5
def make_droid_skeleton(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL.

    Dispatches to the original BlackBerry site or the BB Mobile site
    depending on the device.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str

    :raises ValueError: If the device is not a known device.
    """
    # No Aurora
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
    # Trailing comma matters: without it this is a plain string and "in"
    # would do substring matching (e.g. "KEY" or "" would match).
    bbmlist = ("KEYone",)  # BB Mobile
    if device in oglist:
        return make_droid_skeleton_og(method, build, device, variant)
    if device in bbmlist:
        return make_droid_skeleton_bbm(method, build, device, variant)
    # Previously fell through to an unbound local; fail with a clear message.
    raise ValueError("Unknown device: {0!r}".format(device))
1011
1012
1013 5
def bulk_droid_skeletons(devs, build, method=None):
    """
    Prepare list of Android autoloader/hash URLs.

    One URL is produced per device/variant pair, in device order.

    :param devs: List of devices.
    :type devs: list(str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str
    """
    carrier_variants = {
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
        "KEYone": ("common", "usa-vzw", "usa-sprint", "global-att")  # verify this
    }
    common_variants = ("common", )  # for single-variant devices
    carrier_devices = ("Priv", )  # add KEYone when verified
    skels = []
    for dev in devs:
        # Only devices listed in carrier_devices get the carrier variants.
        if dev in carrier_devices:
            variants = carrier_variants[dev]
        else:
            variants = common_variants
        skels.extend(make_droid_skeleton(method, build, dev, var) for var in variants)
    return skels
1039
1040
1041 5
def prepare_droid_list(device):
    """
    Convert single devices to a list, if necessary.

    A list is returned as-is (same object); anything else is wrapped
    in a one-element list.

    :param device: Device to check.
    :type device: str
    """
    return device if isinstance(device, list) else [device]
1053
1054
1055 5
def droid_scanner(build, device, method=None, session=None):
    """
    Check for Android autoloaders on BlackBerry's site.

    Returns the list of candidate URLs that are available, or None
    if none of them are.

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    devs = prepare_droid_list(device)
    skels = bulk_droid_skeletons(devs, build, method)
    if not skels:
        # Nothing to check; ThreadPoolExecutor also rejects max_workers=0.
        return None
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
        # Queue every check before waiting on any result, so the requests
        # actually overlap instead of running one after another.
        futures = [xec.submit(availability, skel, session) for skel in skels]
        # Collect in submission order to keep the result order stable.
        results = [skel for skel, future in zip(skels, futures) if future.result()]
    return results if results else None
1080
1081
1082 5
def chunker(iterable, inc):
    """
    Convert an iterable into a list of inc sized lists.

    The last chunk is shorter when the length is not a multiple of inc.

    :param iterable: Iterable to chunk.
    :type iterable: list/tuple/string

    :param inc: Increment; how big each chunk is.
    :type inc: int
    """
    chunks = []
    for start in range(0, len(iterable), inc):
        chunks.append(iterable[start:start + inc])
    return chunks
1094
1095
1096 5
def unicode_filter(intext):
    """
    Remove en dashes (U+2013) and strip surrounding whitespace.

    :param intext: Text to filter.
    :type intext: str
    """
    without_dashes = intext.replace("\u2013", "")
    return without_dashes.strip()
1104
1105
1106 5
def table_header_filter(ptag):
    """
    Validate p tag, to see if it's relevant.

    A tag is relevant when it contains a b element and its text mentions
    "BlackBerry" but not "experts".

    :param ptag: P tag.
    :type ptag: bs4.element.Tag
    """
    bold = ptag.find("b")
    if not bold:
        # Hand back the falsy lookup result itself, as callers only test truthiness.
        return bold
    text = ptag.text
    return "BlackBerry" in text and "experts" not in text
1115
1116
1117 5
def table_headers(pees):
    """
    Generate table headers from list of p tags.

    Keeps the text of every tag accepted by table_header_filter, in order.

    :param pees: List of p tags.
    :type pees: list(bs4.element.Tag)
    """
    headers = []
    for ptag in pees:
        if table_header_filter(ptag):
            headers.append(ptag.text)
    return headers
1126
1127
1128 5
@pem_wrapper
def loader_page_scraper(session=None):
    """
    Scrape both autoloader pages (original site, then BB Mobile site).

    The per-site scrapers print what they find; nothing is returned here.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    # Same session for both sites, original site first.
    for scraper in (loader_page_scraper_og, loader_page_scraper_bbm):
        scraper(sess)
1139
1140
1141 5
def loader_page_scraper_og(session=None):
    """
    Scrape and print the autoloader page, original site.

    Each table on the page is paired by position with the header
    extracted from the page's p tags.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    soup = generic_soup_parser(
        "https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html", session)
    headers = table_headers(soup.find_all("p"))
    for idx, table in enumerate(soup.find_all("table")):
        loader_page_chunker_og(idx, table, headers)
1154
1155
1156 5
def loader_page_scraper_bbm(session=None):
    """
    Scrape and print the autoloader page, new site.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    url = "https://www.blackberrymobile.com/support/reload-software/"
    soup = generic_soup_parser(url, session)
    list_class = re.compile("list-two special-.")
    matching_lists = soup.find_all("ul", {"class": list_class})
    # The first matching list is skipped.
    ulls = matching_lists[1:]
    print("~~~BlackBerry KEYone~~~")
    for ull in ulls:
        loader_page_chunker_bbm(ull)
1169
1170
1171 5
def loader_page_chunker_og(idx, table, headers):
    """
    Given a loader page table, print its header and its cells in groups of four.

    :param idx: Index of enumerating tables.
    :type idx: int

    :param table: HTML table tag.
    :type table: bs4.element.Tag

    :param headers: List of table headers.
    :type headers: list(str)
    """
    print("~~~{0}~~~".format(headers[idx]))
    cells = table.find_all("td")
    # Four cells per group; the printer reads the first three of each group.
    for group in chunker(cells, 4):
        loader_page_printer(group)
    print(" ")
1189
1190
1191 5
def loader_page_chunker_bbm(ull):
    """
    Given a loader page list, print its list items in groups of three.

    :param ull: HTML unordered list tag.
    :type ull: bs4.element.Tag
    """
    items = ull.find_all("li")
    for group in chunker(items, 3):
        loader_page_printer(group)
1201
1202
1203 5
def loader_page_printer(chunk):
    """
    Print individual cell texts given a list of table cells.

    Uses the text of the first two cells and the href of the first
    link inside the third cell.

    :param chunk: List of td tags.
    :type chunk: list(bs4.element.Tag)
    """
    key = unicode_filter(chunk[0].text)
    ver = unicode_filter(chunk[1].text)
    href = chunk[2].find("a")["href"]
    link = unicode_filter(href)
    print("{0}\n    {1}: {2}".format(key, ver, link))
1214
1215
1216 5
@pem_wrapper
def base_metadata(url, session=None):
    """
    Get BBNDK metadata, base function.

    Downloads the file at the URL and returns the second comma-separated
    field of every non-empty line, decoded as UTF-8.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    payload = sess.get(url).content
    metadata = []
    for line in payload.split(b"\n"):
        if not line:
            continue  # skip blank lines
        second_field = line.split(b",")[1]
        metadata.append(second_field.decode("utf-8"))
    return metadata
1233
1234
1235 5
def ndk_metadata(session=None):
    """
    Get BBNDK target metadata.

    Only entries starting with "10.0", "10.1" or "10.2" are kept.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    wanted = ("10.0", "10.1", "10.2")
    entries = base_metadata("http://downloads.blackberry.com/upr/developers/update/bbndk/metadata", session)
    metadata = []
    for entry in entries:
        if entry.startswith(wanted):
            metadata.append(entry)
    return metadata
1245
1246
1247 5
def sim_metadata(session=None):
    """
    Get BBNDK simulator metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    url = "http://downloads.blackberry.com/upr/developers/update/bbndk/simulator/simulator_metadata"
    return base_metadata(url, session)
1256
1257
1258 5
def runtime_metadata(session=None):
    """
    Get BBNDK runtime metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    url = "http://downloads.blackberry.com/upr/developers/update/bbndk/runtime/runtime_metadata"
    return base_metadata(url, session)
1267
1268
1269 5
def series_generator(osversion):
    """
    Generate series/branch name from OS version.

    Uses the first three dot-separated parts, e.g. "10.3.2.2876"
    becomes "BB10_3_2".

    :param osversion: OS version.
    :type osversion: str
    """
    leading_parts = osversion.split(".")[:3]
    return "BB{0}_{1}_{2}".format(*leading_parts)
1278
1279
1280 5
@pem_wrapper
def devalpha_urls(osversion, skel, session=None):
    """
    Check individual Dev Alpha autoloader URLs.

    Returns a (url, content-length) tuple when a HEAD request answers
    with status 200, otherwise an empty tuple.

    :param osversion: OS version.
    :type osversion: str

    :param skel: Individual skeleton format to try.
    :type skel: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    url = "http://downloads.blackberry.com/upr/developers/downloads/{0}{1}.exe".format(skel, osversion)
    response = sess.head(url)
    if response.status_code != 200:
        return ()
    return (url, response.headers["content-length"])
1302
1303
1304 5
def devalpha_urls_serieshandler(osversion, skeletons):
    """
    Process list of candidate Dev Alpha autoloader URLs.

    Replaces the "<SERIES>" placeholder in each skeleton with the series
    name for the OS version. A new list is returned; the input list is left
    untouched so the same skeletons can be reused for other OS versions.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list
    """
    skels = []
    for skel in skeletons:
        if "<SERIES>" in skel:
            # Only generate the series when a skeleton actually needs it.
            skel = skel.replace("<SERIES>", series_generator(osversion))
        skels.append(skel)
    return skels
1319
1320
1321 5
def devalpha_urls_bulk(osversion, skeletons, xec, session=None):
    """
    Construct list of valid Dev Alpha autoloader URLs.

    Returns a dict mapping each URL that was found to its content-length.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    skels = devalpha_urls_serieshandler(osversion, skeletons)
    # Submit everything first; waiting on each future right after submitting
    # it would run the checks one at a time.
    futures = [xec.submit(devalpha_urls, osversion, skel, session) for skel in skels]
    finals = {}
    for future in futures:
        final = future.result()
        if final:  # empty tuple means the URL was not found
            finals[final[0]] = final[1]
    return finals
1344
1345
1346 5
def devalpha_urls_bootstrap(osversion, skeletons, session=None):
    """
    Get list of valid Dev Alpha autoloader URLs.

    Runs the bulk check on a five-worker thread pool. Returns None if the
    check is interrupted from the keyboard.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        try:
            found = devalpha_urls_bulk(osversion, skeletons, pool, session)
        except KeyboardInterrupt:
            pool.shutdown(wait=False)
            found = None
    return found
1364
1365
1366 5
def dev_dupe_dicter(finals):
    """
    Prepare dictionary to clean duplicate autoloaders.

    Inverts the mapping: each content-length maps to the set of URLs
    that reported it.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    by_size = {}
    for url, size in finals.items():
        if size not in by_size:
            by_size[size] = set()
        by_size[size].add(url)
    return by_size
1377
1378
1379 5
def dev_dupe_remover(finals, dupelist):
    """
    Filter dictionary of autoloader entries.

    Every duplicate URL containing "DevAlpha" is deleted from finals,
    which is modified in place and returned.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)

    :param dupelist: List of duplicate URLs.
    :type dupelist: list(str)
    """
    for group in dupelist:
        for url in group:
            if "DevAlpha" not in url:
                continue
            finals.pop(url)
    return finals
1394
1395
1396 5
def dev_dupe_cleaner(finals):
    """
    Clean duplicate autoloader entries.

    URLs sharing a content-length count as duplicates; the duplicates are
    handed to dev_dupe_remover for filtering.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    by_size = dev_dupe_dicter(finals)
    # Only sizes reported by more than one URL are duplicates.
    dupelist = [urls for urls in by_size.values() if len(urls) > 1]
    return dev_dupe_remover(finals, dupelist)
1407