Passed
Push — master ( 57fbcd...e67727 )
by John
02:40
created

bbarchivist.networkutils.make_droid_skeleton()   B

Complexity

Conditions 3

Size

Total Lines 24
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 4
dl 0
loc 24
rs 8.9713
c 0
b 0
f 0
ccs 8
cts 8
cp 1
crap 3
1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1415/1000)
Loading history...
2 5
"""This module is used for network connections; APIs, downloading, etc."""
3
4 5
import base64  # encoding
import binascii  # encoding
import concurrent.futures  # multiprocessing/threading
import functools  # decorator metadata
import glob  # pem file lookup
import hashlib  # salt
import os  # filesystem read
import random  # salt
import re  # regexes
import shutil  # directory removal
import time  # salt
import zlib  # encoding

import requests  # downloading
from bs4 import BeautifulSoup  # scraping
from bbarchivist import utilities  # parse filesize
from bbarchivist import xmlutils  # xml work
from bbarchivist.bbconstants import SERVERS, TCLMASTERS  # lookup servers
20
21 5
__author__ = "Thurask"
22 5
__license__ = "WTFPL v2"
23 5
__copyright__ = "2015-2018 Thurask"
24
25
26 5
def grab_pem():
    """
    Pick the CA bundle: a local cacert.pem wins over the one shipped with requests.

    Looks for "cacert.pem" in the current working directory and returns its
    absolute path; if none is found, returns ``requests.certs.where()``.
    """
    matches = glob.glob(os.path.join(os.getcwd(), "cacert.pem"))
    if not matches:
        return requests.certs.where()  # no local cacerts
    return os.path.abspath(matches[0])  # local cacerts
36
37
38 5
def pem_wrapper(method):
    """
    Decorator to set REQUESTS_CA_BUNDLE.

    The returned wrapper keeps the decorated function's name and docstring.

    :param method: Method to use.
    :type method: function
    """
    @functools.wraps(method)  # keep __name__/__doc__ of the wrapped function
    def wrapper(*args, **kwargs):
        """
        Set REQUESTS_CA_BUNDLE before doing function.
        """
        # Re-resolved on every call, so a cacert.pem dropped into the working
        # directory later is still picked up.
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
        return method(*args, **kwargs)
    return wrapper
52
53
54 5
def try_try_again(method):
    """
    Decorator to absorb timeouts, proxy errors, and other common exceptions.

    The returned wrapper keeps the decorated function's name and docstring.

    :param method: Method to use.
    :type method: function
    """
    @functools.wraps(method)  # keep __name__/__doc__ of the wrapped function
    def wrapper(*args, **kwargs):
        """
        Try function, try it again up to five times, and leave gracefully.

        Returns the function's result on the first success, or None if every
        attempt raised one of the absorbed exceptions.
        """
        tries = 5
        for _ in range(tries):
            try:
                return method(*args, **kwargs)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ProxyError):
                continue
        return None  # all attempts failed
    return wrapper
77
78
79 5
def generic_session(session=None):
    """
    Return the given Requests session, or a new one when none was supplied.

    :param session: Requests session object, created if this is None.
    :type session: requests.Session()
    """
    if session is not None:
        return session
    return requests.Session()
88
89
90 5
def generic_soup_parser(url, session=None):
    """
    Fetch a URL and hand back its body parsed by BeautifulSoup ("html.parser").

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    response = generic_session(session).get(url)
    return BeautifulSoup(response.content, "html.parser")
104
105
106 5
@pem_wrapper
def get_length(url, session=None):
    """
    Get content-length header from some URL.

    Returns 0 when the URL is None, when the connection fails, or when the
    response carries no content-length header.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if url is None:
        return 0  # nothing to ask for; don't bother creating a session
    session = generic_session(session)
    try:
        heads = session.head(url)
    except requests.ConnectionError:
        return 0
    # The header is optional (e.g. chunked responses), so don't index blindly.
    return int(heads.headers.get('content-length', 0))
