Passed
Push — master ( 816317...cc1553 )
by John
03:01
created

bbarchivist.networkutils.ptcrb_item_cleaner()   A

Complexity

Conditions 2

Size

Total Lines 41
Code Lines 35

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 35
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 35
nop 1
dl 0
loc 41
ccs 35
cts 35
cp 1
crap 2
rs 9.0399
c 0
b 0
f 0
1
#!/usr/bin/env python3
0 ignored issues
show
coding-style introduced by
Too many lines in module (1128/1000)
Loading history...
2 5
"""This module is used for network connections; APIs, downloading, etc."""
3
4 5
import concurrent.futures  # multiprocessing/threading
5 5
import glob  # pem file lookup
6 5
import os  # filesystem read
7 5
import re  # regexes
8
9 5
import requests  # downloading
10 5
from bs4 import BeautifulSoup  # scraping
11 5
from bbarchivist import utilities  # parse filesize
12 5
from bbarchivist import xmlutils  # xml work
13 5
from bbarchivist.bbconstants import SERVERS  # lookup servers
14
15 5
__author__ = "Thurask"
16 5
__license__ = "WTFPL v2"
17 5
__copyright__ = "2015-2018 Thurask"
18
19
20 5
def grab_pem():
    """
    Work with either local cacerts or system cacerts.

    A ``cacert.pem`` file in the current working directory takes priority;
    otherwise the bundle reported by Requests is used.

    :return: Path to the certificate bundle.
    :rtype: str
    """
    # Test the path directly instead of globbing it: glob treats characters
    # such as "[" in the working directory as pattern syntax and would miss
    # a file that is really there.
    pemfile = os.path.join(os.getcwd(), "cacert.pem")
    if os.path.isfile(pemfile):
        return os.path.abspath(pemfile)  # local cacerts
    return requests.certs.where()  # no local cacerts
30
31
32 5
def pem_wrapper(method):
    """
    Decorator to set REQUESTS_CA_BUNDLE.

    :param method: Method to use.
    :type method: function

    :return: Wrapped function that sets the environment variable, then calls method.
    :rtype: function
    """
    import functools  # local import; only the decorators need it

    # functools.wraps keeps the decorated function's name and docstring,
    # so introspection and generated docs still describe the real function.
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        # Set REQUESTS_CA_BUNDLE before doing function.
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
        return method(*args, **kwargs)
    return wrapper
46
47
48 5
def try_try_again(method):
    """
    Decorator to absorb timeouts, proxy errors, and other common exceptions.

    :param method: Method to use.
    :type method: function

    :return: Wrapped function; returns method's result, or None if all tries fail.
    :rtype: function
    """
    import functools  # local import; only the decorators need it

    # functools.wraps keeps the decorated function's name and docstring.
    @functools.wraps(method)
    def wrapper(*args, **kwargs):
        # Try function, try it again up to five times, and leave gracefully.
        tries = 5
        for _ in range(tries):
            try:
                return method(*args, **kwargs)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ProxyError):
                continue
        return None  # every attempt failed
    return wrapper
71
72
73 5
def generic_session(session=None):
    """
    Hand back the given Requests session, or make a new one if none was given.

    :param session: Requests session object, created if this is None.
    :type session: requests.Session()
    """
    if session is not None:
        return session
    return requests.Session()
