Passed
Push — master ( 274f62...7831d2 )
by John
06:44
created

bbarchivist.networkutils.download_request_prep()   B

Complexity

Conditions 2

Size

Total Lines 37
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 9
dl 0
loc 37
ccs 7
cts 7
cp 1
crap 2
rs 8.8571
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1128/1000)
Loading history...
2 5
"""This module is used for network connections; APIs, downloading, etc."""
3
4 5
import concurrent.futures  # multiprocessing/threading
5 5
import glob  # pem file lookup
6 5
import os  # filesystem read
7 5
import re  # regexes
8
9 5
import requests  # downloading
10 5
from bs4 import BeautifulSoup  # scraping
11 5
from bbarchivist import utilities  # parse filesize
12 5
from bbarchivist import xmlutils  # xml work
13 5
from bbarchivist.bbconstants import SERVERS  # lookup servers
14
15 5
__author__ = "Thurask"
16 5
__license__ = "WTFPL v2"
17 5
__copyright__ = "2015-2018 Thurask"
18
19
20 5
def grab_pem():
21
    """
22
    Work with either local cacerts or system cacerts.
23
    """
24 5
    try:
25 5
        pemfile = glob.glob(os.path.join(os.getcwd(), "cacert.pem"))[0]
26 5
    except IndexError:
27 5
        return requests.certs.where()  # no local cacerts
28
    else:
29 5
        return os.path.abspath(pemfile)  # local cacerts
30
31
32 5
def pem_wrapper(method):
33
    """
34
    Decorator to set REQUESTS_CA_BUNDLE.
35
36
    :param method: Method to use.
37
    :type method: function
38
    """
39 5
    def wrapper(*args, **kwargs):
40
        """
41
        Set REQUESTS_CA_BUNDLE before doing function.
42
        """
43 5
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
44 5
        return method(*args, **kwargs)
45 5
    return wrapper
46
47
48 5
def try_try_again(method):
49
    """
50
    Decorator to absorb timeouts, proxy errors, and other common exceptions.
51
52
    :param method: Method to use.
53
    :type method: function
54
    """
55 5
    def wrapper(*args, **kwargs):
56
        """
57
        Try function, try it again up to five times, and leave gracefully.
58
        """
59 5
        tries = 5
60 5
        for _ in range(tries):
61 5
            try:
62 5
                result = method(*args, **kwargs)
63 5
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ProxyError):
64 5
                continue
65
            else:
66 5
                break
67
        else:
68 5
            result = None
69 5
        return result
70 5
    return wrapper
71
72
73 5
def generic_session(session=None):
74
    """
75
    Create a Requests session object on the fly, if need be.
76
77
    :param session: Requests session object, created if this is None.
78
    :type session: requests.Session()
79
    """
80 5
    sess = requests.Session() if session is None else session
81 5
    return sess
82
83
84 5
def generic_soup_parser(url, session=None):
85
    """
86
    Get a BeautifulSoup HTML parser for some URL.
87
88
    :param url: The URL to check.
89
    :type url: str
90
91
    :param session: Requests session object, default is created on the fly.
92
    :type session: requests.Session()
93
    """
94 5
    session = generic_session(session)
95 5
    req = session.get(url)
96 5
    soup = BeautifulSoup(req.content, "html.parser")
97 5
    return soup
98
99
100 5
@pem_wrapper
101 5
def get_length(url, session=None):
102
    """
103
    Get content-length header from some URL.
104
105
    :param url: The URL to check.
106
    :type url: str
107
108
    :param session: Requests session object, default is created on the fly.
109
    :type session: requests.Session()
110
    """
111 5
    session = generic_session(session)
112 5
    if url is None:
113 5
        return 0
114 5
    try:
115 5
        heads = session.head(url)
116 5
        fsize = heads.headers['content-length']
117 5
        return int(fsize)
118 5
    except requests.ConnectionError:
119 5
        return 0
120
121
122 5
@pem_wrapper
123 5
def download(url, output_directory=None, session=None):
124
    """
125
    Download file from given URL.
126
127
    :param url: URL to download from.
128
    :type url: str
129
130
    :param output_directory: Download folder. Default is local.
131
    :type output_directory: str
132
133
    :param session: Requests session object, default is created on the fly.
134
    :type session: requests.Session()
135
    """
136 5
    session = generic_session(session)
137 5
    output_directory = utilities.dirhandler(output_directory, os.getcwd())
138 5
    lfname = url.split('/')[-1]
139 5
    sname = utilities.stripper(lfname)
140 5
    fname = os.path.join(output_directory, lfname)
141 5
    download_writer(url, fname, lfname, sname, session)
142 5
    remove_empty_download(fname)