126
127
128 5
@pem_wrapper
def download(url, output_directory=None, session=None):
    """
    Fetch one URL into a folder, then drop the file if nothing was written.

    The local filename is the last "/"-separated component of the URL.

    :param url: URL to download from.
    :type url: str

    :param output_directory: Download folder. Default is local.
    :type output_directory: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    target_dir = utilities.dirhandler(output_directory, os.getcwd())
    longname = url.rsplit('/', 1)[-1]
    shortname = utilities.stripper(longname)
    destination = os.path.join(target_dir, longname)
    download_writer(url, destination, longname, shortname, session)
    remove_empty_download(destination)
149
150
151 5
def remove_empty_download(fname):
    """
    Delete the file at the given path when its size is zero bytes.

    :param fname: File path.
    :type fname: str
    """
    is_empty = os.path.getsize(fname) == 0
    if is_empty:
        os.remove(fname)
160
161
162 5
def download_writer(url, fname, lfname, sname, session=None):
    """
    Download file and write to disk.

    The output file is opened before the request is made, so a failed
    download leaves an empty file behind at ``fname``.

    :param url: URL to download from.
    :type url: str

    :param fname: File path.
    :type fname: str

    :param lfname: Long filename.
    :type lfname: str

    :param sname: Short name, for printing to screen.
    :type sname: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # The signature advertises session=None, so honour it instead of crashing.
    session = generic_session(session)
    with open(fname, "wb") as file:
        req = session.get(url, stream=True)
        if req.status_code == 200:  # 200 OK
            # The size is only needed for the progress message, and the header
            # is optional, so look it up here and tolerate its absence.
            clength = req.headers.get('content-length')
            fsize = "unknown size" if clength is None else utilities.fsizer(clength)
            print("DOWNLOADING {0} [{1}]".format(sname, fsize))
            for chunk in req.iter_content(chunk_size=1024):
                file.write(chunk)
        else:
            print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
191
192
193 5
def download_bootstrap(urls, outdir=None, workers=5, session=None):
    """
    Run downloaders for each file in given URL iterable.

    Does nothing when ``urls`` is empty.

    :param urls: URLs to download.
    :type urls: list

    :param outdir: Download folder. Default is handled in :func:`download`.
    :type outdir: str

    :param workers: Number of worker threads. Default is 5.
    :type workers: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if not urls:
        return  # ThreadPoolExecutor rejects max_workers=0
    # Never more threads than URLs, never fewer than one.
    workers = max(1, min(len(urls), workers))
    spinman = utilities.SpinManager()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
        try:
            spinman.start()
            for url in urls:
                xec.submit(download, url, outdir, session)
        except (KeyboardInterrupt, SystemExit):
            xec.shutdown()
    # Leaving the with-block waits for the pool; stop the spinner exactly once.
    spinman.stop()
    utilities.spinner_clear()
    utilities.line_begin()
222
223
224 5
def download_android_tools(downloaddir=None):
    """
    Download Android SDK platform tools.

    Any existing directory of that name is removed and recreated first, then
    the Windows, Linux and macOS ("darwin") archives are downloaded into it.

    :param downloaddir: Directory name, default is "plattools".
    :type downloaddir: str
    """
    if downloaddir is None:
        downloaddir = "plattools"
    if os.path.exists(downloaddir):
        # os.removedirs only removes empty directories; a previous run leaves
        # archives behind, so remove the whole tree.
        shutil.rmtree(downloaddir)
    os.mkdir(downloaddir)
    platforms = ("windows", "linux", "darwin")
    baseurl = "https://dl.google.com/android/repository/platform-tools-latest"
    dlurls = ["{1}-{0}.zip".format(plat, baseurl) for plat in platforms]
    sess = generic_session()
    # Download into the directory just prepared, not a hard-coded name.
    download_bootstrap(dlurls, outdir=downloaddir, session=sess)
241
242
243 5
@pem_wrapper
def getcode(url, session=None):
    """
    Issue a HEAD request and report its status code; 404 if the connection fails.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    try:
        response = session.head(url)
    except requests.ConnectionError:
        return 404
    return int(response.status_code)
261
262
263 5
@pem_wrapper
def availability(url, session=None):
    """
    Tell whether a URL answers with an acceptable HTTP status.

    200 and 301 through 308 count as available; anything else does not.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    status = getcode(url, session)
    if status == 200:
        return True
    return 300 < status <= 308
277
278
279 5
def clean_availability(results, server):
    """
    Pull one server's lookup result and a short availability marker for it.

    The marker is two spaces when the result is None or "SR not in system";
    otherwise it is "PD" for server "p" and the upper-cased server name for
    anything else.

    :param results: Result dict.
    :type results: dict(str: str)

    :param server: Server, key for result dict.
    :type server: str
    """
    rel = results[server.lower()]
    if rel == "SR not in system" or rel is None:
        avail = "  "
    elif server == "p":
        avail = "PD"
    else:
        avail = server.upper()
    return rel, avail
293
294
295 5
def tcl_master():
    """
    Pick one entry of TCLMASTERS at random.
    """
    chosen = random.choice(TCLMASTERS)
    return chosen
300
301
302 5
def tcl_default_id(devid):
    """
    Return the given device ID, or a fixed placeholder when it is None.

    :param devid: Return default if this is None.
    :type devid: str
    """
    return "543212345000000" if devid is None else devid
312
313
314 5
def check_prep(curef, mode=4, fvver="AAA000", cltp=2010, cktp=2, rtd=1, chnl=2, devid=None):
    """
    Build the URL and query parameters for a TCL update check.

    Returns a (url, params) tuple; the URL points at check.php on a randomly
    chosen master server.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param cktp: 2 if checking manually, 1 if checking automatically. Default is 2.
    :type cktp: int

    :param rtd: 2 if rooted, 1 if not. Default is 1.
    :type rtd: int

    :param chnl: 2 if checking on WiFi, 1 if checking on mobile. Default is 2.
    :type chnl: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    params = {
        "id": tcl_default_id(devid),
        "curef": curef,
        "fv": fvver,
        "mode": mode,
        "type": "Firmware",
        "cltp": cltp,
        "cktp": cktp,
        "rtd": rtd,
        "chnl": chnl,
    }
    geturl = "http://{0}/check.php".format(tcl_master())
    return geturl, params
