Passed
Push — master ( 57fbcd...e67727 )
by John
02:40
created

bbarchivist.networkutils.download_request_prep()   B

Complexity

Conditions 2

Size

Total Lines 37
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 9
dl 0
loc 37
ccs 7
cts 7
cp 1
crap 2
rs 8.8571
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1415/1000)
Loading history...
2 5
"""This module is used for network connections; APIs, downloading, etc."""
3
4 5
import base64  # encoding
5 5
import binascii  # encoding
6 5
import concurrent.futures  # multiprocessing/threading
7 5
import glob  # pem file lookup
8 5
import hashlib  # salt
9 5
import os  # filesystem read
10 5
import random  # salt
11 5
import re  # regexes
12 5
import time  # salt
13 5
import zlib  # encoding
14
15 5
import requests  # downloading
16 5
from bs4 import BeautifulSoup  # scraping
17 5
from bbarchivist import utilities  # parse filesize
18 5
from bbarchivist import xmlutils  # xml work
19 5
from bbarchivist.bbconstants import SERVERS, TCLMASTERS  # lookup servers
20
21 5
__author__ = "Thurask"
22 5
__license__ = "WTFPL v2"
23 5
__copyright__ = "2015-2018 Thurask"
24
25
26 5
def grab_pem():
27
    """
28
    Work with either local cacerts or system cacerts.
29
    """
30 5
    try:
31 5
        pemfile = glob.glob(os.path.join(os.getcwd(), "cacert.pem"))[0]
32 5
    except IndexError:
33 5
        return requests.certs.where()  # no local cacerts
34
    else:
35 5
        return os.path.abspath(pemfile)  # local cacerts
36
37
38 5
def pem_wrapper(method):
39
    """
40
    Decorator to set REQUESTS_CA_BUNDLE.
41
42
    :param method: Method to use.
43
    :type method: function
44
    """
45 5
    def wrapper(*args, **kwargs):
46
        """
47
        Set REQUESTS_CA_BUNDLE before doing function.
48
        """
49 5
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
50 5
        return method(*args, **kwargs)
51 5
    return wrapper
52
53
54 5
def try_try_again(method):
55
    """
56
    Decorator to absorb timeouts, proxy errors, and other common exceptions.
57
58
    :param method: Method to use.
59
    :type method: function
60
    """
61 5
    def wrapper(*args, **kwargs):
62
        """
63
        Try function, try it again up to five times, and leave gracefully.
64
        """
65 5
        tries = 5
66 5
        for _ in range(tries):
67 5
            try:
68 5
                result = method(*args, **kwargs)
69 5
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ProxyError):
70 5
                continue
71
            else:
72 5
                break
73
        else:
74 5
            result = None
75 5
        return result
76 5
    return wrapper
77
78
79 5
def generic_session(session=None):
80
    """
81
    Create a Requests session object on the fly, if need be.
82
83
    :param session: Requests session object, created if this is None.
84
    :type session: requests.Session()
85
    """
86 5
    sess = requests.Session() if session is None else session
87 5
    return sess
88
89
90 5
def generic_soup_parser(url, session=None):
91
    """
92
    Get a BeautifulSoup HTML parser for some URL.
93
94
    :param url: The URL to check.
95
    :type url: str
96
97
    :param session: Requests session object, default is created on the fly.
98
    :type session: requests.Session()
99
    """
100 5
    session = generic_session(session)
101 5
    req = session.get(url)
102 5
    soup = BeautifulSoup(req.content, "html.parser")
103 5
    return soup
104
105
106 5
@pem_wrapper
107 5
def get_length(url, session=None):
108
    """
109
    Get content-length header from some URL.
110
111
    :param url: The URL to check.
112
    :type url: str
113
114
    :param session: Requests session object, default is created on the fly.
115
    :type session: requests.Session()
116
    """
117 5
    session = generic_session(session)
118 5
    if url is None:
119 5
        return 0
120 5
    try:
121 5
        heads = session.head(url)
122 5
        fsize = heads.headers['content-length']
123 5
        return int(fsize)
124 5
    except requests.ConnectionError:
125 5
        return 0
126
127
128 5
@pem_wrapper
129 5
def download(url, output_directory=None, session=None):
130
    """
131
    Download file from given URL.
132
133
    :param url: URL to download from.
134
    :type url: str
135
136
    :param output_directory: Download folder. Default is local.
137
    :type output_directory: str
138
139
    :param session: Requests session object, default is created on the fly.
140
    :type session: requests.Session()
141
    """
142 5
    session = generic_session(session)
143 5
    output_directory = utilities.dirhandler(output_directory, os.getcwd())
144 5
    lfname = url.split('/')[-1]
145 5
    sname = utilities.stripper(lfname)
146 5
    fname = os.path.join(output_directory, lfname)
147 5
    download_writer(url, fname, lfname, sname, session)
148 5
    remove_empty_download(fname)
149
150
151 5
def remove_empty_download(fname):
152
    """
153
    Remove file if it's empty.
154
155
    :param fname: File path.
156
    :type fname: str
157
    """
158 5
    if os.stat(fname).st_size == 0:
159 5
        os.remove(fname)
160
161
162 5
def download_writer(url, fname, lfname, sname, session=None):
163
    """
164
    Download file and write to disk.
165
166
    :param url: URL to download from.
167
    :type url: str
168
169
    :param fname: File path.
170
    :type fname: str
171
172
    :param lfname: Long filename.
173
    :type lfname: str
174
175
    :param sname: Short name, for printing to screen.
176
    :type sname: str
177
178
    :param session: Requests session object, default is created on the fly.
179
    :type session: requests.Session()
180
    """
181 5
    with open(fname, "wb") as file:
182 5
        req = session.get(url, stream=True)
183 5
        clength = req.headers['content-length']
184 5
        fsize = utilities.fsizer(clength)
185 5
        if req.status_code == 200:  # 200 OK
186 5
            print("DOWNLOADING {0} [{1}]".format(sname, fsize))