143
144
145 5
def remove_empty_download(fname):
146
    """
147
    Remove file if it's empty.
148
149
    :param fname: File path.
150
    :type fname: str
151
    """
152 5
    if os.stat(fname).st_size == 0:
153 5
        os.remove(fname)
154
155
156 5
def download_writer(url, fname, lfname, sname, session=None):
157
    """
158
    Download file and write to disk.
159
160
    :param url: URL to download from.
161
    :type url: str
162
163
    :param fname: File path.
164
    :type fname: str
165
166
    :param lfname: Long filename.
167
    :type lfname: str
168
169
    :param sname: Short name, for printing to screen.
170
    :type sname: str
171
172
    :param session: Requests session object, default is created on the fly.
173
    :type session: requests.Session()
174
    """
175 5
    with open(fname, "wb") as file:
176 5
        req = session.get(url, stream=True)
177 5
        clength = req.headers['content-length']
178 5
        fsize = utilities.fsizer(clength)
179 5
        if req.status_code == 200:  # 200 OK
180 5
            print("DOWNLOADING {0} [{1}]".format(sname, fsize))
181 5
            for chunk in req.iter_content(chunk_size=1024):
182 5
                file.write(chunk)
183
        else:
184 5
            print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
185
186
187 5
def download_bootstrap(urls, outdir=None, workers=5, session=None):
188
    """
189
    Run downloaders for each file in given URL iterable.
190
191
    :param urls: URLs to download.
192
    :type urls: list
193
194
    :param outdir: Download folder. Default is handled in :func:`download`.
195
    :type outdir: str
196
197
    :param workers: Number of worker processes. Default is 5.
198
    :type workers: int
199
200
    :param session: Requests session object, default is created on the fly.
201
    :type session: requests.Session()
202
    """
203 5
    workers = len(urls) if len(urls) < workers else workers
204 5
    spinman = utilities.SpinManager()
205 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
206 5
        try:
207 5
            spinman.start()
208 5
            for url in urls:
209 5
                xec.submit(download, url, outdir, session)
210 5
        except (KeyboardInterrupt, SystemExit):
211 5
            xec.shutdown()
212 5
            spinman.stop()
213 5
    spinman.stop()
214 5
    utilities.spinner_clear()
215 5
    utilities.line_begin()
216
217
218 5
def download_android_tools(downloaddir=None):
219
    """
220
    Download Android SDK platform tools.
221
222
    :param downloaddir: Directory name, default is "plattools".
223
    :type downloaddir: str
224
    """
225 5
    if downloaddir is None:
226 5
        downloaddir = "plattools"
227 5
    if os.path.exists(downloaddir):
228 5
        os.removedirs(downloaddir)
229 5
    os.mkdir(downloaddir)
230 5
    platforms = ("windows", "linux", "darwin")
231 5
    baseurl = "https://dl.google.com/android/repository/platform-tools-latest"
232 5
    dlurls = ["{1}-{0}.zip".format(plat, baseurl) for plat in platforms]
233 5
    sess = generic_session()
234 5
    download_bootstrap(dlurls, outdir="plattools", session=sess)
235
236
237 5
@pem_wrapper
238 5
def getcode(url, session=None):
239
    """
240
    Return status code of given URL.
241
242
    :param url: URL to check.
243
    :type url: str
244
245
    :param session: Requests session object, default is created on the fly.
246
    :type session: requests.Session()
247
    """
248 5
    session = generic_session(session)
249 5
    try:
250 5
        shead = session.head(url)
251 5
        status = int(shead.status_code)
252 5
        return status
253 5
    except requests.ConnectionError:
254 5
        return 404
255
256
257 5
@pem_wrapper
258 5
def availability(url, session=None):
259
    """
260
    Check HTTP status code of given URL.
261
    200 or 301-308 is OK, else is not.
262
263
    :param url: URL to check.
264
    :type url: str
265
266
    :param session: Requests session object, default is created on the fly.
267
    :type session: requests.Session()
268
    """
269 5
    status = getcode(url, session)
270 5
    return status == 200 or 300 < status <= 308
271
272
273 5
def clean_availability(results, server):
274
    """
275
    Clean availability for autolookup script.
276
277
    :param results: Result dict.
278
    :type results: dict(str: str)
279
280
    :param server: Server, key for result dict.
281
    :type server: str
282
    """
283 5
    marker = "PD" if server == "p" else server.upper()
284 5
    rel = results[server.lower()]
285 5
    avail = marker if rel != "SR not in system" and rel is not None else "  "
286 5
    return rel, avail
287
288
289 5
@pem_wrapper
290 5
def carrier_checker(mcc, mnc, session=None):
291
    """
292
    Query BlackBerry World to map a MCC and a MNC to a country and carrier.
293
294
    :param mcc: Country code.
295
    :type mcc: int
296
297
    :param mnc: Network code.
298
    :type mnc: int
299
300
    :param session: Requests session object, default is created on the fly.
301
    :type session: requests.Session()
302
    """