346
347
348 5
@pem_wrapper
@try_try_again
def tcl_check(curef, session=None, mode=4, fvver="AAA000", export=False):
    """
    Ask a TCL master server whether an update exists.

    Returns the response text on HTTP 200, otherwise None.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param export: Whether to export XML response to file. Default is False.
    :type export: bool
    """
    sess = generic_session(session)
    geturl, params = check_prep(curef, mode, fvver)
    req = sess.get(geturl, params=params)
    if req.status_code != 200:
        return None
    req.encoding = "utf-8"
    response = req.text
    if export:
        xmlutils.dump_tcl_xml(response, tcl_salt())
    return response
381
382
383 5
def tcl_salt():
    """
    Build a salt string: current time in milliseconds plus six random digits.
    """
    millis = round(time.time() * 1000)
    tail = str(random.randint(0, 999999)).zfill(6)
    return str(millis) + tail
390
391
392 5
def unpack_vdkey():
    """
    Decode the embedded key: base64 -> zlib -> UTF-8 text.
    """
    packed = b"eJwdjwEOwDAIAr8kKFr//7HhmqXp8AIIDrYAgg8byiUXrwRJRXja+d6iNxu0AhUooDCN9rd6rDLxmGIakUVWo3IGCTRWqCAt6X4jGEIUAxgN0eYWnp+LkpHQAg/PsO90ELsy0Npm/n2HbtPndFgGEV31R9OmT4O4nrddjc3Qt6nWscx7e+WRHq5UnOudtjw5skuV09pFhvmqnOEIs4ljPeel1wfLYUF4\n"
    compressed = binascii.a2b_base64(packed)
    return zlib.decompress(compressed).decode("utf-8")
399
400
401 5
def vkhash(curef, tvver, fwid, salt, mode=4, fvver="AAA000", cltp=2010, devid=None):
    """
    Compute the SHA-1 hex digest of the query string with the unpacked key appended.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    devid = tcl_default_id(devid)
    template = "id={0}&salt={1}&curef={2}&fv={3}&tv={4}&type={5}&fw_id={6}&mode={7}&cltp={8}{9}"
    query = template.format(devid, salt, curef, fvver, tvver, "Firmware", fwid, mode, cltp, unpack_vdkey())
    return hashlib.sha1(query.encode("utf-8")).hexdigest()
435
436
437 5
def download_request_prep(curef, tvver, fwid, salt, vkh, mode=4, fvver="AAA000", cltp=2010, devid=None):
    """
    Build the URL and form parameters for a TCL download request.

    Returns a (url, params) tuple; the URL points at download_request.php on
    a randomly chosen master server. When mode is 4, "foot": 1 is added.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param vkh: VDKey-based hash.
    :type vkh: str

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param cltp: 2010 to always show latest version, 10 to show actual updates. Default is 2010.
    :type cltp: int

    :param devid: Serial number/IMEI. Default is fake, not that it matters.
    :type devid: str
    """
    params = {
        "id": tcl_default_id(devid),
        "curef": curef,
        "fv": fvver,
        "mode": mode,
        "type": "Firmware",
        "tv": tvver,
        "fw_id": fwid,
        "salt": salt,
        "vk": vkh,
        "cltp": cltp,
    }
    posturl = "http://{0}/download_request.php".format(tcl_master())
    if mode == 4:
        params["foot"] = 1
    return posturl, params
474
475
476 5
@pem_wrapper
@try_try_again
def tcl_download_request(curef, tvver, fwid, salt, vkh, session=None, mode=4, fvver="AAA000", export=False):
    """
    Ask a TCL master server for download URLs.

    Returns the response text on HTTP 200, otherwise None.

    :param curef: PRD of the phone variant to check.
    :type curef: str

    :param tvver: Target software version.
    :type tvver: str

    :param fwid: Firmware ID for desired download file.
    :type fwid: str

    :param salt: Salt hash.
    :type salt: str

    :param vkh: VDKey-based hash.
    :type vkh: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param mode: 4 if downloading autoloaders, 2 if downloading OTA deltas.
    :type mode: int

    :param fvver: Initial software version, must be specific if downloading OTA deltas.
    :type fvver: str

    :param export: Whether to export XML response to file. Default is False.
    :type export: bool
    """
    sess = generic_session(session)
    posturl, params = download_request_prep(curef, tvver, fwid, salt, vkh, mode, fvver)
    req = sess.post(posturl, data=params)
    if req.status_code != 200:
        return None
    req.encoding = "utf-8"
    response = req.text
    if export:
        xmlutils.dump_tcl_xml(response, salt)
    return response
520
521
522 5
def encrypt_header_prep(address, encslave):
    """
    Build the URL and form parameters for the encrypted header check.

    Returns a (url, params) tuple. The params carry two base64-decoded
    key/value pairs plus the address, all as bytes.

    :param address: File URL minus host.
    :type address: str

    :param encslave: Server hosting header script.
    :type encslave: str
    """
    encs = {b"YWNjb3VudA==" : b"emhlbmdodWEuZ2Fv", b"cGFzc3dvcmQ=": b"cWFydUQ0b2s="}
    params = {}
    for key, val in encs.items():
        params[base64.b64decode(key)] = base64.b64decode(val)
    params[b"address"] = address.encode("utf-8")
    return "http://{0}/encrypt_header.php".format(encslave), params
537
538
539 5
@pem_wrapper
def encrypt_header(address, encslave, session=None):
    """
    Post the encrypted header check and classify the answer.

    Returns None unless the server answers HTTP 206. On 206, returns
    "HEADER FOUND" when Content-Length is 4194320, else "NO HEADER FOUND".

    :param address: File URL minus host.
    :type address: str

    :param encslave: Server hosting header script.
    :type encslave: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    posturl, params = encrypt_header_prep(address, encslave)
    req = sess.post(posturl, data=params)
    if req.status_code != 206:  # anything but partial
        return None
    contentlength = int(req.headers["Content-Length"])
    if contentlength == 4194320:
        return "HEADER FOUND"
    return "NO HEADER FOUND"