187 5
            for chunk in req.iter_content(chunk_size=1024):
188 5
                file.write(chunk)
189
        else:
190 5
            print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
191
192
193 5
def download_bootstrap(urls, outdir=None, workers=5, session=None):
194
    """
195
    Run downloaders for each file in given URL iterable.
196
197
    :param urls: URLs to download.
198
    :type urls: list
199
200
    :param outdir: Download folder. Default is handled in :func:`download`.
201
    :type outdir: str
202
203
    :param workers: Number of worker processes. Default is 5.
204
    :type workers: int
205
206
    :param session: Requests session object, default is created on the fly.
207
    :type session: requests.Session()
208
    """
209 5
    workers = len(urls) if len(urls) < workers else workers
210 5
    spinman = utilities.SpinManager()
211 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
212 5
        try:
213 5
            spinman.start()
214 5
            for url in urls:
215 5
                xec.submit(download, url, outdir, session)
216 5
        except (KeyboardInterrupt, SystemExit):
217 5
            xec.shutdown()
218 5
            spinman.stop()
219 5
    spinman.stop()
220 5
    utilities.spinner_clear()
221 5
    utilities.line_begin()
222
223
224 5
def download_android_tools(downloaddir=None):
225
    """
226
    Download Android SDK platform tools.
227
228
    :param downloaddir: Directory name, default is "plattools".
229
    :type downloaddir: str
230
    """
231 5
    if downloaddir is None:
232 5
        downloaddir = "plattools"
233 5
    if os.path.exists(downloaddir):
234 5
        os.removedirs(downloaddir)
235 5
    os.mkdir(downloaddir)
236 5
    platforms = ("windows", "linux", "darwin")
237 5
    baseurl = "https://dl.google.com/android/repository/platform-tools-latest"
238 5
    dlurls = ["{1}-{0}.zip".format(plat, baseurl) for plat in platforms]
239 5
    sess = generic_session()
240 5
    download_bootstrap(dlurls, outdir="plattools", session=sess)
241
242
243 5
@pem_wrapper
244 5
def getcode(url, session=None):
245
    """
246
    Return status code of given URL.
247
248
    :param url: URL to check.
249
    :type url: str
250
251
    :param session: Requests session object, default is created on the fly.
252
    :type session: requests.Session()
253
    """
254 5
    session = generic_session(session)
255 5
    try:
256 5
        shead = session.head(url)
257 5
        status = int(shead.status_code)
258 5
        return status
259 5
    except requests.ConnectionError:
260 5
        return 404
261
262
263 5
@pem_wrapper
264 5
def availability(url, session=None):
265
    """
266
    Check HTTP status code of given URL.
267
    200 or 301-308 is OK, else is not.
268
269
    :param url: URL to check.
270
    :type url: str
271
272
    :param session: Requests session object, default is created on the fly.
273
    :type session: requests.Session()
274
    """
275 5
    status = getcode(url, session)
276 5
    return status == 200 or 300 < status <= 308
277
278
279 5
def clean_availability(results, server):
280
    """
281
    Clean availability for autolookup script.
282
283
    :param results: Result dict.
284
    :type results: dict(str: str)
285
286
    :param server: Server, key for result dict.
287
    :type server: str
288
    """
289 5
    marker = "PD" if server == "p" else server.upper()
290 5
    rel = results[server.lower()]
291 5
    avail = marker if rel != "SR not in system" and rel is not None else "  "
292 5
    return rel, avail
293
294
295 5
def tcl_master():
296
    """
297
    Get a random master server.
298
    """
299 5
    return random.choice(TCLMASTERS)
300
301
302 5
def tcl_default_id(devid):
303
    """
304
    Get an IMEI or a serial number or something.
305
306
    :param devid: Return default if this is None.
307
    :type devid: str
308
    """
309 5
    if devid is None:
310 5
        devid = "543212345000000"
311 5
    return devid
312
313
314 5
def check_prep(curef, mode=4, fvver="AAA000", cltp=2010, cktp=2, rtd=1, chnl=2, devid=None):
315
    """
316
    Prepare variables for TCL update check.
317
318
    :param curef: PRD of the phone variant to check.
319
    :type curef: str
320
321
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
322
    :type mode: int
323
324
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
325
    :type fvver: str
326
327
    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
328
    :type cltp: int
329
330
    :param cktp: 2 if checking manually, 1 if checking automatically. Default is 2.
331
    :type cktp: int
332
333
    :param rtd: 2 if rooted, 1 if not. Default is 1.
334
    :type rtd: int
335
336
    :param chnl: 2 if checking on WiFi, 1 if checking on mobile. Default is 2.
337
    :type chnl: int
338
339
    :param devid: Serial number/IMEI. Default is fake, not that it matters.
340
    :type devid: str
341
    """
342 5
    devid = tcl_default_id(devid)
343 5
    geturl = "http://{0}/check.php".format(tcl_master())
344 5
    params = {"id": devid, "curef": curef, "fv": fvver, "mode": mode, "type": "Firmware", "cltp": cltp, "cktp": cktp, "rtd": rtd, "chnl": chnl}
345 5
    return geturl, params
346
347
348 5
@pem_wrapper
349 5
@try_try_again
350 5
def tcl_check(curef, session=None, mode=4, fvver="AAA000", export=False):
351
    """
352
    Check TCL server for updates.
353
354
    :param curef: PRD of the phone variant to check.
355
    :type curef: str
356
357
    :param session: Requests session object, default is created on the fly.
358
    :type session: requests.Session()
359
360
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
361
    :type mode: int
362
363
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
364
    :type fvver: str
365
366
    :param export: Whether to export XML response to file. Default is False.
367
    :type export: bool
368
    """
369 5
    sess = generic_session(session)
370 5
    geturl, params = check_prep(curef, mode, fvver)
371 5
    req = sess.get(geturl, params=params)
372 5
    if req.status_code == 200:
373 5
        req.encoding = "utf-8"
374 5
        response = req.text
375 5
        if export:
376 5
            salt = tcl_salt()