303 5
    session = generic_session(session)
304 5
    baseurl = "http://appworld.blackberry.com/ClientAPI/checkcarrier"
305 5
    url = "{2}?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc, baseurl)
306 5
    user_agent = {'User-agent': 'AppWorld/5.1.0.60'}
307 5
    req = session.get(url, headers=user_agent)
308 5
    country, carrier = xmlutils.cchecker_get_tags(req.text)
309 5
    return country, carrier
310
311
312 5
def return_npc(mcc, mnc):
313
    """
314
    Format MCC and MNC into a NPC.
315
316
    :param mcc: Country code.
317
    :type mcc: int
318
319
    :param mnc: Network code.
320
    :type mnc: int
321
    """
322 5
    return "{0}{1}30".format(str(mcc).zfill(3), str(mnc).zfill(3))
323
324
325 5
@pem_wrapper
326 5
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
327
    """
328
    Query BlackBerry servers, check which update is out for a carrier.
329
330
    :param npc: MCC + MNC (see `func:return_npc`)
331
    :type npc: int
332
333
    :param device: Hexadecimal hardware ID.
334
    :type device: str
335
336
    :param upgrade: Whether to use upgrade files. False by default.
337
    :type upgrade: bool
338
339
    :param blitz: Whether or not to create a blitz package. False by default.
340
    :type blitz: bool
341
342
    :param forced: Force a software release.
343
    :type forced: str
344
345
    :param session: Requests session object, default is created on the fly.
346
    :type session: requests.Session()
347
    """
348 5
    session = generic_session(session)
349 5
    upg = "upgrade" if upgrade else "repair"
350 5
    forced = "latest" if forced is None else forced
351 5
    url = "https://cs.sl.blackberry.com/cse/updateDetails/2.2/"
352 5
    query = xmlutils.prep_carrier_query(npc, device, upg, forced)
353 5
    header = {"Content-Type": "text/xml;charset=UTF-8"}
354 5
    req = session.post(url, headers=header, data=query)
355 5
    return xmlutils.parse_carrier_xml(req.text, blitz)
356
357
358 5
@pem_wrapper
359 5
def sr_lookup(osver, server, session=None):
360
    """
361
    Software release lookup, with choice of server.
362
    :data:`bbarchivist.bbconstants.SERVERLIST` for server list.
363
364
    :param osver: OS version to lookup, 10.x.y.zzzz.
365
    :type osver: str
366
367
    :param server: Server to use.
368
    :type server: str
369
370
    :param session: Requests session object, default is created on the fly.
371
    :type session: requests.Session()
372
    """
373 5
    query = xmlutils.prep_sr_lookup(osver)
374 5
    reqtext = sr_lookup_poster(query, server, session)
375 5
    packtext = xmlutils.parse_sr_lookup(reqtext)
376 5
    return packtext
377
378
379 5
def sr_lookup_poster(query, server, session=None):
380
    """
381
    Post the XML payload for a software release lookup.
382
383
    :param query: XML payload.
384
    :type query: str
385
386
    :param server: Server to use.
387
    :type server: str
388
389
    :param session: Requests session object, default is created on the fly.
390
    :type session: requests.Session()
391
    """
392 5
    session = generic_session(session)
393 5
    header = {"Content-Type": "text/xml;charset=UTF-8"}
394 5
    try:
395 5
        req = session.post(server, headers=header, data=query, timeout=1)
396 5
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
397 5
        reqtext = "SR not in system"
398
    else:
399 5
        reqtext = req.text
400 5
    return reqtext
401
402
403 5
def sr_lookup_bootstrap(osv, session=None, no2=False):
404
    """
405
    Run lookups for each server for given OS.
406
407
    :param osv: OS to check.
408
    :type osv: str
409
410
    :param session: Requests session object, default is created on the fly.
411
    :type session: requests.Session()
412
413
    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
414
    :type no2: bool
415
    """
416 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
417 5
        try:
418 5
            results = {
419
                "p": None,
420
                "a1": None,
421
                "a2": None,
422
                "b1": None,
423
                "b2": None
424
            }
425 5
            if no2:
426 5
                del results["a2"]
427 5
                del results["b2"]
428 5
            for key in results:
429 5
                results[key] = xec.submit(sr_lookup, osv, SERVERS[key], session).result()
430 5
            return results
431 5
        except KeyboardInterrupt:
432 5
            xec.shutdown(wait=False)