562
563
564 5
@pem_wrapper
def remote_prd_info():
    """
    Fetch the remote JSON listing and map each curef to its last OTA version.

    Entries whose "last_ota" is None are left out.
    """
    dburl = "https://tclota.birth-online.de/json_lastupdates.php"
    entries = requests.get(dburl).json().values()
    otadict = {}
    for entry in entries:
        if entry["last_ota"] is not None:
            otadict[entry["curef"]] = entry["last_ota"]
    return otadict
574
575
576 5
@pem_wrapper
def carrier_checker(mcc, mnc, session=None):
    """
    Look up the country and carrier for a MCC/MNC pair via BlackBerry World.

    Returns whatever ``xmlutils.cchecker_get_tags`` extracts from the
    response text, unpacked as (country, carrier).

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    user_agent = {'User-agent': 'AppWorld/5.1.0.60'}
    baseurl = "http://appworld.blackberry.com/ClientAPI/checkcarrier"
    url = "{2}?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc, baseurl)
    response = session.get(url, headers=user_agent)
    country, carrier = xmlutils.cchecker_get_tags(response.text)
    return country, carrier
597
598
599 5
def return_npc(mcc, mnc):
    """
    Build a NPC string: MCC and MNC each zero-padded to three digits, then "30".

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int
    """
    mcc_part = str(mcc).zfill(3)
    mnc_part = str(mnc).zfill(3)
    return "{0}{1}30".format(mcc_part, mnc_part)
610
611
612 5
@pem_wrapper
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
    """
    Post an update-details query to BlackBerry's server and parse the XML reply.

    :param npc: MCC + MNC (see `func:return_npc`)
    :type npc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param upgrade: Whether to use upgrade files. False by default.
    :type upgrade: bool

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool

    :param forced: Force a software release.
    :type forced: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    if upgrade:
        upg = "upgrade"
    else:
        upg = "repair"
    if forced is None:
        forced = "latest"
    query = xmlutils.prep_carrier_query(npc, device, upg, forced)
    response = session.post(
        "https://cs.sl.blackberry.com/cse/updateDetails/2.2/",
        headers={"Content-Type": "text/xml;charset=UTF-8"},
        data=query,
    )
    return xmlutils.parse_carrier_xml(response.text, blitz)
643
644
645 5
@pem_wrapper
def sr_lookup(osver, server, session=None):
    """
    Look up the software release for an OS version on one server.

    :data:`bbarchivist.bbconstants.SERVERLIST` for server list.

    :param osver: OS version to lookup, 10.x.y.zzzz.
    :type osver: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    payload = xmlutils.prep_sr_lookup(osver)
    reply = sr_lookup_poster(payload, server, session)
    return xmlutils.parse_sr_lookup(reply)
664
665
666 5
def sr_lookup_poster(query, server, session=None):
    """
    Send the software release lookup payload and return the reply text.

    The request uses a one-second timeout; on timeout or connection error
    the string "SR not in system" is returned instead.

    :param query: XML payload.
    :type query: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    try:
        response = session.post(server, headers=header, data=query, timeout=1)
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        return "SR not in system"
    return response.text