82
83
84 5
def generic_soup_parser(url, session=None):
    """
    Fetch a URL and return its content parsed by BeautifulSoup's "html.parser".

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    response = generic_session(session).get(url)
    return BeautifulSoup(response.content, "html.parser")
98
99
100 5
@pem_wrapper
def get_length(url, session=None):
    """
    Get content-length header from some URL.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :return: Content length as an integer; 0 if the URL is None, the
             connection fails, or the header is missing or not a number.
    :rtype: int
    """
    if url is None:
        return 0
    session = generic_session(session)
    try:
        heads = session.head(url)
    except requests.ConnectionError:
        return 0
    try:
        return int(heads.headers['content-length'])
    except (KeyError, ValueError):
        # Servers may omit the header (e.g. chunked responses) or send junk;
        # treat that the same as an unreachable URL.
        return 0
120
121
122 5
@pem_wrapper
def download(url, output_directory=None, session=None):
    """
    Fetch the file at a URL into a folder, deleting the result if it is empty.

    :param url: URL to download from.
    :type url: str

    :param output_directory: Download folder. Default is local.
    :type output_directory: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    target_dir = utilities.dirhandler(output_directory, os.getcwd())
    long_name = url.rsplit('/', 1)[-1]  # last URL segment is the filename
    short_name = utilities.stripper(long_name)
    destination = os.path.join(target_dir, long_name)
    download_writer(url, destination, long_name, short_name, session)
    remove_empty_download(destination)
143
144
145 5
def remove_empty_download(fname):
    """
    Delete the file at the given path when it has a size of zero bytes.

    :param fname: File path.
    :type fname: str
    """
    if not os.path.getsize(fname):
        os.remove(fname)
154
155
156 5
def download_writer(url, fname, lfname, sname, session=None):
    """
    Download file and write to disk.

    :param url: URL to download from.
    :type url: str

    :param fname: File path.
    :type fname: str

    :param lfname: Long filename.
    :type lfname: str

    :param sname: Short name, for printing to screen.
    :type sname: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # Honour the documented default: a None session is created on the fly.
    session = generic_session(session)
    with open(fname, "wb") as file:
        req = session.get(url, stream=True)
        try:
            if req.status_code == 200:  # 200 OK
                # The header is optional; only format a size when the server sent one.
                clength = req.headers.get('content-length')
                fsize = utilities.fsizer(clength) if clength is not None else "unknown size"
                print("DOWNLOADING {0} [{1}]".format(sname, fsize))
                for chunk in req.iter_content(chunk_size=1024):
                    file.write(chunk)
            else:
                print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
        finally:
            # A streamed response keeps its connection until closed or fully read.
            req.close()
185
186
187 5
def download_bootstrap(urls, outdir=None, workers=5, session=None):
    """
    Run downloaders for each file in given URL iterable.

    :param urls: URLs to download.
    :type urls: list

    :param outdir: Download folder. Default is handled in :func:`download`.
    :type outdir: str

    :param workers: Maximum number of worker threads. Default is 5.
    :type workers: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # No more threads than URLs, but always at least one: ThreadPoolExecutor
    # raises ValueError when max_workers is 0, which an empty list used to cause.
    workers = max(1, min(len(urls), workers))
    spinman = utilities.SpinManager()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
        try:
            spinman.start()
            for url in urls:
                xec.submit(download, url, outdir, session)
        except (KeyboardInterrupt, SystemExit):
            xec.shutdown()
            spinman.stop()
    spinman.stop()
    utilities.spinner_clear()
    utilities.line_begin()
216
217
218 5
def download_android_tools(downloaddir=None):
    """
    Download Android SDK platform tools.

    Any existing download folder is deleted and recreated first.

    :param downloaddir: Directory name, default is "plattools".
    :type downloaddir: str
    """
    import shutil  # local import; only needed to clear the old folder

    if downloaddir is None:
        downloaddir = "plattools"
    if os.path.exists(downloaddir):
        # os.removedirs only removes empty directories (and then prunes empty
        # parents), so it failed on a folder holding an earlier download.
        shutil.rmtree(downloaddir)
    os.mkdir(downloaddir)
    platforms = ("windows", "linux", "darwin")
    baseurl = "https://dl.google.com/android/repository/platform-tools-latest"
    dlurls = ["{1}-{0}.zip".format(plat, baseurl) for plat in platforms]
    sess = generic_session()
    # Download into the requested folder, not a hard-coded "plattools".
    download_bootstrap(dlurls, outdir=downloaddir, session=sess)