433
434
435 5
@pem_wrapper
436 5
def available_bundle_lookup(mcc, mnc, device, session=None):
437
    """
438
    Check which software releases were ever released for a carrier.
439
440
    :param mcc: Country code.
441
    :type mcc: int
442
443
    :param mnc: Network code.
444
    :type mnc: int
445
446
    :param device: Hexadecimal hardware ID.
447
    :type device: str
448
449
    :param session: Requests session object, default is created on the fly.
450
    :type session: requests.Session()
451
    """
452 5
    session = generic_session(session)
453 5
    server = "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/"
454 5
    npc = return_npc(mcc, mnc)
455 5
    query = xmlutils.prep_available_bundle(device, npc)
456 5
    header = {"Content-Type": "text/xml;charset=UTF-8"}
457 5
    req = session.post(server, headers=header, data=query)
458 5
    bundlelist = xmlutils.parse_available_bundle(req.text)
459 5
    return bundlelist
460
461
462 5
@pem_wrapper
463 5
def ptcrb_scraper(ptcrbid, session=None):
464
    """
465
    Get the PTCRB results for a given device.
466
467
    :param ptcrbid: Numerical ID from PTCRB (end of URL).
468
    :type ptcrbid: str
469
470
    :param session: Requests session object, default is created on the fly.
471
    :type session: requests.Session()
472
    """
473 5
    baseurl = "https://www.ptcrb.com/certified-devices/device-details/?model={0}".format(ptcrbid)
474 5
    sess = generic_session(session)
475 5
    useragent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36"
476 5
    sess.headers.update({"User-agent": useragent})
477 5
    soup = generic_soup_parser(baseurl, sess)
478 5
    certtable = soup.find_all("table")[1]
479 5
    tds = certtable.find_all("td")[1::2]  # every other
480 5
    prelimlist = [tdx.text for tdx in tds]
481 5
    cleanlist = [ptcrb_item_cleaner(item.strip()) for item in prelimlist]
482 5
    return cleanlist
483
484
485 5
def space_pad(instring, minlength):
486
    """
487
    Pad a string with spaces until it's the minimum length.
488
489
    :param instring: String to pad.
490
    :type instring: str
491
492
    :param minlength: Pad while len(instring) < minlength.
493
    :type minlength: int
494
    """
495 5
    while len(instring) < minlength:
496 5
        instring += " "
497 5
    return instring
498
499
500 5
def ptcrb_cleaner_multios(item):
501
    """
502
    Discard multiple entries for "OS".
503
504
    :param item: The item to clean.
505
    :type item: str
506
    """
507 5
    if item.count("OS") > 1:
508 5
        templist = item.split("OS")
509 5
        templist[0] = "OS"
510 5
        item = "".join([templist[0], templist[1]])
511 5
    return item
512
513
514 5
def ptcrb_cleaner_spaces(item):
515
    """
516
    Pad item with spaces to the right length.
517
518
    :param item: The item to clean.
519
    :type item: str
520
    """
521 5
    spaclist = item.split(" ")
522 5
    if len(spaclist) > 1:
523 5
        spaclist[1] = space_pad(spaclist[1], 11)
524 5
    if len(spaclist) > 3:
525 5
        spaclist[3] = space_pad(spaclist[3], 11)
526 5
    item = " ".join(spaclist)
527 5
    return item
528
529
530 5
def ptcrb_item_cleaner(item):
531
    """
532
    Cleanup poorly formatted PTCRB entries written by an intern.
533
534
    :param item: The item to clean.
535
    :type item: str
536
    """
537 5
    item = item.replace("<td>", "")
538 5
    item = item.replace("</td>", "")
539 5
    item = item.replace("\n", "")
540 5
    item = item.replace("SW: OS", "OS")
541 5
    item = item.replace("Software Version: OS", "OS")
542 5
    item = item.replace(" (SR", ", SR")
543 5
    item = re.sub(r"\s?\((.*)$", "", item)
544 5
    item = re.sub(r"\sSV.*$", "", item)
545 5
    item = item.replace(")", "")
546 5
    item = item.replace(". ", ".")
547 5
    item = item.replace(";", "")
548 5
    item = item.replace("version", "Version")
549 5
    item = item.replace("Verison", "Version")
550 5
    item = ptcrb_cleaner_multios(item)
551 5
    item = item.replace("SR10", "SR 10")
552 5
    item = item.replace("SR", "SW Release")
553 5
    item = item.replace(" Version:", ":")
554 5
    item = item.replace("Version ", " ")
555 5
    item = item.replace(":1", ": 1")
556 5
    item = item.replace(", ", " ")
557 5
    item = item.replace(",", " ")
558 5
    item = item.replace("Software", "SW")
559 5
    item = item.replace("  ", " ")
560 5
    item = item.replace("OS ", "OS: ")
561 5
    item = item.replace("Radio ", "Radio: ")
562 5
    item = item.replace("Release ", "Release: ")