688
689
690 5
def sr_lookup_bootstrap(osv, session=None, no2=False):
    """
    Run lookups for each server for given OS.

    All lookups are submitted to the thread pool before any result is
    collected, so they run concurrently. Returns a dict keyed by server
    ("p", "a1", "a2", "b1", "b2"), or None if interrupted from the keyboard.

    :param osv: OS to check.
    :type osv: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
    :type no2: bool
    """
    keys = ["p", "a1", "a2", "b1", "b2"]
    if no2:
        keys = [key for key in keys if key not in ("a2", "b2")]
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            # Submit everything first; calling .result() right after each
            # submit would block and serialise the lookups.
            # NOTE(review): a caller-supplied session is now used by several
            # threads at once, as download_bootstrap already does.
            futures = [(key, xec.submit(sr_lookup, osv, SERVERS[key], session)) for key in keys]
            return {key: future.result() for key, future in futures}
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
    return None
720
721
722 5
@pem_wrapper
def available_bundle_lookup(mcc, mnc, device, session=None):
    """
    Post an available-bundles query for a carrier and device, and parse the reply.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    query = xmlutils.prep_available_bundle(device, return_npc(mcc, mnc))
    response = session.post(
        "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/",
        headers={"Content-Type": "text/xml;charset=UTF-8"},
        data=query,
    )
    return xmlutils.parse_available_bundle(response.text)
747
748
749 5
@pem_wrapper
def ptcrb_scraper(ptcrbid, session=None):
    """
    Scrape the PTCRB device page and return its cleaned-up entries.

    Reads every other cell (starting with the second) of the second table on
    the page and runs each cell's stripped text through
    :func:`ptcrb_item_cleaner`.

    :param ptcrbid: Numerical ID from PTCRB (end of URL).
    :type ptcrbid: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    useragent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36"
    sess.headers.update({"User-agent": useragent})
    baseurl = "https://www.ptcrb.com/certified-devices/device-details/?model={0}".format(ptcrbid)
    soup = generic_soup_parser(baseurl, sess)
    cells = soup.find_all("table")[1].find_all("td")[1::2]  # every other
    texts = [cell.text for cell in cells]
    return [ptcrb_item_cleaner(text.strip()) for text in texts]
770
771
772 5
def space_pad(instring, minlength):
    """
    Right-pad a string with spaces so it is at least the given length.

    Strings already that long come back unchanged.

    :param instring: String to pad.
    :type instring: str

    :param minlength: Pad while len(instring) < minlength.
    :type minlength: int
    """
    return instring.ljust(minlength)
785
786
787 5
def ptcrb_cleaner_multios(item):
    """
    Keep only the first "OS" entry when an item mentions "OS" more than once.

    Anything before the first "OS" is dropped as well; items with zero or
    one "OS" are returned unchanged.

    :param item: The item to clean.
    :type item: str
    """
    if item.count("OS") <= 1:
        return item
    first_entry = item.split("OS")[1]  # text between the first and second "OS"
    return "OS" + first_entry
799
800
801 5
def ptcrb_cleaner_spaces(item):
    """
    Pad the second and fourth space-separated words of an item to 11 characters.

    Words that do not exist are skipped; the words are re-joined with
    single spaces.

    :param item: The item to clean.
    :type item: str
    """
    words = item.split(" ")
    for index in (1, 3):
        if len(words) > index:
            words[index] = space_pad(words[index], 11)
    return " ".join(words)
815
816
817 5
def ptcrb_item_cleaner(item):
    """
    Normalize one scraped PTCRB entry into a consistent "OS: ... Radio: ..." form.

    The substitutions are order-dependent; each table below is applied
    strictly top to bottom.

    :param item: The item to clean.
    :type item: str
    """
    # Stage 1: strip markup and unify the OS prefix.
    for old, new in (
            ("<td>", ""),
            ("</td>", ""),
            ("\n", ""),
            ("SW: OS", "OS"),
            ("Software Version: OS", "OS"),
            (" (SR", ", SR"),
    ):
        item = item.replace(old, new)
    # Stage 2: cut trailing parentheticals and " SV..." suffixes.
    item = re.sub(r"\s?\((.*)$", "", item)
    item = re.sub(r"\sSV.*$", "", item)
    # Stage 3: punctuation and spelling fixes.
    for old, new in (
            (")", ""),
            (". ", "."),
            (";", ""),
            ("version", "Version"),
            ("Verison", "Version"),
    ):
        item = item.replace(old, new)
    item = ptcrb_cleaner_multios(item)
    # Stage 4: expand abbreviations and settle on "Label: value".
    for old, new in (
            ("SR10", "SR 10"),
            ("SR", "SW Release"),
            (" Version:", ":"),
            ("Version ", " "),
            (":1", ": 1"),
            (", ", " "),
            (",", " "),
            ("Software", "SW"),
            ("  ", " "),
            ("OS ", "OS: "),
            ("Radio ", "Radio: "),
            ("Release ", "Release: "),
    ):
        item = item.replace(old, new)
    item = ptcrb_cleaner_spaces(item)
    item = item.strip().replace("\r", "")
    if item.startswith("10"):
        item = "OS: {0}".format(item)
    # Stage 5: collapse the extra spaces left after a colon.
    for old, new in ((":    ", ": "), (":   ", ": ")):
        item = item.replace(old, new)
    return item