235
236
237 5
@pem_wrapper
def getcode(url, session=None):
    """
    Issue a HEAD request and report the HTTP status code, or 404 if the connection fails.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    try:
        response = session.head(url)
    except requests.ConnectionError:
        return 404
    return int(response.status_code)
255
256
257 5
@pem_wrapper
def availability(url, session=None):
    """
    Tell whether a URL answers with an acceptable HTTP status code.
    200 or 301-308 is OK, else is not.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    status = getcode(url, session)
    if status == 200:
        return True
    return 300 < status <= 308
271
272
273 5
def clean_availability(results, server):
    """
    Pick one server's lookup result and a marker showing whether it exists.

    Returns the result plus "PD" or the upper-cased server name when a
    release was found, or two spaces when it was not.

    :param results: Result dict.
    :type results: dict(str: str)

    :param server: Server, key for result dict.
    :type server: str
    """
    if server == "p":
        marker = "PD"
    else:
        marker = server.upper()
    rel = results[server.lower()]
    missing = rel == "SR not in system" or rel is None
    avail = "  " if missing else marker
    return rel, avail
287
288
289 5
@pem_wrapper
def carrier_checker(mcc, mnc, session=None):
    """
    Ask BlackBerry World which country and carrier a MCC/MNC pair belongs to.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    baseurl = "http://appworld.blackberry.com/ClientAPI/checkcarrier"
    url = "{2}?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc, baseurl)
    response = session.get(url, headers={'User-agent': 'AppWorld/5.1.0.60'})
    country, carrier = xmlutils.cchecker_get_tags(response.text)
    return country, carrier
310
311
312 5
def return_npc(mcc, mnc):
    """
    Build a NPC: MCC and MNC each zero-filled to three characters, then "30".

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int
    """
    padded = [str(code).zfill(3) for code in (mcc, mnc)]
    return "{0}{1}30".format(*padded)
323
324
325 5
@pem_wrapper
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
    """
    Post an update-details query to BlackBerry's server and parse the reply.

    :param npc: MCC + MNC (see `func:return_npc`)
    :type npc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param upgrade: Whether to use upgrade files. False by default.
    :type upgrade: bool

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool

    :param forced: Force a software release.
    :type forced: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    mode = "repair"
    if upgrade:
        mode = "upgrade"
    if forced is None:
        forced = "latest"
    payload = xmlutils.prep_carrier_query(npc, device, mode, forced)
    response = session.post(
        "https://cs.sl.blackberry.com/cse/updateDetails/2.2/",
        headers={"Content-Type": "text/xml;charset=UTF-8"},
        data=payload
    )
    return xmlutils.parse_carrier_xml(response.text, blitz)
356
357
358 5
@pem_wrapper
def sr_lookup(osver, server, session=None):
    """
    Look up the software release for an OS version on a chosen server.
    :data:`bbarchivist.bbconstants.SERVERLIST` for server list.

    :param osver: OS version to lookup, 10.x.y.zzzz.
    :type osver: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    payload = xmlutils.prep_sr_lookup(osver)
    reply = sr_lookup_poster(payload, server, session)
    return xmlutils.parse_sr_lookup(reply)
377
378
379 5
def sr_lookup_poster(query, server, session=None):
    """
    Send the software release lookup XML to a server and return the reply text.

    A timeout or connection error yields "SR not in system" instead.

    :param query: XML payload.
    :type query: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    header = {"Content-Type": "text/xml;charset=UTF-8"}
    try:
        response = session.post(server, headers=header, data=query, timeout=1)
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        return "SR not in system"
    return response.text