377 5
            xmlutils.dump_tcl_xml(response, salt)
378
    else:
379 5
        response = None
380 5
    return response
381
382
383 5
def tcl_salt():
384
    """
385
    Generate salt value for TCL server tools.
386
    """
387 5
    millis = round(time.time() * 1000)
388 5
    tail = "{0:06d}".format(random.randint(0, 999999))
389 5
    return "{0}{1}".format(str(millis), tail)
390
391
392 5
def unpack_vdkey():
393
    """
394
    Draw the curtain back.
395
    """
396 5
    vdkey = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
397 5
    vdk = zlib.decompress(binascii.a2b_base64(vdkey))
398 5
    return vdk.decode("utf-8")
399
400
401 5
def vkhash(curef, tvver, fwid, salt, mode=4, fvver="AAA000", cltp=2010, devid=None):
402
    """
403
    Generate hash from TCL update server variables.
404
405
    :param curef: PRD of the phone variant to check.
406
    :type curef: str
407
408
    :param tvver: Target software version.
409
    :type tvver: str
410
411
    :param fwid: Firmware ID for desired download file.
412
    :type fwid: str
413
414
    :param salt: Salt hash.
415
    :type salt: str
416
417
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
418
    :type mode: int
419
420
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
421
    :type fvver: str
422
423
    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
424
    :type cltp: int
425
426
    :param devid: Serial number/IMEI. Default is fake, not that it matters.
427
    :type devid: str
428
    """
429 5
    vdk = unpack_vdkey()
430 5
    devid = tcl_default_id(devid)
431 5
    query = "id={0}&salt={1}&curef={2}&fv={3}&tv={4}&type={5}&fw_id={6}&mode={7}&cltp={8}{9}".format(devid, salt, curef, fvver, tvver, "Firmware", fwid, mode, cltp, vdk)
432 5
    engine = hashlib.sha1()
433 5
    engine.update(bytes(query, "utf-8"))
434 5
    return engine.hexdigest()
435
436
437 5
def download_request_prep(curef, tvver, fwid, salt, vkh, mode=4, fvver="AAA000", cltp=2010, devid=None):
438
    """
439
    Prepare variables for download server check.
440
441
    :param curef: PRD of the phone variant to check.
442
    :type curef: str
443
444
    :param tvver: Target software version.
445
    :type tvver: str
446
447
    :param fwid: Firmware ID for desired download file.
448
    :type fwid: str
449
450
    :param salt: Salt hash.
451
    :type salt: str
452
453
    :param vkh: VDKey-based hash.
454
    :type vkh: str
455
456
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
457
    :type mode: int
458
459
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
460
    :type fvver: str
461
462
    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
463
    :type cltp: int
464
465
    :param devid: Serial number/IMEI. Default is fake, not that it matters.
466
    :type devid: str
467
    """
468 5
    devid = tcl_default_id(devid)
469 5
    posturl = "http://{0}/download_request.php".format(tcl_master())
470 5
    params = {"id": devid, "curef": curef, "fv": fvver, "mode": mode, "type": "Firmware", "tv": tvver, "fw_id": fwid, "salt": salt, "vk": vkh, "cltp": cltp}
471 5
    if mode == 4:
472 5
        params["foot"] = 1
473 5
    return posturl, params
474
475
476 5
@pem_wrapper
477 5
@try_try_again
478 5
def tcl_download_request(curef, tvver, fwid, salt, vkh, session=None, mode=4, fvver="AAA000", export=False):
479
    """
480
    Check TCL server for download URLs.
481
482
    :param curef: PRD of the phone variant to check.
483
    :type curef: str
484
485
    :param tvver: Target software version.
486
    :type tvver: str
487
488
    :param fwid: Firmware ID for desired download file.
489
    :type fwid: str
490
491
    :param salt: Salt hash.
492
    :type salt: str
493
494
    :param vkh: VDKey-based hash.
495
    :type vkh: str
496
497
    :param session: Requests session object, default is created on the fly.
498
    :type session: requests.Session()
499
500
    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
501
    :type mode: int
502
503
    :param fvver: Initial software version, must be specific if downloading OTA deltas.
504
    :type fvver: str
505
506
    :param export: Whether to export XML response to file. Default is False.
507
    :type export: bool
508
    """
509 5
    sess = generic_session(session)
510 5
    posturl, params = download_request_prep(curef, tvver, fwid, salt, vkh, mode, fvver)
511 5
    req = sess.post(posturl, data=params)
512 5
    if req.status_code == 200:
513 5
        req.encoding = "utf-8"
514 5
        response = req.text
515 5
        if export:
516 5
            xmlutils.dump_tcl_xml(response, salt)
517
    else:
518 5
        response = None
519 5
    return response
520
521
522 5
def encrypt_header_prep(address, encslave):
523
    """
524
    Prepare variables for encrypted header check.
525
526
    :param address: File URL minus host.
527
    :type address: str
528
529
    :param encslave: Server hosting header script.
530
    :type encslave: str
531
    """
532 5
    encs = {b"YWNjb3VudA==" : b"emhlbmdodWEuZ2Fv", b"cGFzc3dvcmQ=": b"cWFydUQ0b2s="}
533 5
    params = {base64.b64decode(key): base64.b64decode(val) for key, val in encs.items()}
534 5
    params[b"address"] = bytes(address, "utf-8")
535 5
    posturl = "http://{0}/encrypt_header.php".format(encslave)
536 5
    return posturl, params
537
538
539 5
@pem_wrapper
540 5
def encrypt_header(address, encslave, session=None):
541
    """
542
    Check encrypted header.
543
544
    :param address: File URL minus host.
545
    :type address: str
546
547
    :param encslave: Server hosting header script.
548
    :type encslave: str
549
550
    :param session: Requests session object, default is created on the fly.
551
    :type session: requests.Session()
552
    """
553 5
    sess = generic_session(session)
554 5
    posturl, params = encrypt_header_prep(address, encslave)
555 5
    req = sess.post(posturl, data=params)