563 5
    item = ptcrb_cleaner_spaces(item)
564 5
    item = item.strip()
565 5
    item = item.replace("\r", "")
566 5
    if item.startswith("10"):
567 5
        item = "OS: {0}".format(item)
568 5
    item = item.replace(":    ", ": ")
569 5
    item = item.replace(":   ", ": ")
570 5
    return item
571
572
573 5
@pem_wrapper
574 5
def kernel_scraper(utils=False, session=None):
575
    """
576
    Scrape BlackBerry's GitHub kernel repo for available branches.
577
578
    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
579
    :type utils: bool
580
581
    :param session: Requests session object, default is created on the fly.
582
    :type session: requests.Session()
583
    """
584 5
    repo = "android-utils" if utils else "android-linux-kernel"
585 5
    kernlist = []
586 5
    sess = generic_session(session)
587 5
    for page in range(1, 10):
588 5
        url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
589 5
        soup = generic_soup_parser(url, sess)
590 5
        if soup.find("div", {"class": "no-results-message"}):
591 5
            break
592
        else:
593 5
            text = soup.get_text()
594 5
            kernlist.extend(re.findall(r"msm[0-9]{4}\/[A-Z0-9]{6}", text, re.IGNORECASE))
595 5
    return kernlist
596
597
598 5
def root_generator(folder, build, variant="common"):
599
    """
600
    Generate roots for the SHAxxx hash lookup URLs.
601
602
    :param folder: Dictionary of variant: loader name pairs.
603
    :type folder: dict(str: str)
604
605
    :param build: Build to check, 3 letters + 3 numbers.
606
    :type build: str
607
608
    :param variant: Autoloader variant. Default is "common".
609
    :type variant: str
610
    """
611
    #Priv specific
612 5
    privx = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
613
    #DTEK50 specific
614 5
    dtek50x = "bbSupport/DTEK50" if build[:3] == "AAF" else "bbfoundation/hashfiles_priv/dtek50"
615
    #DTEK60 specific
616 5
    dtek60x = dtek50x  # still uses dtek50 folder, for some reason
617
    #Pack it up
618 5
    roots = {"Priv": privx, "DTEK50": dtek50x, "DTEK60": dtek60x}
619 5
    return roots
620
621
622 5
def make_droid_skeleton_bbm(method, build, device, variant="common"):
623
    """
624
    Make an Android autoloader/hash URL, on the BB Mobile site.
625
626
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
627
    :type method: str
628
629
    :param build: Build to check, 3 letters + 3 numbers.
630
    :type build: str
631
632
    :param device: Device to check.
633
    :type device: str
634
635
    :param variant: Autoloader variant. Default is "common".
636
    :type variant: str
637
    """
638 5
    devices = {"KEYone": "qc8953", "Motion": "qc8953krypton", "KEY2": "sdm660"}
639 5
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
640 5
    if method is None:
641 5
        skel = "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(base)
642
    else:
643 5
        skel = "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(base, method.lower())
644 5
    return skel
645
646
647 5
def make_droid_skeleton_og(method, build, device, variant="common"):
648
    """
649
    Make an Android autoloader/hash URL, on the original site.
650
651
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
652
    :type method: str
653
654
    :param build: Build to check, 3 letters + 3 numbers.
655
    :type build: str
656
657
    :param device: Device to check.
658
    :type device: str
659
660
    :param variant: Autoloader variant. Default is "common".
661
    :type variant: str
662
    """
663 5
    folder = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
664 5
    devices = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
665 5
    roots = root_generator(folder, build, variant)
666 5
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), devices[device])
667 5
    if method is None:
668 5
        baseurl = "https://bbapps.download.blackberry.com/Priv"
669 5
        skel = "{1}/{0}.zip".format(base, baseurl)
670
    else:
671 5
        baseurl = "https://ca.blackberry.com/content/dam"
672 5
        skel = "{3}/{1}/{0}.{2}sum".format(base, roots[device], method.lower(), baseurl)
673 5
    return skel
674
675
676 5
def make_droid_skeleton(method, build, device, variant="common"):
677
    """
678
    Make an Android autoloader/hash URL.
679
680
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
681
    :type method: str
682
683
    :param build: Build to check, 3 letters + 3 numbers.
684
    :type build: str
685
686
    :param device: Device to check.
687
    :type device: str
688
689
    :param variant: Autoloader variant. Default is "common".
690
    :type variant: str
691
    """
692
    # No Aurora
693 5
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
694 5
    bbmlist = ("KEYone", "Motion", "KEY2")   # BB Mobile
695 5
    if device in oglist:
696 5
        skel = make_droid_skeleton_og(method, build, device, variant)
697 5
    elif device in bbmlist:
698 5
        skel = make_droid_skeleton_bbm(method, build, device, variant)
699 5
    return skel
700
701
702 5
def bulk_droid_skeletons(devs, build, method=None):
703
    """
704
    Prepare list of Android autoloader/hash URLs.
705
706
    :param devs: List of devices.
707
    :type devs: list(str)
708
709
    :param build: Build to check, 3 letters + 3 numbers.
710
    :type build: str
711
712
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
713
    :type method: str
714
    """
715 5
    carrier_variants = {
716
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
717
        "KEYone": ("common", "usa-sprint", "global-att", "china-china")
718
    }
719 5
    common_variants = ("common", )  # for single-variant devices
720 5
    carrier_devices = ("Priv", )  # add KEYone when verified
721 5
    skels = []
722 5
    for dev in devs:
723 5
        varlist = carrier_variants[dev] if dev in carrier_devices else common_variants
724 5
        for var in varlist:
725 5
            skel = make_droid_skeleton(method, build, dev, var)
726 5
            skels.append(skel)
727 5
    return skels
728
729
730 5
def prepare_droid_list(device):
731
    """
732
    Convert single devices to a list, if necessary.
733
734
    :param device: Device to check.
735
    :type device: str
736
    """
737 5
    if isinstance(device, list):
738 5
        devs = device
739
    else:
740 5
        devs = [device]
741 5
    return devs
742
743
744 5
def droid_scanner(build, device, method=None, session=None):
745
    """
746
    Check for Android autoloaders on BlackBerry's site.
747
748
    :param build: Build to check, 3 letters + 3 numbers.
749
    :type build: str
750
751
    :param device: Device to check.
752
    :type device: str
753
754
    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
755
    :type method: str
756
757
    :param session: Requests session object, default is created on the fly.
758
    :type session: requests.Session()
759
    """
760 5
    devs = prepare_droid_list(device)
761 5
    skels = bulk_droid_skeletons(devs, build, method)
762 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
763 5
        results = droid_scanner_worker(xec, skels, session)
764 5
    return results if results else None
765
766
767 5
def droid_scanner_worker(xec, skels, session=None):
768
    """
769
    Worker to check for Android autoloaders.
770
771
    :param xec: ThreadPoolExecutor instance.
772
    :type xec: concurrent.futures.ThreadPoolExecutor
773
774
    :param skels: List of skeleton formats.
775
    :type skels: list(str)
776
777
    :param session: Requests session object, default is created on the fly.
778
    :type session: requests.Session()
779
    """
780 5
    results = []
781 5
    for skel in skels:
782 5
        avail = xec.submit(availability, skel, session)
783 5
        if avail.result():
784 5
            results.append(skel)
785 5
    return results
786
787
788 5
def chunker(iterable, inc):
789
    """
790
    Convert an iterable into a list of inc sized lists.
791
792
    :param iterable: Iterable to chunk.
793
    :type iterable: list/tuple/string
794
795
    :param inc: Increment; how big each chunk is.
796
    :type inc: int
797
    """
798 5
    chunks = [iterable[x:x+inc] for x in range(0, len(iterable), inc)]
799 5
    return chunks
800
801
802 5
def unicode_filter(intext):
803
    """
804
    Remove Unicode crap.
805
806
    :param intext: Text to filter.
807
    :type intext: str
808
    """
809 5
    return intext.replace("\u2013", "").strip()
810
811
812 5
def table_header_filter(ptag):
813
    """
814
    Validate p tag, to see if it's relevant.
815
816
    :param ptag: P tag.
817
    :type ptag: bs4.element.Tag
818
    """
819 5
    valid = ptag.find("b") and "BlackBerry" in ptag.text and not "experts" in ptag.text
820 5
    return valid
821
822
823 5
def table_headers(pees):
824
    """
825
    Generate table headers from list of p tags.
826
827
    :param pees: List of p tags.
828
    :type pees: list(bs4.element.Tag)
829
    """
830 5
    bolds = [x.text for x in pees if table_header_filter(x)]
831 5
    return bolds
832
833
834 5
@pem_wrapper
835 5
def loader_page_scraper(session=None):
836
    """
837
    Return scraped autoloader pages.
838
839
    :param session: Requests session object, default is created on the fly.
840
    :type session: requests.Session()
841
    """
842 5
    session = generic_session(session)
843 5
    loader_page_scraper_og(session)
844 5
    loader_page_scraper_bbm(session)
845
846
847 5
def loader_page_scraper_og(session=None):
848
    """
849
    Return scraped autoloader page, original site.
850
851
    :param session: Requests session object, default is created on the fly.
852
    :type session: requests.Session()
853
    """
854 5
    url = "https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html"
855 5
    soup = generic_soup_parser(url, session)
856 5
    tables = soup.find_all("table")