401
402
403 5
def sr_lookup_bootstrap(osv, session=None, no2=False):
    """
    Run lookups for each server for given OS.

    :param osv: OS to check.
    :type osv: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
    :type no2: bool

    :return: Dict of server key to lookup result, or None if interrupted.
    :rtype: dict(str: str)
    """
    keys = ["p", "a1", "a2", "b1", "b2"]
    if no2:
        keys = [key for key in keys if key not in ("a2", "b2")]
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            # Submit every lookup before waiting on any of them; calling
            # .result() right after submit() ran the lookups one at a time.
            # NOTE(review): the same session object is now used by several
            # threads at once - confirm that is acceptable for callers.
            futures = [(key, xec.submit(sr_lookup, osv, SERVERS[key], session)) for key in keys]
            results = {}
            for key, future in futures:
                results[key] = future.result()
            return results
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
            return None
433
434
435 5
@pem_wrapper
def available_bundle_lookup(mcc, mnc, device, session=None):
    """
    Post an available-bundles query for a carrier and device, and parse the reply.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    payload = xmlutils.prep_available_bundle(device, return_npc(mcc, mnc))
    response = session.post(
        "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/",
        headers={"Content-Type": "text/xml;charset=UTF-8"},
        data=payload
    )
    return xmlutils.parse_available_bundle(response.text)
460
461
462 5
@pem_wrapper
def ptcrb_scraper(ptcrbid, session=None):
    """
    Scrape the PTCRB device page and return its cleaned-up entries.

    Takes every second cell of the second table on the page.

    :param ptcrbid: Numerical ID from PTCRB (end of URL).
    :type ptcrbid: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    baseurl = "https://www.ptcrb.com/certified-devices/device-details/?model={0}".format(ptcrbid)
    sess = generic_session(session)
    useragent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36"
    sess.headers.update({"User-agent": useragent})
    tables = generic_soup_parser(baseurl, sess).find_all("table")
    cells = tables[1].find_all("td")[1::2]  # every other
    return [ptcrb_item_cleaner(cell.text.strip()) for cell in cells]
483
484
485 5
def space_pad(instring, minlength):
    """
    Append spaces to a string so it is at least the minimum length.

    :param instring: String to pad.
    :type instring: str

    :param minlength: Pad while len(instring) < minlength.
    :type minlength: int
    """
    shortfall = minlength - len(instring)
    if shortfall > 0:
        instring += " " * shortfall
    return instring
498
499
500 5
def ptcrb_cleaner_multios(item):
    """
    Keep only the first "OS" entry when an item contains more than one.

    :param item: The item to clean.
    :type item: str
    """
    if item.count("OS") <= 1:
        return item
    # Text between the first and second "OS", prefixed with "OS" again.
    return "OS" + item.split("OS")[1]
512
513
514 5
def ptcrb_cleaner_spaces(item):
    """
    Space-pad the second and fourth words of an item to eleven characters.

    :param item: The item to clean.
    :type item: str
    """
    words = item.split(" ")
    for index in (1, 3):
        if len(words) > index:
            words[index] = space_pad(words[index], 11)
    return " ".join(words)
528
529
530 5
def ptcrb_item_cleaner(item):
    """
    Normalize a badly formatted PTCRB entry into a consistent layout.

    The substitutions run in a fixed order; later ones rely on earlier ones.

    :param item: The item to clean.
    :type item: str
    """
    def swap_all(text, pairs):
        # Apply each (old, new) replacement in sequence.
        for old, new in pairs:
            text = text.replace(old, new)
        return text

    item = swap_all(item, (
        ("<td>", ""),
        ("</td>", ""),
        ("\n", ""),
        ("SW: OS", "OS"),
        ("Software Version: OS", "OS"),
        (" (SR", ", SR"),
    ))
    item = re.sub(r"\s?\((.*)$", "", item)
    item = re.sub(r"\sSV.*$", "", item)
    item = swap_all(item, (
        (")", ""),
        (". ", "."),
        (";", ""),
        ("version", "Version"),
        ("Verison", "Version"),
    ))
    item = ptcrb_cleaner_multios(item)
    item = swap_all(item, (
        ("SR10", "SR 10"),
        ("SR", "SW Release"),
        (" Version:", ":"),
        ("Version ", " "),
        (":1", ": 1"),
        (", ", " "),
        (",", " "),
        ("Software", "SW"),
        ("  ", " "),
        ("OS ", "OS: "),
        ("Radio ", "Radio: "),
        ("Release ", "Release: "),
    ))
    item = ptcrb_cleaner_spaces(item).strip()
    item = item.replace("\r", "")
    if item.startswith("10"):
        item = "OS: {0}".format(item)
    return swap_all(item, (
        (":    ", ": "),
        (":   ", ": "),
    ))