556 5
    if req.status_code == 206:  # partial
557 5
        contentlength = int(req.headers["Content-Length"])
558 5
        sentinel = "HEADER FOUND" if contentlength == 4194320 else "NO HEADER FOUND"
559
    else:
560 5
        sentinel = None
561 5
    return sentinel
562
563
564 5
@pem_wrapper
565
def remote_prd_info():
566
    """
567
    Get list of remote OTA versions.
568
    """
569 5
    dburl = "https://tclota.birth-online.de/json_lastupdates.php"
570 5
    req = requests.get(dburl)
571 5
    reqj = req.json()
572 5
    otadict = {val["curef"]: val["last_ota"] for val in reqj.values() if val["last_ota"] is not None}
573 5
    return otadict
574
575
576 5
@pem_wrapper
577 5
def carrier_checker(mcc, mnc, session=None):
578
    """
579
    Query BlackBerry World to map a MCC and a MNC to a country and carrier.
580
581
    :param mcc: Country code.
582
    :type mcc: int
583
584
    :param mnc: Network code.
585
    :type mnc: int
586
587
    :param session: Requests session object, default is created on the fly.
588
    :type session: requests.Session()
589
    """
590 5
    session = generic_session(session)
591 5
    baseurl = "http://appworld.blackberry.com/ClientAPI/checkcarrier"
592 5
    url = "{2}?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc, baseurl)
593 5
    user_agent = {'User-agent': 'AppWorld/5.1.0.60'}
594 5
    req = session.get(url, headers=user_agent)
595 5
    country, carrier = xmlutils.cchecker_get_tags(req.text)
596 5
    return country, carrier
597
598
599 5
def return_npc(mcc, mnc):
600
    """
601
    Format MCC and MNC into a NPC.
602
603
    :param mcc: Country code.
604
    :type mcc: int
605
606
    :param mnc: Network code.
607
    :type mnc: int
608
    """
609 5
    return "{0}{1}30".format(str(mcc).zfill(3), str(mnc).zfill(3))
610
611
612 5
@pem_wrapper
613 5
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
614
    """
615
    Query BlackBerry servers, check which update is out for a carrier.
616
617
    :param npc: MCC + MNC (see `func:return_npc`)
618
    :type npc: int
619
620
    :param device: Hexadecimal hardware ID.
621
    :type device: str
622
623
    :param upgrade: Whether to use upgrade files. False by default.
624
    :type upgrade: bool
625
626
    :param blitz: Whether or not to create a blitz package. False by default.
627
    :type blitz: bool
628
629
    :param forced: Force a software release.
630
    :type forced: str
631
632
    :param session: Requests session object, default is created on the fly.
633
    :type session: requests.Session()
634
    """
635 5
    session = generic_session(session)
636 5
    upg = "upgrade" if upgrade else "repair"
637 5
    forced = "latest" if forced is None else forced
638 5
    url = "https://cs.sl.blackberry.com/cse/updateDetails/2.2/"
639 5
    query = xmlutils.prep_carrier_query(npc, device, upg, forced)
640 5
    header = {"Content-Type": "text/xml;charset=UTF-8"}
641 5
    req = session.post(url, headers=header, data=query)
642 5
    return xmlutils.parse_carrier_xml(req.text, blitz)
643
644
645 5
@pem_wrapper
646 5
def sr_lookup(osver, server, session=None):
647
    """
648
    Software release lookup, with choice of server.
649
    :data:`bbarchivist.bbconstants.SERVERLIST` for server list.
650
651
    :param osver: OS version to lookup, 10.x.y.zzzz.
652
    :type osver: str
653
654
    :param server: Server to use.
655
    :type server: str
656
657
    :param session: Requests session object, default is created on the fly.
658
    :type session: requests.Session()
659
    """
660 5
    query = xmlutils.prep_sr_lookup(osver)
661 5
    reqtext = sr_lookup_poster(query, server, session)
662 5
    packtext = xmlutils.parse_sr_lookup(reqtext)
663 5
    return packtext
664
665
666 5
def sr_lookup_poster(query, server, session=None):
667
    """
668
    Post the XML payload for a software release lookup.
669
670
    :param query: XML payload.
671
    :type query: str
672
673
    :param server: Server to use.
674
    :type server: str
675
676
    :param session: Requests session object, default is created on the fly.
677
    :type session: requests.Session()
678
    """
679 5
    session = generic_session(session)
680 5
    header = {"Content-Type": "text/xml;charset=UTF-8"}
681 5
    try:
682 5
        req = session.post(server, headers=header, data=query, timeout=1)
683 5
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
684 5
        reqtext = "SR not in system"
685
    else:
686 5
        reqtext = req.text
687 5
    return reqtext
688
689
690 5
def sr_lookup_bootstrap(osv, session=None, no2=False):
691
    """
692
    Run lookups for each server for given OS.
693
694
    :param osv: OS to check.
695
    :type osv: str
696
697
    :param session: Requests session object, default is created on the fly.
698
    :type session: requests.Session()
699
700
    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
701
    :type no2: bool
702
    """
703 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
704 5
        try:
705 5
            results = {
706
                "p": None,
707
                "a1": None,
708
                "a2": None,
709
                "b1": None,
710
                "b2": None
711
            }
712 5
            if no2:
713 5
                del results["a2"]
714 5
                del results["b2"]
715 5
            for key in results:
716 5
                results[key] = xec.submit(sr_lookup, osv, SERVERS[key], session).result()
717 5
            return results
718 5
        except KeyboardInterrupt:
719 5
            xec.shutdown(wait=False)
720
721
722 5
@pem_wrapper
723 5
def available_bundle_lookup(mcc, mnc, device, session=None):
724
    """
725
    Check which software releases were ever released for a carrier.
726
727
    :param mcc: Country code.
728
    :type mcc: int
729
730
    :param mnc: Network code.
731
    :type mnc: int
732
733
    :param device: Hexadecimal hardware ID.
734
    :type device: str
735
736
    :param session: Requests session object, default is created on the fly.
737
    :type session: requests.Session()
738
    """