857 5
    headers = table_headers(soup.find_all("p"))
858 5
    for idx, table in enumerate(tables):
859 5
        loader_page_chunker_og(idx, table, headers)
860
861
862 5
def loader_page_scraper_bbm(session=None):
863
    """
864
    Return scraped autoloader page, new site.
865
866
    :param session: Requests session object, default is created on the fly.
867
    :type session: requests.Session()
868
    """
869 5
    url = "https://www.blackberrymobile.com/support/reload-software/"
870 5
    soup = generic_soup_parser(url, session)
871 5
    ulls = soup.find_all("ul", {"class": re.compile("list-two special-.")})[1:]
872 5
    print("~~~BlackBerry KEYone~~~")
873 5
    for ull in ulls:
874 5
        loader_page_chunker_bbm(ull)
875
876
877 5
def loader_page_chunker_og(idx, table, headers):
878
    """
879
    Given a loader page table, chunk it into lists of table cells.
880
881
    :param idx: Index of enumerating tables.
882
    :type idx: int
883
884
    :param table: HTML table tag.
885
    :type table: bs4.element.Tag
886
887
    :param headers: List of table headers.
888
    :type headers: list(str)
889
    """
890 5
    print("~~~{0}~~~".format(headers[idx]))
891 5
    chunks = chunker(table.find_all("td"), 4)
892 5
    for chunk in chunks:
893 5
        loader_page_printer(chunk)
894 5
    print(" ")
895
896
897 5
def loader_page_chunker_bbm(ull):
898
    """
899
    Given a loader page list, chunk it into lists of list items.
900
901
    :param ull: HTML unordered list tag.
902
    :type ull: bs4.element.Tag
903
    """
904 5
    chunks = chunker(ull.find_all("li"), 3)
905 5
    for chunk in chunks:
906 5
        loader_page_printer(chunk)
907
908
909 5
def loader_page_printer(chunk):
910
    """
911
    Print individual cell texts given a list of table cells.
912
913
    :param chunk: List of td tags.
914
    :type chunk: list(bs4.element.Tag)
915
    """
916 5
    key = unicode_filter(chunk[0].text)
917 5
    ver = unicode_filter(chunk[1].text)
918 5
    link = unicode_filter(chunk[2].find("a")["href"])
919 5
    print("{0}\n    {1}: {2}".format(key, ver, link))
920
921
922 5
@pem_wrapper
923 5
def base_metadata(url, session=None):
924
    """
925
    Get BBNDK metadata, base function.
926
927
    :param url: URL to check.
928
    :type url: str
929
930
    :param session: Requests session object, default is created on the fly.
931
    :type session: requests.Session()
932
    """
933 5
    session = generic_session(session)
934 5
    req = session.get(url)
935 5
    data = req.content
936 5
    entries = data.split(b"\n")
937 5
    metadata = [entry.split(b",")[1].decode("utf-8") for entry in entries if entry]
938 5
    return metadata
939
940
941 5
def base_metadata_url(alternate=None):
942
    """
943
    Return metadata URL.
944
945
    :param alternate: If the URL is for the simulator metadata. Default is False.
946
    :type alternate: str
947
    """
948 5
    baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk"
949 5
    tail = "{0}/{0}_metadata".format(alternate) if alternate is not None else "metadata"
950 5
    return "{0}/{1}".format(baseurl, tail)
951
952
953 5
def ndk_metadata(session=None):
954
    """
955
    Get BBNDK target metadata.
956
957
    :param session: Requests session object, default is created on the fly.
958
    :type session: requests.Session()
959
    """
960 5
    ndkurl = base_metadata_url()
961 5
    data = base_metadata(ndkurl, session)
962 5
    metadata = [entry for entry in data if entry.startswith(("10.0", "10.1", "10.2"))]
963 5
    return metadata
964
965
966 5
def sim_metadata(session=None):
967
    """
968
    Get BBNDK simulator metadata.
969
970
    :param session: Requests session object, default is created on the fly.
971
    :type session: requests.Session()
972
    """
973 5
    simurl = base_metadata_url("simulator")
974 5
    metadata = base_metadata(simurl, session)
975 5
    return metadata
976
977
978 5
def runtime_metadata(session=None):
979
    """
980
    Get BBNDK runtime metadata.
981
982
    :param session: Requests session object, default is created on the fly.
983
    :type session: requests.Session()
984
    """
985 5
    rturl = base_metadata_url("runtime")
986 5
    metadata = base_metadata(rturl, session)
987 5
    return metadata
988
989
990 5
def series_generator(osversion):
991
    """
992
    Generate series/branch name from OS version.
993
994
    :param osversion: OS version.
995
    :type osversion: str
996
    """
997 5
    splits = osversion.split(".")