571
572
573 5
@pem_wrapper
def kernel_scraper(utils=False, session=None):
    """
    Collect branch names from BlackBerry's GitHub kernel repo branch pages.

    Reads pages 1 to 9, stopping early at a page with a "no-results-message" div.

    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
    :type utils: bool

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if utils:
        repo = "android-utils"
    else:
        repo = "android-linux-kernel"
    sess = generic_session(session)
    branches = []
    for page in range(1, 10):
        url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
        soup = generic_soup_parser(url, sess)
        if soup.find("div", {"class": "no-results-message"}):
            break
        found = re.findall(r"msm[0-9]{4}\/[A-Z0-9]{6}", soup.get_text(), re.IGNORECASE)
        branches.extend(found)
    return branches
596
597
598 5
def root_generator(folder, build, variant="common"):
    """
    Build the per-device root paths used by the SHAxxx hash lookup URLs.

    :param folder: Dictionary of variant: loader name pairs.
    :type folder: dict(str: str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # Priv roots depend on the carrier variant.
    priv_root = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
    # DTEK50 roots depend on the build prefix.
    if build[:3] == "AAF":
        dtek_root = "bbSupport/DTEK50"
    else:
        dtek_root = "bbfoundation/hashfiles_priv/dtek50"
    # DTEK60 shares the DTEK50 root.
    return {"Priv": priv_root, "DTEK50": dtek_root, "DTEK60": dtek_root}
620
621
622 5
def make_droid_skeleton_bbm(method, build, device, variant="common"):
    """
    Build the autoloader or hash-file URL for a device hosted on the BB Mobile site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    platforms = {"KEYone": "qc8953", "Motion": "qc8953krypton", "KEY2": "sdm660"}
    platform = platforms[device]
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), platform)
    if method is not None:
        return "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(base, method.lower())
    return "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(base)
645
646
647 5
def make_droid_skeleton_og(method, build, device, variant="common"):
    """
    Build the autoloader or hash-file URL for a device hosted on the original site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    folder = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
    platforms = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
    roots = root_generator(folder, build, variant)
    base = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), platforms[device])
    if method is not None:
        # Hash files sit under a per-device root on the content server.
        hash_host = "https://ca.blackberry.com/content/dam"
        return "{3}/{1}/{0}.{2}sum".format(base, roots[device], method.lower(), hash_host)
    loader_host = "https://bbapps.download.blackberry.com/Priv"
    return "{1}/{0}.zip".format(base, loader_host)