739 5
    session = generic_session(session)
740 5
    server = "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/"
741 5
    npc = return_npc(mcc, mnc)
742 5
    query = xmlutils.prep_available_bundle(device, npc)
743 5
    header = {"Content-Type": "text/xml;charset=UTF-8"}
744 5
    req = session.post(server, headers=header, data=query)
745 5
    bundlelist = xmlutils.parse_available_bundle(req.text)
746 5
    return bundlelist
747
748
749 5
@pem_wrapper
750 5
def ptcrb_scraper(ptcrbid, session=None):
751
    """
752
    Get the PTCRB results for a given device.
753
754
    :param ptcrbid: Numerical ID from PTCRB (end of URL).
755
    :type ptcrbid: str
756
757
    :param session: Requests session object, default is created on the fly.
758
    :type session: requests.Session()
759
    """
760 5
    baseurl = "https://www.ptcrb.com/certified-devices/device-details/?model={0}".format(ptcrbid)
761 5
    sess = generic_session(session)
762 5
    useragent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36"
763 5
    sess.headers.update({"User-agent": useragent})
764 5
    soup = generic_soup_parser(baseurl, sess)
765 5
    certtable = soup.find_all("table")[1]
766 5
    tds = certtable.find_all("td")[1::2]  # every other
767 5
    prelimlist = [tdx.text for tdx in tds]
768 5
    cleanlist = [ptcrb_item_cleaner(item.strip()) for item in prelimlist]
769 5
    return cleanlist
770
771
772 5
def space_pad(instring, minlength):
773
    """
774
    Pad a string with spaces until it's the minimum length.
775
776
    :param instring: String to pad.
777
    :type instring: str
778
779
    :param minlength: Pad while len(instring) < minlength.
780
    :type minlength: int
781
    """
782 5
    while len(instring) < minlength:
783 5
        instring += " "
784 5
    return instring
785
786
787 5
def ptcrb_cleaner_multios(item):
788
    """
789
    Discard multiple entries for "OS".
790
791
    :param item: The item to clean.
792
    :type item: str
793
    """
794 5
    if item.count("OS") > 1:
795 5
        templist = item.split("OS")
796 5
        templist[0] = "OS"
797 5
        item = "".join([templist[0], templist[1]])
798 5
    return item
799
800
801 5
def ptcrb_cleaner_spaces(item):
802
    """
803
    Pad item with spaces to the right length.
804
805
    :param item: The item to clean.
806
    :type item: str
807
    """
808 5
    spaclist = item.split(" ")
809 5
    if len(spaclist) > 1:
810 5
        spaclist[1] = space_pad(spaclist[1], 11)
811 5
    if len(spaclist) > 3:
812 5
        spaclist[3] = space_pad(spaclist[3], 11)
813 5
    item = " ".join(spaclist)
814 5
    return item
815
816
817 5
def ptcrb_item_cleaner(item):
818
    """
819
    Cleanup poorly formatted PTCRB entries written by an intern.
820
821
    :param item: The item to clean.
822
    :type item: str
823
    """
824 5
    item = item.replace("<td>", "")
825 5
    item = item.replace("</td>", "")
826 5
    item = item.replace("\n", "")
827 5
    item = item.replace("SW: OS", "OS")
828 5
    item = item.replace("Software Version: OS", "OS")
829 5
    item = item.replace(" (SR", ", SR")
830 5
    item = re.sub(r"\s?\((.*)$", "", item)
831 5
    item = re.sub(r"\sSV.*$", "", item)
832 5
    item = item.replace(")", "")
833 5
    item = item.replace(". ", ".")
834 5
    item = item.replace(";", "")
835 5
    item = item.replace("version", "Version")
836 5
    item = item.replace("Verison", "Version")
837 5
    item = ptcrb_cleaner_multios(item)
838 5
    item = item.replace("SR10", "SR 10")
839 5
    item = item.replace("SR", "SW Release")
840 5
    item = item.replace(" Version:", ":")
841 5
    item = item.replace("Version ", " ")
842 5
    item = item.replace(":1", ": 1")
843 5
    item = item.replace(", ", " ")
844 5
    item = item.replace(",", " ")
845 5
    item = item.replace("Software", "SW")
846 5
    item = item.replace("  ", " ")
847 5
    item = item.replace("OS ", "OS: ")
848 5
    item = item.replace("Radio ", "Radio: ")
849 5
    item = item.replace("Release ", "Release: ")
850 5
    item = ptcrb_cleaner_spaces(item)
851 5
    item = item.strip()
852 5
    item = item.replace("\r", "")
853 5
    if item.startswith("10"):
854 5
        item = "OS: {0}".format(item)
855 5
    item = item.replace(":    ", ": ")
856 5
    item = item.replace(":   ", ": ")
857 5
    return item
858
859
860 5
@pem_wrapper
861 5
def kernel_scraper(utils=False, session=None):
862
    """
863
    Scrape BlackBerry's GitHub kernel repo for available branches.
864
865
    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
866
    :type utils: bool
867
868
    :param session: Requests session object, default is created on the fly.
869
    :type session: requests.Session()
870
    """
871 5
    repo = "android-utils" if utils else "android-linux-kernel"
872 5
    kernlist = []
873 5
    sess = generic_session(session)
874 5
    for page in range(1, 10):
875 5
        url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
876 5
        soup = generic_soup_parser(url, sess)
877 5
        if soup.find("div", {"class": "no-results-message"}):
878 5
            break
879
        else:
880 5
            text = soup.get_text()
881 5
            kernlist.extend(re.findall(r"msm[0-9]{4}\/[A-Z0-9]{6}", text, re.IGNORECASE))
882 5
    return kernlist
883
884
885 5
def root_generator(folder, build, variant="common"):
886
    """
887
    Generate roots for the SHAxxx hash lookup URLs.
888
889
    :param folder: Dictionary of variant: loader name pairs.
890
    :type folder: dict(str: str)
891
892
    :param build: Build to check, 3 letters + 3 numbers.
893
    :type build: str
894
895
    :param variant: Autoloader variant. Default is "common".
896
    :type variant: str
897
    """