998 5
    return "BB{0}_{1}_{2}".format(*splits[0:3])
999
1000
1001 5
@pem_wrapper
1002 5
def devalpha_urls(osversion, skel, session=None):
1003
    """
1004
    Check individual Dev Alpha autoloader URLs.
1005
1006
    :param osversion: OS version.
1007
    :type osversion: str
1008
1009
    :param skel: Individual skeleton format to try.
1010
    :type skel: str
1011
1012
    :param session: Requests session object, default is created on the fly.
1013
    :type session: requests.Session()
1014
    """
1015 5
    session = generic_session(session)
1016 5
    baseurl = "http://downloads.blackberry.com/upr/developers/downloads"
1017 5
    url = "{2}/{0}{1}.exe".format(skel, osversion, baseurl)
1018 5
    req = session.head(url)
1019 5
    if req.status_code == 200:
1020 5
        finals = (url, req.headers["content-length"])
1021
    else:
1022 5
        finals = ()
1023 5
    return finals
1024
1025
1026 5
def devalpha_urls_serieshandler(osversion, skeletons):
1027
    """
1028
    Process list of candidate Dev Alpha autoloader URLs.
1029
1030
    :param osversion: OS version.
1031
    :type osversion: str
1032
1033
    :param skeletons: List of skeleton formats to try.
1034
    :type skeletons: list
1035
    """
1036 5
    skels = skeletons
1037 5
    for idx, skel in enumerate(skeletons):
1038 5
        if "<SERIES>" in skel:
1039 5
            skels[idx] = skel.replace("<SERIES>", series_generator(osversion))
1040 5
    return skels
1041
1042
1043 5
def devalpha_urls_bulk(osversion, skeletons, xec, session=None):
1044
    """
1045
    Construct list of valid Dev Alpha autoloader URLs.
1046
1047
    :param osversion: OS version.
1048
    :type osversion: str
1049
1050
    :param skeletons: List of skeleton formats to try.
1051
    :type skeletons: list
1052
1053
    :param xec: ThreadPoolExecutor instance.
1054
    :type xec: concurrent.futures.ThreadPoolExecutor
1055
1056
    :param session: Requests session object, default is created on the fly.
1057
    :type session: requests.Session()
1058
    """
1059 5
    finals = {}
1060 5
    skels = devalpha_urls_serieshandler(osversion, skeletons)
1061 5
    for skel in skels:
1062 5
        final = xec.submit(devalpha_urls, osversion, skel, session).result()
1063 5
        if final:
1064 5
            finals[final[0]] = final[1]
1065 5
    return finals
1066
1067
1068 5
def devalpha_urls_bootstrap(osversion, skeletons, session=None):
1069
    """
1070
    Get list of valid Dev Alpha autoloader URLs.
1071
1072
    :param osversion: OS version.
1073
    :type osversion: str
1074
1075
    :param skeletons: List of skeleton formats to try.
1076
    :type skeletons: list
1077
1078
    :param session: Requests session object, default is created on the fly.
1079
    :type session: requests.Session()
1080
    """
1081 5
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
1082 5
        try:
1083 5
            return devalpha_urls_bulk(osversion, skeletons, xec, session)
1084 5
        except KeyboardInterrupt:
1085 5
            xec.shutdown(wait=False)
1086
1087
1088 5
def dev_dupe_dicter(finals):
1089
    """
1090
    Prepare dictionary to clean duplicate autoloaders.
1091
1092
    :param finals: Dict of URL:content-length pairs.
1093
    :type finals: dict(str: str)
1094
    """
1095 5
    revo = {}
1096 5
    for key, val in finals.items():
1097 5
        revo.setdefault(val, set()).add(key)
1098 5
    return revo
1099
1100
1101 5
def dev_dupe_remover(finals, dupelist):
1102
    """
1103
    Filter dictionary of autoloader entries.
1104
1105
    :param finals: Dict of URL:content-length pairs.
1106
    :type finals: dict(str: str)
1107
1108
    :param dupelist: List of duplicate URLs.
1109
    :type duplist: list(str)
1110
    """
1111 5
    for dupe in dupelist:
1112 5
        for entry in dupe:
1113 5
            if "DevAlpha" in entry:
1114 5
                del finals[entry]
1115 5
    return finals
1116
1117
1118 5
def dev_dupe_cleaner(finals):
1119
    """
1120
    Clean duplicate autoloader entries.
1121
1122
    :param finals: Dict of URL:content-length pairs.
1123
    :type finals: dict(str: str)
1124
    """
1125 5
    revo = dev_dupe_dicter(finals)
1126 5
    dupelist = [val for key, val in revo.items() if len(val) > 1]
1127 5
    finals = dev_dupe_remover(finals, dupelist)
1128
    return finals
1129