858
859
860 5
@pem_wrapper
def kernel_scraper(utils=False, session=None):
    """
    Collect branch names from BlackBerry's GitHub kernel repo.

    Walks branch-list pages 1 through 9, stopping early at the first page
    that contains a "no-results-message" div.

    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
    :type utils: bool

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if utils:
        repo = "android-utils"
    else:
        repo = "android-linux-kernel"
    branch_pattern = re.compile(r"msm[0-9]{4}\/[A-Z0-9]{6}", re.IGNORECASE)
    sess = generic_session(session)
    kernlist = []
    for page in range(1, 10):
        url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
        soup = generic_soup_parser(url, sess)
        if soup.find("div", {"class": "no-results-message"}):
            break
        kernlist.extend(branch_pattern.findall(soup.get_text()))
    return kernlist
883
884
885 5
def root_generator(folder, build, variant="common"):
    """
    Map each device name to the root folder of its hash lookup URLs.

    :param folder: Dictionary of variant: loader name pairs.
    :type folder: dict(str: str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # Priv: folder depends on the variant
    privx = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
    # DTEK50: folder depends on the build prefix
    if build.startswith("AAF"):
        dtek50x = "bbSupport/DTEK50"
    else:
        dtek50x = "bbfoundation/hashfiles_priv/dtek50"
    # DTEK60 shares the DTEK50 folder
    return {"Priv": privx, "DTEK50": dtek50x, "DTEK60": dtek50x}
907
908
909 5
def make_droid_skeleton_bbm(method, build, device, variant="common"):
    """
    Build the BB Mobile URL of an Android autoloader or of its hash file.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    devices = {"KEYone": "qc8953", "Motion": "qc8953"}
    platform = devices[device]
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), platform)
    if method is None:
        return "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(base)
    return "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(base, method.lower())
932
933
934 5
def make_droid_skeleton_og(method, build, device, variant="common"):
    """
    Build an Android autoloader or hash URL for the original BlackBerry site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    carriers = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
    platforms = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
    # Hash folder roots are looked up even for plain OS links
    hashroots = root_generator(carriers, build, variant)
    stem = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), platforms[device])
    if method is None:
        baseurl = "https://bbapps.download.blackberry.com/Priv"
        return "{1}/{0}.zip".format(stem, baseurl)
    baseurl = "https://ca.blackberry.com/content/dam"
    return "{3}/{1}/{0}.{2}sum".format(stem, hashroots[device], method.lower(), baseurl)