898
    #Priv specific
899 5
    privx = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
900
    #DTEK50 specific
901 5
    dtek50x = "bbSupport/DTEK50" if build[:3] == "AAF" else "bbfoundation/hashfiles_priv/dtek50"
902
    #DTEK60 specific
903 5
    dtek60x = dtek50x  # still uses dtek50 folder, for some reason
904
    #Pack it up
905 5
    roots = {"Priv": privx, "DTEK50": dtek50x, "DTEK60": dtek60x}
906 5
    return roots
907
908
909 5
def make_droid_skeleton_bbm(method, build, device, variant="common"):
910
    """
911
    Make an Android autoloader/hash URL, on the BB Mobile site.
912
913
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
914
    :type method: str
915
916
    :param build: Build to check, 3 letters + 3 numbers.
917
    :type build: str
918
919
    :param device: Device to check.
920
    :type device: str
921
922
    :param variant: Autoloader variant. Default is "common".
923
    :type variant: str
924
    """
925 5
    devices = {"KEYone": "qc8953", "Motion": "qc8953"}
926 5
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
927 5
    if method is None:
928 5
        skel = "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(base)
929
    else:
930 5
        skel = "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(base, method.lower())
931 5
    return skel
932
933
934 5
def make_droid_skeleton_og(method, build, device, variant="common"):
935
    """
936
    Make an Android autoloader/hash URL, on the original site.
937
938
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
939
    :type method: str
940
941
    :param build: Build to check, 3 letters + 3 numbers.
942
    :type build: str
943
944
    :param device: Device to check.
945
    :type device: str
946
947
    :param variant: Autoloader variant. Default is "common".
948
    :type variant: str
949
    """
950 5
    folder = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
951 5
    devices = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
952 5
    roots = root_generator(folder, build, variant)
953 5
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
954 5
    if method is None:
955 5
        baseurl = "https://bbapps.download.blackberry.com/Priv"
956 5
        skel = "{1}/{0}.zip".format(base, baseurl)
957
    else:
958 5
        baseurl = "https://ca.blackberry.com/content/dam"
959 5
        skel = "{3}/{1}/{0}.{2}sum".format(base, roots[device], method.lower(), baseurl)
960 5
    return skel
961
962
963 5
def make_droid_skeleton(method, build, device, variant="common"):
964
    """
965
    Make an Android autoloader/hash URL.
966
967
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
968
    :type method: str
969
970
    :param build: Build to check, 3 letters + 3 numbers.
971
    :type build: str
972
973
    :param device: Device to check.
974
    :type device: str
975
976
    :param variant: Autoloader variant. Default is "common".
977
    :type variant: str
978
    """
979
    # No Aurora
980 5
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
981 5
    bbmlist = ("KEYone", "Motion")   # BB Mobile
982 5
    if device in oglist:
983 5
        skel = make_droid_skeleton_og(method, build, device, variant)
984 5
    elif device in bbmlist:
985 5
        skel = make_droid_skeleton_bbm(method, build, device, variant)
986 5
    return skel
987
988
989 5
def bulk_droid_skeletons(devs, build, method=None):
990
    """
991
    Prepare list of Android autoloader/hash URLs.
992
993
    :param devs: List of devices.
994
    :type devs: list(str)
995
996
    :param build: Build to check, 3 letters + 3 numbers.
997
    :type build: str
998
999
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
1000
    :type method: str
1001
    """
1002 5
    carrier_variants = {
1003
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
1004
        "KEYone": ("common", "usa-sprint", "global-att", "china-china")
1005
    }
1006 5
    common_variants = ("common", )  # for single-variant devices
1007 5
    carrier_devices = ("Priv", )  # add KEYone when verified
1008 5
    skels = []
1009 5
    for dev in devs:
1010 5
        varlist = carrier_variants[dev] if dev in carrier_devices else common_variants
1011 5
        for var in varlist:
1012 5
            skel = make_droid_skeleton(method, build, dev, var)
1013 5
            skels.append(skel)
1014 5
    return skels
1015
1016
1017 5
def prepare_droid_list(device):
1018
    """
1019
    Convert single devices to a list, if necessary.
1020
1021
    :param device: Device to check.
1022
    :type device: str
1023
    """
1024 5
    if isinstance(device, list):
1025 5
        devs = device
1026
    else:
1027 5
        devs = [device]
1028 5
    return devs
1029
1030
1031 5
def droid_scanner(build, device, method=None, session=None):
1032
    """
1033
    Check for Android autoloaders on BlackBerry's site.
1034
1035
    :param build: Build to check, 3 letters + 3 numbers.
1036
    :type build: str
1037
1038
    :param device: Device to check.
1039
    :type device: str
1040
1041
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
1042
    :type method: str
1043
1044
    :param session: Requests session object, default is created on the fly.
1045
    :type session: requests.Session()
1046
    """
1047 5
    devs = prepare_droid_list(device)
1048 5
    skels = bulk_droid_skeletons(devs, build, method)
1049 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
1050 5
        results = droid_scanner_worker(xec, skels, session)
1051 5
    return results if results else None
1052
1053
1054 5
def droid_scanner_worker(xec, skels, session=None):
1055
    """
1056
    Worker to check for Android autoloaders.
1057
1058
    :param xec: ThreadPoolExecutor instance.
1059
    :type xec: concurrent.futures.ThreadPoolExecutor
1060
1061
    :param skels: List of skeleton formats.
1062
    :type skels: list(str)
1063
1064
    :param session: Requests session object, default is created on the fly.
1065
    :type session: requests.Session()
1066
    """
1067 5
    results = []
1068 5
    for skel in skels:
1069 5
        avail = xec.submit(availability, skel, session)
1070 5
        if avail.result():
1071 5
            results.append(skel)
1072 5
    return results