674
675
676 5
def make_droid_skeleton(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str

    :raises ValueError: If the device is in neither device list.
    """
    # No Aurora
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
    bbmlist = ("KEYone", "Motion", "KEY2")   # BB Mobile
    if device in oglist:
        return make_droid_skeleton_og(method, build, device, variant)
    if device in bbmlist:
        return make_droid_skeleton_bbm(method, build, device, variant)
    # Previously fell through with no URL assigned and died with an
    # UnboundLocalError; name the offending device instead.
    raise ValueError("Unsupported device: {0}".format(device))
700
701
702 5
def bulk_droid_skeletons(devs, build, method=None):
    """
    Build the autoloader/hash URLs for every device and each of its variants.

    :param devs: List of devices.
    :type devs: list(str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str
    """
    carrier_variants = {
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
        "KEYone": ("common", "usa-sprint", "global-att", "china-china")
    }
    carrier_devices = ("Priv", )  # add KEYone when verified

    def variants_for(dev):
        # Single-variant devices only get "common".
        if dev in carrier_devices:
            return carrier_variants[dev]
        return ("common", )

    return [
        make_droid_skeleton(method, build, dev, var)
        for dev in devs
        for var in variants_for(dev)
    ]
728
729
730 5
def prepare_droid_list(device):
    """
    Return the argument unchanged if it is a list, else wrap it in a new list.

    :param device: Device to check.
    :type device: str
    """
    return device if isinstance(device, list) else [device]
742
743
744 5
def droid_scanner(build, device, method=None, session=None):
    """
    Check for Android autoloaders on BlackBerry's site.

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :return: List of available URLs, or None if there are none.
    :rtype: list(str)
    """
    devs = prepare_droid_list(device)
    skels = bulk_droid_skeletons(devs, build, method)
    if not skels:
        # Nothing to scan; ThreadPoolExecutor would raise ValueError for
        # max_workers=0, so report "nothing found" directly.
        return None
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
        results = droid_scanner_worker(xec, skels, session)
    return results if results else None
765
766
767 5
def droid_scanner_worker(xec, skels, session=None):
    """
    Worker to check for Android autoloaders.

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param skels: List of skeleton formats.
    :type skels: list(str)

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :return: The skeleton URLs that are available, in their original order.
    :rtype: list(str)
    """
    # Submit every check before collecting any result; waiting on each
    # future right after submitting it made the pool run one check at a time.
    # NOTE(review): the same session object is now used by several threads
    # at once - confirm that is acceptable for callers.
    pending = [(skel, xec.submit(availability, skel, session)) for skel in skels]
    return [skel for skel, future in pending if future.result()]
786
787
788 5
def chunker(iterable, inc):
    """
    Slice a sequence into consecutive pieces of at most inc items each.

    :param iterable: Iterable to chunk.
    :type iterable: list/tuple/string

    :param inc: Increment; how big each chunk is.
    :type inc: int
    """
    pieces = []
    for start in range(0, len(iterable), inc):
        pieces.append(iterable[start:start + inc])
    return pieces
800
801
802 5
def unicode_filter(intext):
    """
    Drop every U+2013 character from the text, then trim surrounding whitespace.

    :param intext: Text to filter.
    :type intext: str
    """
    without_dashes = "".join(intext.split("\u2013"))
    return without_dashes.strip()
810
811
812 5
def table_header_filter(ptag):
    """
    Decide whether a p tag is a relevant header.

    It must contain a b tag and the text "BlackBerry", but not "experts".

    :param ptag: P tag.
    :type ptag: bs4.element.Tag
    """
    bold = ptag.find("b")
    if not bold:
        return bold  # pass the falsy lookup result through unchanged
    if "BlackBerry" not in ptag.text:
        return False
    return "experts" not in ptag.text
821
822
823 5
def table_headers(pees):
    """
    Collect the text of every p tag that passes the header filter.

    :param pees: List of p tags.
    :type pees: list(bs4.element.Tag)
    """
    headers = []
    for ptag in pees:
        if table_header_filter(ptag):
            headers.append(ptag.text)
    return headers
832
833
834 5
@pem_wrapper
def loader_page_scraper(session=None):
    """
    Scrape both autoloader pages with one session: original site first, then BB Mobile.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    shared = generic_session(session)
    for scrape in (loader_page_scraper_og, loader_page_scraper_bbm):
        scrape(shared)
845
846
847 5
def loader_page_scraper_og(session=None):
    """
    Scrape the original site's autoloader page and process each table on it.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    soup = generic_soup_parser("https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html", session)
    tables = soup.find_all("table")
    headers = table_headers(soup.find_all("p"))
    index = 0
    for table in tables:
        loader_page_chunker_og(index, table, headers)
        index += 1
860
861
862 5
def loader_page_scraper_bbm(session=None):
    """
    Scrape the BB Mobile autoloader page and process its matching lists.

    The first matching list on the page is skipped.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    soup = generic_soup_parser("https://www.blackberrymobile.com/support/reload-software/", session)
    matches = soup.find_all("ul", {"class": re.compile("list-two special-.")})
    print("~~~BlackBerry KEYone~~~")
    for listing in matches[1:]:
        loader_page_chunker_bbm(listing)
875
876
877 5
def loader_page_chunker_og(idx, table, headers):
    """
    Print a table's header, then its cells in groups of four, then a spacer line.

    :param idx: Index of enumerating tables.
    :type idx: int

    :param table: HTML table tag.
    :type table: bs4.element.Tag

    :param headers: List of table headers.
    :type headers: list(str)
    """
    title = headers[idx]
    print("~~~{0}~~~".format(title))
    cells = table.find_all("td")
    for row in chunker(cells, 4):
        loader_page_printer(row)
    print(" ")
895
896
897 5
def loader_page_chunker_bbm(ull):
    """
    Print a list's items in groups of three.

    :param ull: HTML unordered list tag.
    :type ull: bs4.element.Tag
    """
    entries = ull.find_all("li")
    for group in chunker(entries, 3):
        loader_page_printer(group)
907
908
909 5
def loader_page_printer(chunk):
    """
    Print the first two cells' text and the third cell's link target.

    :param chunk: List of td tags.
    :type chunk: list(bs4.element.Tag)
    """
    name_cell, version_cell, link_cell = chunk[0], chunk[1], chunk[2]
    fields = (
        unicode_filter(name_cell.text),
        unicode_filter(version_cell.text),
        unicode_filter(link_cell.find("a")["href"]),
    )
    print("{0}\n    {1}: {2}".format(*fields))
920
921
922 5
@pem_wrapper
def base_metadata(url, session=None):
    """
    Fetch a BBNDK metadata file and return the second comma-separated field of each line.

    Empty lines are skipped; each field is decoded as UTF-8.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    lines = session.get(url).content.split(b"\n")
    fields = []
    for line in lines:
        if line:
            fields.append(line.split(b",")[1].decode("utf-8"))
    return fields
939
940
941 5
def base_metadata_url(alternate=None):
    """
    Return metadata URL.

    :param alternate: Name of an alternate metadata set, e.g. "simulator" or "runtime". Default is None, which selects the main metadata file.
    :type alternate: str
    """
    baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk"
    if alternate is None:
        tail = "metadata"
    else:
        # Alternate sets live at <name>/<name>_metadata under the base URL.
        tail = "{0}/{0}_metadata".format(alternate)
    return "{0}/{1}".format(baseurl, tail)
951
952
953 5
def ndk_metadata(session=None):
    """
    Get BBNDK target metadata.

    Only entries starting with "10.0", "10.1" or "10.2" are kept.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    wanted_prefixes = ("10.0", "10.1", "10.2")
    entries = base_metadata(base_metadata_url(), session)
    return [entry for entry in entries if entry.startswith(wanted_prefixes)]
964
965
966 5
def sim_metadata(session=None):
    """
    Get BBNDK simulator metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("simulator"), session)
976
977
978 5
def runtime_metadata(session=None):
    """
    Get BBNDK runtime metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("runtime"), session)
988
989
990 5
def series_generator(osversion):
    """
    Generate series/branch name from OS version.

    The first three dot-separated components are used, so "10.3.2.2639"
    becomes "BB10_3_2".

    :param osversion: OS version.
    :type osversion: str
    """
    leading = osversion.split(".")[:3]
    template = "BB{0}_{1}_{2}"
    return template.format(*leading)
999
1000
1001 5
@pem_wrapper
def devalpha_urls(osversion, skel, session=None):
    """
    Check individual Dev Alpha autoloader URLs.

    Returns a (url, content-length) tuple when a HEAD request answers 200,
    otherwise an empty tuple.

    :param osversion: OS version.
    :type osversion: str

    :param skel: Individual skeleton format to try.
    :type skel: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    baseurl = "http://downloads.blackberry.com/upr/developers/downloads"
    url = "{2}/{0}{1}.exe".format(skel, osversion, baseurl)
    response = session.head(url)
    if response.status_code != 200:
        return ()
    return (url, response.headers["content-length"])
1024
1025
1026 5
def devalpha_urls_serieshandler(osversion, skeletons):
    """
    Process list of candidate Dev Alpha autoloader URLs.

    Every "<SERIES>" placeholder is replaced with the series name generated
    from the OS version. A new list is returned and the input list is left
    untouched, so the same skeleton list can be reused for other OS versions.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list
    """
    # Build a fresh list: writing into the caller's list would erase the
    # placeholders and make later calls reuse the first version's series.
    return [
        skel.replace("<SERIES>", series_generator(osversion)) if "<SERIES>" in skel else skel
        for skel in skeletons
    ]
1041
1042
1043 5
def devalpha_urls_bulk(osversion, skeletons, xec, session=None):
    """
    Construct list of valid Dev Alpha autoloader URLs.

    All URL checks are submitted to the executor before any result is
    awaited, so they can run in parallel on the pool's workers. Results are
    collected in submission order, which keeps the ordering of the returned
    dict the same as the skeleton list.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    skels = devalpha_urls_serieshandler(osversion, skeletons)
    # Submit everything first: calling .result() right after .submit() would
    # block on each request in turn and make the thread pool pointless.
    # NOTE(review): a session passed in is now used from several threads at once.
    futures = [xec.submit(devalpha_urls, osversion, skel, session) for skel in skels]
    finals = {}
    for future in futures:
        final = future.result()
        if final:  # empty tuple means the URL did not answer 200
            finals[final[0]] = final[1]
    return finals
1066
1067
1068 5
def devalpha_urls_bootstrap(osversion, skeletons, session=None):
    """
    Get list of valid Dev Alpha autoloader URLs.

    Runs the bulk check on a five-worker thread pool. If interrupted from
    the keyboard, the pool is shut down and None is returned.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        try:
            found = devalpha_urls_bulk(osversion, skeletons, pool, session)
        except KeyboardInterrupt:
            pool.shutdown(wait=False)
            return None
        else:
            return found
1086
1087
1088 5
def dev_dupe_dicter(finals):
    """
    Prepare dictionary to clean duplicate autoloaders.

    Inverts the mapping: each content-length maps to the set of URLs that
    reported it.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    grouped = {}
    for url, size in finals.items():
        if size not in grouped:
            grouped[size] = set()
        grouped[size].add(url)
    return grouped
1099
1100
1101 5
def dev_dupe_remover(finals, dupelist):
    """
    Filter dictionary of autoloader entries.

    Every URL in the duplicate groups that contains "DevAlpha" is deleted
    from the dictionary, which is modified in place and also returned.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)

    :param dupelist: List of groups of duplicate URLs.
    :type dupelist: list(set(str))
    """
    # Lazy generator keeps the check/delete interleaving of a nested loop.
    doomed = (url for group in dupelist for url in group if "DevAlpha" in url)
    for url in doomed:
        del finals[url]
    return finals
1116
1117
1118 5
def dev_dupe_cleaner(finals):
    """
    Clean duplicate autoloader entries.

    URLs are grouped by content-length; groups with more than one URL are
    handed to dev_dupe_remover for filtering.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    by_size = dev_dupe_dicter(finals)
    shared = [urls for urls in by_size.values() if len(urls) > 1]
    return dev_dupe_remover(finals, shared)
1129