961
962
963 5
def make_droid_skeleton(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str

    :raises ValueError: If the device is on neither site's device list.
    """
    # No Aurora
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
    bbmlist = ("KEYone", "Motion")   # BB Mobile
    if device in oglist:
        return make_droid_skeleton_og(method, build, device, variant)
    if device in bbmlist:
        return make_droid_skeleton_bbm(method, build, device, variant)
    # Fail with a clear message instead of an unbound local variable
    raise ValueError("Unknown device: {0!r}".format(device))
987
988
989 5
def bulk_droid_skeletons(devs, build, method=None):
    """
    Build the list of Android autoloader/hash URLs for several devices.

    :param devs: List of devices.
    :type devs: list(str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str
    """
    carrier_variants = {
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
        "KEYone": ("common", "usa-sprint", "global-att", "china-china")
    }
    common_variants = ("common", )  # for single-variant devices
    carrier_devices = ("Priv", )  # add KEYone when verified
    skels = []
    for dev in devs:
        # Only devices listed in carrier_devices get their carrier variants
        if dev in carrier_devices:
            varlist = carrier_variants[dev]
        else:
            varlist = common_variants
        skels.extend(make_droid_skeleton(method, build, dev, var) for var in varlist)
    return skels
1015
1016
1017 5
def prepare_droid_list(device):
    """
    Wrap a single device in a list; pass lists through untouched.

    :param device: Device to check.
    :type device: str
    """
    return device if isinstance(device, list) else [device]
1029
1030
1031 5
def droid_scanner(build, device, method=None, session=None):
    """
    Check for Android autoloaders on BlackBerry's site.

    Returns the list of candidate URLs that were found available,
    or None if there were none.

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    devs = prepare_droid_list(device)
    skels = bulk_droid_skeletons(devs, build, method)
    if not skels:
        # Nothing to check; ThreadPoolExecutor rejects max_workers=0
        return None
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
        results = droid_scanner_worker(xec, skels, session)
    return results if results else None
1052
1053
1054 5
def droid_scanner_worker(xec, skels, session=None):
    """
    Worker to check for Android autoloaders.

    Returns the skeletons whose availability check came back truthy,
    in the same order as they were given.

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param skels: List of skeleton formats.
    :type skels: list(str)

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # Queue every check before waiting on any of them; calling result()
    # right after submit() would run the checks one at a time.
    pending = [(skel, xec.submit(availability, skel, session)) for skel in skels]
    return [skel for skel, avail in pending if avail.result()]
1073
1074
1075 5
def chunker(iterable, inc):
    """
    Split a sliceable sequence into consecutive pieces of at most inc items.

    :param iterable: Iterable to chunk.
    :type iterable: list/tuple/string

    :param inc: Increment; how big each chunk is.
    :type inc: int
    """
    chunks = []
    for start in range(0, len(iterable), inc):
        # The final slice may be shorter than inc
        chunks.append(iterable[start:start + inc])
    return chunks
1087
1088
1089 5
def unicode_filter(intext):
    """
    Drop en dashes from text, then trim surrounding whitespace.

    :param intext: Text to filter.
    :type intext: str
    """
    en_dash = "\u2013"
    without_dashes = intext.replace(en_dash, "")
    return without_dashes.strip()
1097
1098
1099 5
def table_header_filter(ptag):
    """
    Decide whether a p tag is a relevant table header.

    A tag qualifies when it has a b child, its text mentions "BlackBerry"
    and its text does not mention "experts".

    :param ptag: P tag.
    :type ptag: bs4.element.Tag
    """
    bold = ptag.find("b")
    if not bold:
        # Hand back the falsy lookup result itself
        return bold
    if "BlackBerry" not in ptag.text:
        return False
    return "experts" not in ptag.text
1108
1109
1110 5
def table_headers(pees):
1111
    """
1112
    Generate table headers from list of p tags.
1113
1114
    :param pees: List of p tags.
1115
    :type pees: list(bs4.element.Tag)
1116
    """
1117 5
    bolds = [x.text for x in pees if table_header_filter(x)]
1118 5
    return bolds
1119
1120
1121 5
@pem_wrapper
def loader_page_scraper(session=None):
    """
    Scrape both autoloader pages and print what they list.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    # Original BlackBerry site first, then the BB Mobile site
    for scraper in (loader_page_scraper_og, loader_page_scraper_bbm):
        scraper(session)
1132
1133
1134 5
def loader_page_scraper_og(session=None):
    """
    Scrape the original site's autoloader page and print each table.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    url = "https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html"
    page = generic_soup_parser(url, session)
    loader_tables = page.find_all("table")
    # Tables are matched to headers by position
    titles = table_headers(page.find_all("p"))
    for position, loader_table in enumerate(loader_tables):
        loader_page_chunker_og(position, loader_table, titles)
1147
1148
1149 5
def loader_page_scraper_bbm(session=None):
    """
    Scrape the BB Mobile autoloader page and print each list.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    url = "https://www.blackberrymobile.com/support/reload-software/"
    page = generic_soup_parser(url, session)
    list_class = re.compile("list-two special-.")
    matching = page.find_all("ul", {"class": list_class})
    print("~~~BlackBerry KEYone~~~")
    # The first matching list is skipped
    for ull in matching[1:]:
        loader_page_chunker_bbm(ull)
1162
1163
1164 5
def loader_page_chunker_og(idx, table, headers):
    """
    Print one loader page table: its header, then its cells four at a time.

    :param idx: Index of enumerating tables.
    :type idx: int

    :param table: HTML table tag.
    :type table: bs4.element.Tag

    :param headers: List of table headers.
    :type headers: list(str)
    """
    title = headers[idx]
    print("~~~{0}~~~".format(title))
    cells = table.find_all("td")
    for row in chunker(cells, 4):
        loader_page_printer(row)
    print(" ")
1182
1183
1184 5
def loader_page_chunker_bbm(ull):
    """
    Print one loader page list, taking its list items three at a time.

    :param ull: HTML unordered list tag.
    :type ull: bs4.element.Tag
    """
    items = ull.find_all("li")
    for group in chunker(items, 3):
        loader_page_printer(group)
1194
1195
1196 5
def loader_page_printer(chunk):
    """
    Print the first two cell texts and the link found in the third cell.

    :param chunk: List of td tags.
    :type chunk: list(bs4.element.Tag)
    """
    name = unicode_filter(chunk[0].text)
    version = unicode_filter(chunk[1].text)
    anchor = chunk[2].find("a")
    href = unicode_filter(anchor["href"])
    print("{0}\n    {1}: {2}".format(name, version, href))
1207
1208
1209 5
@pem_wrapper
def base_metadata(url, session=None):
    """
    Get BBNDK metadata, base function.

    Downloads the URL and returns the second comma-separated field
    of every non-empty line.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    payload = session.get(url).content
    metadata = []
    for line in payload.split(b"\n"):
        if not line:
            continue  # skip blank lines
        metadata.append(line.split(b",")[1].decode("utf-8"))
    return metadata
1226
1227
1228 5
def base_metadata_url(alternate=None):
    """
    Return metadata URL.

    :param alternate: Name of an alternate metadata set, e.g. "simulator". Default is None.
    :type alternate: str
    """
    baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk"
    if alternate is None:
        tail = "metadata"
    else:
        # Alternate sets live in their own subfolder
        tail = "{0}/{0}_metadata".format(alternate)
    return "{0}/{1}".format(baseurl, tail)
1238
1239
1240 5
def ndk_metadata(session=None):
    """
    Get BBNDK target metadata, keeping only 10.0, 10.1 and 10.2 entries.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    wanted = ("10.0", "10.1", "10.2")
    entries = base_metadata(base_metadata_url(), session)
    return [entry for entry in entries if entry.startswith(wanted)]
1251
1252
1253 5
def sim_metadata(session=None):
    """
    Get BBNDK simulator metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("simulator"), session)
1263
1264
1265 5
def runtime_metadata(session=None):
    """
    Get BBNDK runtime metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("runtime"), session)
1275
1276
1277 5
def series_generator(osversion):
    """
    Turn an OS version into its series/branch name, e.g. 10.3.2.x -> BB10_3_2.

    :param osversion: OS version.
    :type osversion: str
    """
    parts = osversion.split(".")
    # Only the first three components make up the series name
    major, minor, micro = parts[0], parts[1], parts[2]
    return "BB{0}_{1}_{2}".format(major, minor, micro)
1286
1287
1288 5
@pem_wrapper
def devalpha_urls(osversion, skel, session=None):
    """
    Check one candidate Dev Alpha autoloader URL.

    Returns a (url, content-length) tuple when the HEAD request
    answers 200, otherwise an empty tuple.

    :param osversion: OS version.
    :type osversion: str

    :param skel: Individual skeleton format to try.
    :type skel: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    baseurl = "http://downloads.blackberry.com/upr/developers/downloads"
    url = "{2}/{0}{1}.exe".format(skel, osversion, baseurl)
    req = session.head(url)
    if req.status_code != 200:
        return ()
    return (url, req.headers["content-length"])
1311
1312
1313 5
def devalpha_urls_serieshandler(osversion, skeletons):
    """
    Process list of candidate Dev Alpha autoloader URLs.

    Returns a new list in which every "<SERIES>" placeholder has been
    replaced by the series name for this OS version. The list passed in
    is left untouched, so it can be reused for another OS version.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list
    """
    skels = []
    for skel in skeletons:
        if "<SERIES>" in skel:
            # Series name is only computed when a placeholder is present
            skel = skel.replace("<SERIES>", series_generator(osversion))
        skels.append(skel)
    return skels
1328
1329
1330 5
def devalpha_urls_bulk(osversion, skeletons, xec, session=None):
    """
    Construct list of valid Dev Alpha autoloader URLs.

    Returns a dict of URL: content-length pairs for every candidate
    whose check returned a result.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    skels = devalpha_urls_serieshandler(osversion, skeletons)
    # Queue every check before waiting on any of them; calling result()
    # right after submit() would run the checks one at a time.
    pending = [xec.submit(devalpha_urls, osversion, skel, session) for skel in skels]
    finals = {}
    for future in pending:
        final = future.result()
        if final:
            finals[final[0]] = final[1]
    return finals
1353
1354
1355 5
def devalpha_urls_bootstrap(osversion, skeletons, session=None):
    """
    Get valid Dev Alpha autoloader URLs, using a five-worker thread pool.

    Returns whatever devalpha_urls_bulk returns, or None if the scan
    is interrupted from the keyboard.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            found = devalpha_urls_bulk(osversion, skeletons, xec, session)
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
            found = None
    return found
1373
1374
1375 5
def dev_dupe_dicter(finals):
    """
    Invert the URL dict: group URLs into sets keyed by content-length.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    revo = {}
    for url, size in finals.items():
        if size not in revo:
            revo[size] = set()
        revo[size].add(url)
    return revo
1386
1387
1388 5
def dev_dupe_remover(finals, dupelist):
    """
    Delete duplicated entries whose URL contains "DevAlpha".

    The finals dict is modified in place and also returned.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)

    :param dupelist: Groups of duplicate URLs.
    :type dupelist: list(set(str))
    """
    # Lazy generator, so each deletion happens as its URL is reached
    flagged = (url for group in dupelist for url in group if "DevAlpha" in url)
    for url in flagged:
        del finals[url]
    return finals
1403
1404
1405 5
def dev_dupe_cleaner(finals):
    """
    Clean duplicate autoloader entries.

    URLs that share a content-length with another URL count as duplicates.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    by_size = dev_dupe_dicter(finals)
    dupelist = [urls for urls in by_size.values() if len(urls) > 1]
    return dev_dupe_remover(finals, dupelist)
1416