1073
1074
1075 5
def chunker(iterable, inc):
1076
    """
1077
    Convert an iterable into a list of inc sized lists.
1078
1079
    :param iterable: Iterable to chunk.
1080
    :type iterable: list/tuple/string
1081
1082
    :param inc: Increment; how big each chunk is.
1083
    :type inc: int
1084
    """
1085 5
    chunks = [iterable[x:x+inc] for x in range(0, len(iterable), inc)]
1086 5
    return chunks
1087
1088
1089 5
def unicode_filter(intext):
1090
    """
1091
    Remove Unicode crap.
1092
1093
    :param intext: Text to filter.
1094
    :type intext: str
1095
    """
1096 5
    return intext.replace("\u2013", "").strip()
1097
1098
1099 5
def table_header_filter(ptag):
1100
    """
1101
    Validate p tag, to see if it's relevant.
1102
1103
    :param ptag: P tag.
1104
    :type ptag: bs4.element.Tag
1105
    """
1106 5
    valid = ptag.find("b") and "BlackBerry" in ptag.text and not "experts" in ptag.text
1107 5
    return valid
1108
1109
1110 5
def table_headers(pees):
1111
    """
1112
    Generate table headers from list of p tags.
1113
1114
    :param pees: List of p tags.
1115
    :type pees: list(bs4.element.Tag)
1116
    """
1117 5
    bolds = [x.text for x in pees if table_header_filter(x)]
1118 5
    return bolds
1119
1120
1121 5
@pem_wrapper
1122 5
def loader_page_scraper(session=None):
1123
    """
1124
    Return scraped autoloader pages.
1125
1126
    :param session: Requests session object, default is created on the fly.
1127
    :type session: requests.Session()
1128
    """
1129 5
    session = generic_session(session)
1130 5
    loader_page_scraper_og(session)
1131 5
    loader_page_scraper_bbm(session)
1132
1133
1134 5
def loader_page_scraper_og(session=None):
1135
    """
1136
    Return scraped autoloader page, original site.
1137
1138
    :param session: Requests session object, default is created on the fly.
1139
    :type session: requests.Session()
1140
    """
1141 5
    url = "https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html"
1142 5
    soup = generic_soup_parser(url, session)
1143 5
    tables = soup.find_all("table")
1144 5
    headers = table_headers(soup.find_all("p"))
1145 5
    for idx, table in enumerate(tables):
1146 5
        loader_page_chunker_og(idx, table, headers)
1147
1148
1149 5
def loader_page_scraper_bbm(session=None):
1150
    """
1151
    Return scraped autoloader page, new site.
1152
1153
    :param session: Requests session object, default is created on the fly.
1154
    :type session: requests.Session()
1155
    """
1156 5
    url = "https://www.blackberrymobile.com/support/reload-software/"
1157 5
    soup = generic_soup_parser(url, session)
1158 5
    ulls = soup.find_all("ul", {"class": re.compile("list-two special-.")})[1:]
1159 5
    print("~~~BlackBerry KEYone~~~")
1160 5
    for ull in ulls:
1161 5
        loader_page_chunker_bbm(ull)
1162
1163
1164 5
def loader_page_chunker_og(idx, table, headers):
1165
    """
1166
    Given a loader page table, chunk it into lists of table cells.
1167
1168
    :param idx: Index of enumerating tables.
1169
    :type idx: int
1170
1171
    :param table: HTML table tag.
1172
    :type table: bs4.element.Tag
1173
1174
    :param headers: List of table headers.
1175
    :type headers: list(str)
1176
    """
1177 5
    print("~~~{0}~~~".format(headers[idx]))
1178 5
    chunks = chunker(table.find_all("td"), 4)
1179 5
    for chunk in chunks:
1180 5
        loader_page_printer(chunk)
1181 5
    print(" ")
1182
1183
1184 5
def loader_page_chunker_bbm(ull):
1185
    """
1186
    Given a loader page list, chunk it into lists of list items.
1187
1188
    :param ull: HTML unordered list tag.
1189
    :type ull: bs4.element.Tag
1190
    """
1191 5
    chunks = chunker(ull.find_all("li"), 3)
1192 5
    for chunk in chunks:
1193 5
        loader_page_printer(chunk)
1194
1195
1196 5
def loader_page_printer(chunk):
1197
    """
1198
    Print individual cell texts given a list of table cells.
1199
1200
    :param chunk: List of td tags.
1201
    :type chunk: list(bs4.element.Tag)
1202
    """
1203 5
    key = unicode_filter(chunk[0].text)
1204 5
    ver = unicode_filter(chunk[1].text)
1205 5
    link = unicode_filter(chunk[2].find("a")["href"])
1206 5
    print("{0}\n    {1}: {2}".format(key, ver, link))
1207
1208
1209 5
@pem_wrapper
1210 5
def base_metadata(url, session=None):
1211
    """
1212
    Get BBNDK metadata, base function.
1213
1214
    :param url: URL to check.
1215
    :type url: str
1216
1217
    :param session: Requests session object, default is created on the fly.
1218
    :type session: requests.Session()
1219
    """
1220 5
    session = generic_session(session)
1221 5
    req = session.get(url)
1222 5
    data = req.content
1223 5
    entries = data.split(b"\n")
1224 5
    metadata = [entry.split(b",")[1].decode("utf-8") for entry in entries if entry]
1225 5
    return metadata
1226
1227
1228 5
def base_metadata_url(alternate=None):
1229
    """
1230
    Return metadata URL.
1231
1232
    :param alternate: If the URL is for the simulator metadata. Default is False.
1233
    :type alternate: str
1234
    """
1235 5
    baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk"
1236 5
    tail = "{0}/{0}_metadata".format(alternate) if alternate is not None else "metadata"
1237 5
    return "{0}/{1}".format(baseurl, tail)
1238
1239
1240 5
def ndk_metadata(session=None):
1241
    """
1242
    Get BBNDK target metadata.
1243
1244
    :param session: Requests session object, default is created on the fly.
1245
    :type session: requests.Session()
1246
    """
1247 5
    ndkurl = base_metadata_url()
1248 5
    data = base_metadata(ndkurl, session)
1249 5
    metadata = [entry for entry in data if entry.startswith(("10.0", "10.1", "10.2"))]
1250 5
    return metadata
1251
1252
1253 5
def sim_metadata(session=None):
1254
    """
1255
    Get BBNDK simulator metadata.
1256
1257
    :param session: Requests session object, default is created on the fly.
1258
    :type session: requests.Session()
1259
    """
1260 5
    simurl = base_metadata_url("simulator")
1261 5
    metadata = base_metadata(simurl, session)
1262 5
    return metadata
1263
1264
1265 5
def runtime_metadata(session=None):
1266
    """
1267
    Get BBNDK runtime metadata.
1268
1269
    :param session: Requests session object, default is created on the fly.
1270
    :type session: requests.Session()
1271
    """
1272 5
    rturl = base_metadata_url("runtime")
1273 5
    metadata = base_metadata(rturl, session)
1274 5
    return metadata
1275
1276
1277 5
def series_generator(osversion):
1278
    """
1279
    Generate series/branch name from OS version.
1280
1281
    :param osversion: OS version.
1282
    :type osversion: str
1283
    """
1284 5
    splits = osversion.split(".")
1285 5
    return "BB{0}_{1}_{2}".format(*splits[0:3])
1286
1287
1288 5
@pem_wrapper
1289 5
def devalpha_urls(osversion, skel, session=None):
1290
    """
1291
    Check individual Dev Alpha autoloader URLs.
1292
1293
    :param osversion: OS version.
1294
    :type osversion: str
1295
1296
    :param skel: Individual skeleton format to try.
1297
    :type skel: str
1298
1299
    :param session: Requests session object, default is created on the fly.
1300
    :type session: requests.Session()
1301
    """
1302 5
    session = generic_session(session)
1303 5
    baseurl = "http://downloads.blackberry.com/upr/developers/downloads"
1304 5
    url = "{2}/{0}{1}.exe".format(skel, osversion, baseurl)
1305 5
    req = session.head(url)
1306 5
    if req.status_code == 200:
1307 5
        finals = (url, req.headers["content-length"])
1308
    else:
1309 5
        finals = ()
1310 5
    return finals
1311
1312
1313 5
def devalpha_urls_serieshandler(osversion, skeletons):
1314
    """
1315
    Process list of candidate Dev Alpha autoloader URLs.
1316
1317
    :param osversion: OS version.
1318
    :type osversion: str
1319
1320
    :param skeletons: List of skeleton formats to try.
1321
    :type skeletons: list
1322
    """
1323 5
    skels = skeletons
1324 5
    for idx, skel in enumerate(skeletons):
1325 5
        if "<SERIES>" in skel:
1326 5
            skels[idx] = skel.replace("<SERIES>", series_generator(osversion))
1327 5
    return skels
1328
1329
1330 5
def devalpha_urls_bulk(osversion, skeletons, xec, session=None):
1331
    """
1332
    Construct list of valid Dev Alpha autoloader URLs.
1333
1334
    :param osversion: OS version.
1335
    :type osversion: str
1336
1337
    :param skeletons: List of skeleton formats to try.
1338
    :type skeletons: list
1339
1340
    :param xec: ThreadPoolExecutor instance.
1341
    :type xec: concurrent.futures.ThreadPoolExecutor
1342
1343
    :param session: Requests session object, default is created on the fly.
1344
    :type session: requests.Session()
1345
    """
1346 5
    finals = {}
1347 5
    skels = devalpha_urls_serieshandler(osversion, skeletons)
1348 5
    for skel in skels:
1349 5
        final = xec.submit(devalpha_urls, osversion, skel, session).result()
1350 5
        if final:
1351 5
            finals[final[0]] = final[1]
1352 5
    return finals
1353
1354
1355 5
def devalpha_urls_bootstrap(osversion, skeletons, session=None):
1356
    """
1357
    Get list of valid Dev Alpha autoloader URLs.
1358
1359
    :param osversion: OS version.
1360
    :type osversion: str
1361
1362
    :param skeletons: List of skeleton formats to try.
1363
    :type skeletons: list
1364
1365
    :param session: Requests session object, default is created on the fly.
1366
    :type session: requests.Session()
1367
    """
1368 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
1369 5
        try:
1370 5
            return devalpha_urls_bulk(osversion, skeletons, xec, session)
1371 5
        except KeyboardInterrupt:
1372 5
            xec.shutdown(wait=False)
1373
1374
1375 5
def dev_dupe_dicter(finals):
1376
    """
1377
    Prepare dictionary to clean duplicate autoloaders.
1378
1379
    :param finals: Dict of URL:content-length pairs.
1380
    :type finals: dict(str: str)
1381
    """
1382 5
    revo = {}
1383 5
    for key, val in finals.items():
1384 5
        revo.setdefault(val, set()).add(key)
1385 5
    return revo
1386
1387
1388 5
def dev_dupe_remover(finals, dupelist):
1389
    """
1390
    Filter dictionary of autoloader entries.
1391
1392
    :param finals: Dict of URL:content-length pairs.
1393
    :type finals: dict(str: str)
1394
1395
    :param dupelist: List of duplicate URLs.
1396
    :type duplist: list(str)
1397
    """
1398 5
    for dupe in dupelist:
1399 5
        for entry in dupe:
1400 5
            if "DevAlpha" in entry:
1401 5
                del finals[entry]
1402 5
    return finals
1403
1404
1405 5
def dev_dupe_cleaner(finals):
1406
    """
1407
    Clean duplicate autoloader entries.
1408
1409
    :param finals: Dict of URL:content-length pairs.
1410
    :type finals: dict(str: str)
1411
    """
1412 5
    revo = dev_dupe_dicter(finals)
1413 5
    dupelist = [val for key, val in revo.items() if len(val) > 1]
1414 5
    finals = dev_dupe_remover(finals, dupelist)
1415
    return finals
1416