bbarchivist.networkutils   F
last analyzed

Complexity

Total Complexity 125

Size/Duplication

Total Lines 1133
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
eloc 445
dl 0
loc 1133
rs 2
c 0
b 0
f 0
ccs 437
cts 437
cp 1
wmc 125

58 Functions

Rating   Name   Duplication   Size   Complexity  
A download_bootstrap() 0 29 5
A table_headers() 0 9 1
A grab_pem() 0 10 3
A ptcrb_item_cleaner() 0 41 2
A kernel_scraper() 0 23 4
A loader_page_scraper_bbm() 0 13 2
A loader_page_printer() 0 11 1
A remove_empty_download() 0 9 2
A bulk_droid_skeletons() 0 26 4
A series_generator() 0 9 1
A getcode() 0 18 2
A ptcrb_scraper() 0 19 1
A table_header_filter() 0 9 1
A download_writer() 0 29 4
A droid_scanner() 0 21 3
A make_droid_skeleton() 0 24 3
A droid_scanner_worker() 0 19 3
A dev_dupe_remover() 0 15 4
A get_length() 0 20 3
A availability() 0 14 1
A pem_wrapper() 0 14 1
A base_metadata_url() 0 10 2
A make_droid_skeleton_bbm() 0 23 2
A prepare_droid_list() 0 12 2
A chunker() 0 12 1
A download() 0 21 1
A devalpha_urls() 0 23 2
A space_pad() 0 13 2
A carrier_checker() 0 21 1
A make_droid_skeleton_og() 0 27 2
A ptcrb_cleaner_spaces() 0 14 3
A root_generator() 0 22 2
A loader_page_chunker_og() 0 18 2
A loader_page_scraper() 0 11 1
A try_try_again() 0 23 5
A sim_metadata() 0 10 1
A dev_dupe_cleaner() 0 11 1
A generic_session() 0 14 2
A return_npc() 0 11 1
A download_android_tools() 0 17 3
A carrier_query() 0 31 3
A devalpha_urls_bootstrap() 0 18 3
A generic_soup_parser() 0 14 1
A sr_lookup() 0 19 1
A base_metadata() 0 17 1
A loader_page_chunker_bbm() 0 10 2
A ptcrb_cleaner_multios() 0 12 2
A sr_lookup_poster() 0 22 3
A available_bundle_lookup() 0 25 1
A devalpha_urls_serieshandler() 0 15 3
A ndk_metadata() 0 11 1
A unicode_filter() 0 8 1
A sr_lookup_bootstrap() 0 30 5
A runtime_metadata() 0 10 1
A clean_availability() 0 14 3
A devalpha_urls_bulk() 0 23 3
A loader_page_scraper_og() 0 13 2
A dev_dupe_dicter() 0 11 2

How to fix   Complexity   

Complexity

Complex classes like bbarchivist.networkutils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python3
2 5
"""This module is used for network connections; APIs, downloading, etc."""
3
4 5
import concurrent.futures  # multiprocessing/threading
5 5
import glob  # pem file lookup
6 5
import os  # filesystem read
7 5
import re  # regexes
8
9 5
import requests  # downloading
10 5
import user_agent  # user agent
11 5
from bbarchivist import utilities  # parse filesize
12 5
from bbarchivist import xmlutils  # xml work
13 5
from bbarchivist.bbconstants import SERVERS  # lookup servers
14 5
from bs4 import BeautifulSoup  # scraping
15
16 5
__author__ = "Thurask"
17 5
__license__ = "WTFPL v2"
18 5
__copyright__ = "2015-2019 Thurask"
19
20
21 5
def grab_pem():
    """
    Return the path of the CA certificate bundle to use.

    A "cacert.pem" found in the current working directory takes priority;
    otherwise the location reported by requests.certs.where() is returned.
    """
    local_matches = glob.glob(os.path.join(os.getcwd(), "cacert.pem"))
    if not local_matches:
        return requests.certs.where()  # no local cacerts
    return os.path.abspath(local_matches[0])  # local cacerts
31
32
33 5
def pem_wrapper(method):
    """
    Decorator to set REQUESTS_CA_BUNDLE before the wrapped call.

    The wrapped function keeps its own name and docstring.

    :param method: Method to use.
    :type method: function
    """
    # Local import so this block does not depend on the file's import section.
    from functools import wraps

    @wraps(method)
    def wrapper(*args, **kwargs):
        """
        Set REQUESTS_CA_BUNDLE before doing function.
        """
        # Re-evaluated on every call, so a cacert.pem that appears in the
        # working directory later is still picked up.
        os.environ["REQUESTS_CA_BUNDLE"] = grab_pem()
        return method(*args, **kwargs)
    return wrapper
47
48
49 5
def try_try_again(method):
    """
    Decorator to absorb timeouts, proxy errors, and connection errors.

    The wrapped function keeps its own name and docstring.

    :param method: Method to use.
    :type method: function
    """
    # Local import so this block does not depend on the file's import section.
    from functools import wraps

    @wraps(method)
    def wrapper(*args, **kwargs):
        """
        Try function, try it again up to five times, and leave gracefully.
        """
        tries = 5
        for _ in range(tries):
            try:
                return method(*args, **kwargs)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ProxyError):
                continue  # network hiccup, give it another go
        return None  # every attempt failed
    return wrapper
72
73
74 5
def generic_session(session=None, uagent_type=None):
    """
    Return a Requests session carrying a freshly generated User-Agent.

    :param session: Requests session object, created if this is None.
    :type session: requests.Session()

    :param uagent_type: Passed to user_agent.generate_user_agent as device_type. Default is None.
    :type uagent_type: string
    """
    if session is None:
        sess = requests.Session()
    else:
        sess = session
    # The User-Agent header is overwritten even on a caller-supplied session.
    new_headers = {"User-Agent": user_agent.generate_user_agent(device_type=uagent_type)}
    sess.headers.update(new_headers)
    return sess
88
89
90 5
def generic_soup_parser(url, session=None):
    """
    Fetch a URL and return its body parsed by BeautifulSoup ("html.parser").

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    response = sess.get(url)
    return BeautifulSoup(response.content, "html.parser")
104
105
106 5
@pem_wrapper
def get_length(url, session=None):
    """
    Get content-length header from some URL.

    Returns 0 when the URL is None, the connection fails, or the server
    sends no usable content-length header.

    :param url: The URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if url is None:
        return 0  # nothing to ask about; don't bother building a session
    session = generic_session(session)
    try:
        heads = session.head(url)
        return int(heads.headers['content-length'])
    except (requests.ConnectionError, KeyError, ValueError):
        # KeyError: header absent (e.g. chunked responses);
        # ValueError: header present but not an integer.
        return 0
126
127
128 5
@pem_wrapper
def download(url, output_directory=None, session=None):
    """
    Download file from given URL, deleting the result if it ends up empty.

    :param url: URL to download from.
    :type url: str

    :param output_directory: Download folder, resolved via utilities.dirhandler with the working directory as fallback.
    :type output_directory: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    target_dir = utilities.dirhandler(output_directory, os.getcwd())
    long_name = url.rsplit('/', 1)[-1]  # last path segment is the filename
    short_name = utilities.stripper(long_name)
    target_path = os.path.join(target_dir, long_name)
    download_writer(url, target_path, long_name, short_name, sess)
    remove_empty_download(target_path)
149
150
151 5
def remove_empty_download(fname):
    """
    Delete the file at the given path when its size is zero bytes.

    :param fname: File path.
    :type fname: str
    """
    is_empty = os.path.getsize(fname) == 0
    if is_empty:
        os.remove(fname)
160
161
162 5
def download_writer(url, fname, lfname, sname, session=None):
    """
    Download file and write to disk.

    The output file is created before the request is made, so a failed
    download leaves an empty file behind.

    :param url: URL to download from.
    :type url: str

    :param fname: File path.
    :type fname: str

    :param lfname: Long filename.
    :type lfname: str

    :param sname: Short name, for printing to screen.
    :type sname: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if session is None:
        # Honour the documented default instead of failing on None.get().
        session = generic_session()
    with open(fname, "wb") as file:
        req = session.get(url, stream=True)
        try:
            if req.status_code == 200:  # 200 OK
                # content-length is optional (e.g. chunked transfer encoding)
                clength = req.headers.get('content-length')
                fsize = "unknown size" if clength is None else utilities.fsizer(clength)
                print("DOWNLOADING {0} [{1}]".format(sname, fsize))
                for chunk in req.iter_content(chunk_size=1024):
                    if chunk:  # skip empty keep-alive chunks
                        file.write(chunk)
            else:
                print("ERROR: HTTP {0} IN {1}".format(req.status_code, lfname))
        finally:
            # A streamed response keeps its connection until closed or fully
            # read; the error branch never reads the body.
            req.close()
191
192
193 5
def download_bootstrap(urls, outdir=None, workers=5, session=None):
    """
    Run downloaders for each file in given URL iterable.

    :param urls: URLs to download.
    :type urls: list

    :param outdir: Download folder. Default is handled in :func:`download`.
    :type outdir: str

    :param workers: Maximum number of worker threads. Default is 5.
    :type workers: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # Never use more threads than URLs, but ThreadPoolExecutor rejects
    # max_workers <= 0, so an empty URL list still needs a pool of one.
    workers = max(1, min(len(urls), workers))
    spinman = utilities.SpinManager()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as xec:
        try:
            spinman.start()
            for url in urls:
                xec.submit(download, url, outdir, session)
        except (KeyboardInterrupt, SystemExit):
            xec.shutdown()
            spinman.stop()
    # Leaving the with-block waits for all submitted downloads to finish.
    spinman.stop()
    utilities.spinner_clear()
    utilities.line_begin()
222
223
224 5
def download_android_tools(downloaddir=None):
    """
    Download Android SDK platform tools for Windows, Linux and macOS.

    Any existing download directory is deleted, contents included, and
    recreated before downloading.

    :param downloaddir: Directory name, default is "plattools".
    :type downloaddir: str
    """
    # Local import so this block does not depend on the file's import section.
    import shutil

    if downloaddir is None:
        downloaddir = "plattools"
    if os.path.exists(downloaddir):
        # os.removedirs only handles empty directories; a leftover download
        # from a previous run would make it raise.
        shutil.rmtree(downloaddir)
    os.mkdir(downloaddir)
    platforms = ("windows", "linux", "darwin")
    baseurl = "https://dl.google.com/android/repository/platform-tools-latest"
    dlurls = ["{1}-{0}.zip".format(plat, baseurl) for plat in platforms]
    sess = generic_session()
    # Download into the directory that was just prepared, not a fixed name.
    download_bootstrap(dlurls, outdir=downloaddir, session=sess)
241
242
243 5
@pem_wrapper
def getcode(url, session=None):
    """
    Return the HTTP status code of a HEAD request to the given URL.

    A connection error is reported as 404.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    try:
        response = sess.head(url)
        return int(response.status_code)
    except requests.ConnectionError:
        return 404
261
262
263 5
@pem_wrapper
def availability(url, session=None):
    """
    Tell whether the given URL answers with an acceptable HTTP status.

    200 or 301-308 is OK, else is not.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    code = getcode(url, session)
    if code == 200:
        return True
    return 300 < code <= 308
277
278
279 5
def clean_availability(results, server):
    """
    Return the lookup result for a server plus a short availability marker.

    The marker is "PD" for server "p", otherwise the upper-cased server
    name; it becomes two spaces when the result is None or
    "SR not in system".

    :param results: Result dict, keyed by lower-case server name.
    :type results: dict(str: str)

    :param server: Server, key for result dict.
    :type server: str
    """
    if server == "p":
        marker = "PD"
    else:
        marker = server.upper()
    rel = results[server.lower()]
    found = rel is not None and rel != "SR not in system"
    avail = marker if found else "  "
    return rel, avail
293
294
295 5
@pem_wrapper
def carrier_checker(mcc, mnc, session=None):
    """
    Query the BlackBerry World checkcarrier endpoint for a MCC/MNC pair.

    Returns the (country, carrier) pair extracted by
    xmlutils.cchecker_get_tags from the response text.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    endpoint = "http://appworld.blackberry.com/ClientAPI/checkcarrier"
    full_url = "{2}?homemcc={0}&homemnc={1}&devicevendorid=-1&pin=0".format(mcc, mnc, endpoint)
    # This request overrides the session's User-Agent with an AppWorld one.
    response = sess.get(full_url, headers={'User-Agent': 'AppWorld/5.1.0.60'})
    return xmlutils.cchecker_get_tags(response.text)
316
317
318 5
def return_npc(mcc, mnc):
    """
    Build a NPC string: zero-padded MCC, zero-padded MNC, then "30".

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int
    """
    country = str(mcc).zfill(3)
    network = str(mnc).zfill(3)
    return "{0}{1}30".format(country, network)
329
330
331 5
@pem_wrapper
def carrier_query(npc, device, upgrade=False, blitz=False, forced=None, session=None):
    """
    Post an updateDetails query to BlackBerry's servers and parse the reply.

    :param npc: MCC + MNC (see :func:`return_npc`)
    :type npc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param upgrade: Whether to use upgrade files. False by default.
    :type upgrade: bool

    :param blitz: Whether or not to create a blitz package. False by default.
    :type blitz: bool

    :param forced: Force a software release. "latest" is used when None.
    :type forced: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    mode = "upgrade" if upgrade else "repair"
    if forced is None:
        forced = "latest"
    payload = xmlutils.prep_carrier_query(npc, device, mode, forced)
    response = sess.post(
        "https://cs.sl.blackberry.com/cse/updateDetails/2.2/",
        headers={"Content-Type": "text/xml;charset=UTF-8"},
        data=payload
    )
    return xmlutils.parse_carrier_xml(response.text, blitz)
362
363
364 5
@pem_wrapper
def sr_lookup(osver, server, session=None):
    """
    Software release lookup, with choice of server.

    Builds the payload with xmlutils.prep_sr_lookup, posts it through
    :func:`sr_lookup_poster` and parses the reply with
    xmlutils.parse_sr_lookup.

    :param osver: OS version to lookup, 10.x.y.zzzz.
    :type osver: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    payload = xmlutils.prep_sr_lookup(osver)
    reply = sr_lookup_poster(payload, server, session)
    return xmlutils.parse_sr_lookup(reply)
383
384
385 5
def sr_lookup_poster(query, server, session=None):
    """
    Post the XML payload for a software release lookup.

    Returns the response text, or "SR not in system" when the request
    times out (one-second timeout) or the connection fails.

    :param query: XML payload.
    :type query: str

    :param server: Server to use.
    :type server: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    xml_header = {"Content-Type": "text/xml;charset=UTF-8"}
    try:
        response = sess.post(server, headers=xml_header, data=query, timeout=1)
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        return "SR not in system"
    return response.text
407
408
409 5
def sr_lookup_bootstrap(osv, session=None, no2=False):
    """
    Run lookups for each server for given OS, concurrently.

    Returns a dict mapping server key ("p", "a1", "a2", "b1", "b2") to the
    lookup result, or None if interrupted from the keyboard.

    :param osv: OS to check.
    :type osv: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :param no2: Whether to skip Alpha2/Beta2 servers. Default is false.
    :type no2: bool
    """
    keys = ["p", "a1", "a2", "b1", "b2"]
    if no2:
        keys = [key for key in keys if key not in ("a2", "b2")]
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            # Submit everything first so the lookups overlap; calling
            # .result() straight after each submit would run them one by one.
            futures = {key: xec.submit(sr_lookup, osv, SERVERS[key], session) for key in keys}
            return {key: future.result() for key, future in futures.items()}
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
    return None
439
440
441 5
@pem_wrapper
def available_bundle_lookup(mcc, mnc, device, session=None):
    """
    Post an availableBundles query for a carrier and device, and return
    what xmlutils.parse_available_bundle extracts from the reply.

    :param mcc: Country code.
    :type mcc: int

    :param mnc: Network code.
    :type mnc: int

    :param device: Hexadecimal hardware ID.
    :type device: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    payload = xmlutils.prep_available_bundle(device, return_npc(mcc, mnc))
    response = sess.post(
        "https://cs.sl.blackberry.com/cse/availableBundles/1.0.0/",
        headers={"Content-Type": "text/xml;charset=UTF-8"},
        data=payload
    )
    return xmlutils.parse_available_bundle(response.text)
466
467
468 5
@pem_wrapper
def ptcrb_scraper(ptcrbid, session=None):
    """
    Scrape the PTCRB device-details page for a model and return the
    cleaned text of every second cell of the page's second table.

    :param ptcrbid: Numerical ID from PTCRB (end of URL).
    :type ptcrbid: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    page_url = "https://www.ptcrb.com/certified-devices/device-details/?model={0}".format(ptcrbid)
    sess = generic_session(session, uagent_type="desktop")
    soup = generic_soup_parser(page_url, sess)
    cert_cells = soup.find_all("table")[1].find_all("td")[1::2]  # every other
    raw_texts = [cell.text for cell in cert_cells]
    return [ptcrb_item_cleaner(raw.strip()) for raw in raw_texts]
487
488
489 5
def space_pad(instring, minlength):
    """
    Right-pad a string with spaces so it is at least minlength long.

    :param instring: String to pad.
    :type instring: str

    :param minlength: Minimum length of the returned string.
    :type minlength: int
    """
    return instring.ljust(minlength)
502
503
504 5
def ptcrb_cleaner_multios(item):
    """
    Collapse an entry that mentions "OS" more than once.

    Only "OS" plus the text between the first and second occurrence is
    kept; anything before the first "OS" is dropped as well.

    :param item: The item to clean.
    :type item: str
    """
    if item.count("OS") <= 1:
        return item
    pieces = item.split("OS")
    return "OS" + pieces[1]
516
517
518 5
def ptcrb_cleaner_spaces(item):
    """
    Pad the second and fourth space-separated fields to 11 characters.

    :param item: The item to clean.
    :type item: str
    """
    fields = item.split(" ")
    for position in (1, 3):
        if len(fields) > position:
            fields[position] = space_pad(fields[position], 11)
    return " ".join(fields)
532
533
534 5
def ptcrb_item_cleaner(item):
    """
    Normalize an inconsistently formatted PTCRB entry.

    The substitutions are order-sensitive; the tables below list them in
    the exact order they are applied.

    :param item: The item to clean.
    :type item: str
    """
    def swap_all(text, pairs):
        """Apply plain (old, new) replacements one after another."""
        for old, new in pairs:
            text = text.replace(old, new)
        return text

    markup_and_labels = (
        ("<td>", ""),
        ("</td>", ""),
        ("\n", ""),
        ("SW: OS", "OS"),
        ("Software Version: OS", "OS"),
        (" (SR", ", SR"),
    )
    punctuation_and_typos = (
        (")", ""),
        (". ", "."),
        (";", ""),
        ("version", "Version"),
        ("Verison", "Version"),
    )
    labels_and_separators = (
        ("SR10", "SR 10"),
        ("SR", "SW Release"),
        (" Version:", ":"),
        ("Version ", " "),
        (":1", ": 1"),
        (", ", " "),
        (",", " "),
        ("Software", "SW"),
        ("  ", " "),
        ("OS ", "OS: "),
        ("Radio ", "Radio: "),
        ("Release ", "Release: "),
    )
    item = swap_all(item, markup_and_labels)
    item = re.sub(r"\s?\((.*)$", "", item)  # drop from the first "(" onwards
    item = re.sub(r"\sSV.*$", "", item)  # drop a trailing " SV..." part
    item = swap_all(item, punctuation_and_typos)
    item = ptcrb_cleaner_multios(item)
    item = swap_all(item, labels_and_separators)
    item = ptcrb_cleaner_spaces(item)
    item = item.strip().replace("\r", "")
    if item.startswith("10"):
        item = "OS: {0}".format(item)
    return swap_all(item, ((":    ", ": "), (":   ", ": ")))
575
576
577 5
@pem_wrapper
def kernel_scraper(utils=False, session=None):
    """
    Scrape BlackBerry's GitHub branch listings for msmXXXX/YYYYYY names.

    Pages 1 through 9 are read, stopping early at the first page that
    contains a "no-results-message" div.

    :param utils: Check android-utils repo instead of android-linux-kernel. Default is False.
    :type utils: bool

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    if utils:
        repo = "android-utils"
    else:
        repo = "android-linux-kernel"
    found = []
    sess = generic_session(session)
    for page in range(1, 10):
        page_url = "https://github.com/blackberry/{0}/branches/all?page={1}".format(repo, page)
        soup = generic_soup_parser(page_url, sess)
        if soup.find("div", {"class": "no-results-message"}):
            break
        found.extend(re.findall(r"msm[0-9]{4}\/[A-Z0-9]{6}", soup.get_text(), re.IGNORECASE))
    return found
600
601
602 5
def root_generator(folder, build, variant="common"):
    """
    Generate roots for the SHAxxx hash lookup URLs.

    Returns a dict keyed by "Priv", "DTEK50" and "DTEK60".

    :param folder: Dictionary of variant: loader name pairs.
    :type folder: dict(str: str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    # Priv: the root depends on the carrier variant's folder name
    priv_root = "bbfoundation/hashfiles_priv/{0}".format(folder[variant])
    # DTEK50: builds starting with "AAF" live under a different root
    if build[:3] == "AAF":
        dtek_root = "bbSupport/DTEK50"
    else:
        dtek_root = "bbfoundation/hashfiles_priv/dtek50"
    # DTEK60 shares the DTEK50 root
    return {"Priv": priv_root, "DTEK50": dtek_root, "DTEK60": dtek_root}
624
625
626 5
def make_droid_skeleton_bbm(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL, on the BB Mobile site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check; must be "KEYone", "Motion" or "KEY2".
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    platforms = {"KEYone": "qc8953", "Motion": "qc8953krypton", "KEY2": "sdm660"}
    stem = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), platforms[device])
    if method is not None:
        # hash file: extension is the lower-cased method plus "sum"
        return "http://54.247.87.13/softwareupgrade/BBM/{0}.{1}sum".format(stem, method.lower())
    return "http://54.247.87.13/softwareupgrade/BBM/{0}.zip".format(stem)
649
650
651 5
def make_droid_skeleton_og(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL, on the original site.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check; must be "Priv", "DTEK50" or "DTEK60".
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str
    """
    variant_folders = {"vzw-vzw": "verizon", "na-att": "att", "na-tmo": "tmo", "common": "default"}
    platforms = {"Priv": "qc8992", "DTEK50": "qc8952_64_sfi", "DTEK60": "qc8996"}
    hash_roots = root_generator(variant_folders, build, variant)
    stem = "bbry_{2}_autoloader_user-{0}-{1}".format(variant, build.upper(), platforms[device])
    if method is not None:
        # hash files sit under a per-device root on a different host
        return "{3}/{1}/{0}.{2}sum".format(
            stem, hash_roots[device], method.lower(), "https://ca.blackberry.com/content/dam"
        )
    return "{1}/{0}.zip".format(stem, "https://bbapps.download.blackberry.com/Priv")
678
679
680 5
def make_droid_skeleton(method, build, device, variant="common"):
    """
    Make an Android autoloader/hash URL.

    Dispatches to :func:`make_droid_skeleton_og` or
    :func:`make_droid_skeleton_bbm` depending on the device.

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param variant: Autoloader variant. Default is "common".
    :type variant: str

    :raises ValueError: If the device is in neither device list.
    """
    # No Aurora
    oglist = ("Priv", "DTEK50", "DTEK60")  # BlackBerry
    bbmlist = ("KEYone", "Motion", "KEY2", "KEY2LE")   # BB Mobile
    if device in oglist:
        return make_droid_skeleton_og(method, build, device, variant)
    if device in bbmlist:
        return make_droid_skeleton_bbm(method, build, device, variant)
    # Fail with a clear message instead of an UnboundLocalError on return.
    raise ValueError("Unsupported device: {0!r}".format(device))
704
705
706 5
def bulk_droid_skeletons(devs, build, method=None):
    """
    Prepare list of Android autoloader/hash URLs.

    Devices listed as carrier devices get one URL per carrier variant;
    every other device only gets the "common" variant.

    :param devs: List of devices.
    :type devs: list(str)

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str
    """
    carrier_variants = {
        "Priv": ("common", "vzw-vzw", "na-tmo", "na-att"),
        "KEYone": ("common", "usa-sprint", "global-att", "china-china")
    }
    common_variants = ("common", )  # for single-variant devices
    carrier_devices = ("Priv", )  # add KEYone when verified
    urls = []
    for dev in devs:
        if dev in carrier_devices:
            variants = carrier_variants[dev]
        else:
            variants = common_variants
        urls.extend(make_droid_skeleton(method, build, dev, var) for var in variants)
    return urls
732
733
734 5
def prepare_droid_list(device):
    """
    Return the argument unchanged if it is a list, else wrap it in one.

    :param device: Device to check.
    :type device: str
    """
    return device if isinstance(device, list) else [device]
746
747
748 5
def droid_scanner(build, device, method=None, session=None):
    """
    Check for Android autoloaders on BlackBerry's site.

    Returns the list of candidate URLs that are available, or None when
    none are (including when there was nothing to check).

    :param build: Build to check, 3 letters + 3 numbers.
    :type build: str

    :param device: Device to check.
    :type device: str

    :param method: None for regular OS links, "sha256/512" for SHA256 or 512 hash.
    :type method: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    devs = prepare_droid_list(device)
    skels = bulk_droid_skeletons(devs, build, method)
    if not skels:
        # ThreadPoolExecutor rejects max_workers=0; an empty device list
        # simply means nothing was found.
        return None
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(skels)) as xec:
        results = droid_scanner_worker(xec, skels, session)
    return results if results else None
769
770
771 5
def droid_scanner_worker(xec, skels, session=None):
    """
    Worker to check for Android autoloaders.

    Returns the URLs for which :func:`availability` is truthy, in the
    order they were given.

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param skels: List of skeleton formats.
    :type skels: list(str)

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    # Submit every check before waiting on any of them; waiting on each
    # future right after submitting it would run the checks one at a time.
    pending = [(skel, xec.submit(availability, skel, session)) for skel in skels]
    return [skel for skel, future in pending if future.result()]
790
791
792 5
def chunker(iterable, inc):
    """
    Slice a sequence into a list of consecutive pieces of length inc.

    The last piece is shorter when the length is not a multiple of inc.

    :param iterable: Iterable to chunk.
    :type iterable: list/tuple/string

    :param inc: Increment; how big each chunk is.
    :type inc: int
    """
    pieces = []
    for start in range(0, len(iterable), inc):
        pieces.append(iterable[start:start + inc])
    return pieces
804
805
806 5
def unicode_filter(intext):
    """
    Remove en dashes (U+2013) and strip surrounding whitespace.

    :param intext: Text to filter.
    :type intext: str
    """
    without_dashes = intext.replace("\u2013", "")
    return without_dashes.strip()
814
815
816 5
def table_header_filter(ptag):
    """
    Validate p tag, to see if it's relevant.

    A tag is relevant when it contains a b tag and its text mentions
    "BlackBerry" but not "experts". The result is truthy/falsy rather
    than a strict bool: a failed b lookup is returned as-is.

    :param ptag: P tag.
    :type ptag: bs4.element.Tag
    """
    bold = ptag.find("b")
    if not bold:
        return bold
    if "BlackBerry" not in ptag.text:
        return False
    return "experts" not in ptag.text
825
826
827 5
def table_headers(pees):
    """
    Collect the text of every p tag accepted by :func:`table_header_filter`.

    :param pees: List of p tags.
    :type pees: list(bs4.element.Tag)
    """
    headers = []
    for ptag in pees:
        if table_header_filter(ptag):
            headers.append(ptag.text)
    return headers
836
837
838 5
@pem_wrapper
def loader_page_scraper(session=None):
    """
    Scrape and print both autoloader pages, original site first.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    sess = generic_session(session)
    for scrape in (loader_page_scraper_og, loader_page_scraper_bbm):
        scrape(sess)
849
850
851 5
def loader_page_scraper_og(session=None):
    """
    Scrape the original site's autoloader page and print each table.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    soup = generic_soup_parser(
        "https://ca.blackberry.com/support/smartphones/Android-OS-Reload.html", session
    )
    all_tables = soup.find_all("table")
    titles = table_headers(soup.find_all("p"))
    # tables and titles are paired up by position
    for position, table in enumerate(all_tables):
        loader_page_chunker_og(position, table, titles)
864
865
866 5
def loader_page_scraper_bbm(session=None):
    """
    Scrape the new site's autoloader page and print each of its lists.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    page = generic_soup_parser(
        "https://www.blackberrymobile.com/support/reload-software/",
        session
    )
    list_class = re.compile("list-two special-.")
    matching_lists = page.find_all("ul", {"class": list_class})
    print("~~~BlackBerry KEYone~~~")
    # The first matching list is skipped.
    for loader_list in matching_lists[1:]:
        loader_page_chunker_bbm(loader_list)
879
880
881 5
def loader_page_chunker_og(idx, table, headers):
    """
    Print a table's header, then its cells in groups of four.

    :param idx: Index of enumerating tables.
    :type idx: int

    :param table: HTML table tag.
    :type table: bs4.element.Tag

    :param headers: List of table headers.
    :type headers: list(str)
    """
    title = headers[idx]
    print("~~~{0}~~~".format(title))
    cells = table.find_all("td")
    for row in chunker(cells, 4):
        loader_page_printer(row)
    # Spacer line after the table.
    print(" ")
899
900
901 5
def loader_page_chunker_bbm(ull):
    """
    Print an unordered list's items in groups of three.

    :param ull: HTML unordered list tag.
    :type ull: bs4.element.Tag
    """
    items = ull.find_all("li")
    for group in chunker(items, 3):
        loader_page_printer(group)
911
912
913 5
def loader_page_printer(chunk):
    """
    Print one loader entry from a group of cells.

    The first cell's text is printed on its own line; the second cell's text
    and the href of the link in the third cell follow, indented, on the next.
    Cells beyond the third are ignored.

    :param chunk: List of td tags.
    :type chunk: list(bs4.element.Tag)
    """
    name_cell, version_cell, link_cell = chunk[0], chunk[1], chunk[2]
    raw_fields = (name_cell.text, version_cell.text, link_cell.find("a")["href"])
    cleaned = [unicode_filter(field) for field in raw_fields]
    print("{0}\n    {1}: {2}".format(*cleaned))
924
925
926 5
@pem_wrapper
def base_metadata(url, session=None):
    """
    Get BBNDK metadata, base function.

    Downloads the URL, splits the body into lines, and returns the second
    comma-separated field of every non-empty line, decoded as UTF-8.

    :param url: URL to check.
    :type url: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    payload = session.get(url).content
    metadata = []
    for line in payload.split(b"\n"):
        if not line:
            continue
        fields = line.split(b",")
        metadata.append(fields[1].decode("utf-8"))
    return metadata
943
944
945 5
def base_metadata_url(alternate=None):
    """
    Return metadata URL.

    :param alternate: Name of an alternate metadata set; None (the default) gives the plain metadata URL.
    :type alternate: str
    """
    baseurl = "http://downloads.blackberry.com/upr/developers/update/bbndk"
    if alternate is None:
        tail = "metadata"
    else:
        # Alternate sets live in their own folder, with a prefixed file name.
        tail = "{0}/{0}_metadata".format(alternate)
    return "{0}/{1}".format(baseurl, tail)
955
956
957 5
def ndk_metadata(session=None):
    """
    Get BBNDK target metadata, keeping only 10.0, 10.1 and 10.2 entries.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    wanted_prefixes = ("10.0", "10.1", "10.2")
    entries = base_metadata(base_metadata_url(), session)
    return [entry for entry in entries if entry.startswith(wanted_prefixes)]
968
969
970 5
def sim_metadata(session=None):
    """
    Get BBNDK simulator metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("simulator"), session)
980
981
982 5
def runtime_metadata(session=None):
    """
    Get BBNDK runtime metadata.

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    return base_metadata(base_metadata_url("runtime"), session)
992
993
994 5
def series_generator(osversion):
    """
    Generate series/branch name from the first three parts of an OS version.

    :param osversion: OS version.
    :type osversion: str
    """
    components = osversion.split(".")
    return "BB{0}_{1}_{2}".format(components[0], components[1], components[2])
1003
1004
1005 5
@pem_wrapper
def devalpha_urls(osversion, skel, session=None):
    """
    Check individual Dev Alpha autoloader URLs.

    Sends a HEAD request for the candidate URL. Returns a (URL, content-length)
    tuple when the status code is 200, otherwise an empty tuple.

    :param osversion: OS version.
    :type osversion: str

    :param skel: Individual skeleton format to try.
    :type skel: str

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    session = generic_session(session)
    baseurl = "http://downloads.blackberry.com/upr/developers/downloads"
    url = "{2}/{0}{1}.exe".format(skel, osversion, baseurl)
    response = session.head(url)
    if response.status_code != 200:
        return ()
    return (url, response.headers["content-length"])
1028
1029
1030 5
def devalpha_urls_serieshandler(osversion, skeletons):
    """
    Process list of candidate Dev Alpha autoloader URLs.

    Every skeleton containing the ``<SERIES>`` placeholder has it replaced
    with the series name generated from the OS version. The input list is
    left untouched and a new list is returned.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :return: New list of skeletons with placeholders filled in.
    :rtype: list(str)
    """
    # Build a fresh list: writing back into the caller's list would erase the
    # placeholders, so a later call with another OS version would reuse the
    # series from the first call.
    skels = []
    for skel in skeletons:
        if "<SERIES>" in skel:
            skel = skel.replace("<SERIES>", series_generator(osversion))
        skels.append(skel)
    return skels
1045
1046
1047 5
def devalpha_urls_bulk(osversion, skeletons, xec, session=None):
    """
    Construct list of valid Dev Alpha autoloader URLs.

    Each skeleton is checked on the executor; every check that comes back
    non-empty contributes one URL: content-length entry to the result.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param xec: ThreadPoolExecutor instance.
    :type xec: concurrent.futures.ThreadPoolExecutor

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()

    :return: Dict of URL: content-length pairs.
    :rtype: dict(str: str)
    """
    skels = devalpha_urls_serieshandler(osversion, skeletons)
    # Queue every check before waiting on any of them. Calling result() right
    # after submit() blocks on each request in turn and leaves the rest of the
    # pool idle.
    # NOTE(review): this runs devalpha_urls concurrently with a shared
    # session; confirm both tolerate that.
    pending = [xec.submit(devalpha_urls, osversion, skel, session) for skel in skels]
    finals = {}
    # Collect in submission order so the dict keeps the skeleton order.
    for future in pending:
        final = future.result()
        if final:
            finals[final[0]] = final[1]
    return finals
1070
1071
1072 5
def devalpha_urls_bootstrap(osversion, skeletons, session=None):
    """
    Get list of valid Dev Alpha autoloader URLs.

    Runs the bulk check on a five-worker thread pool. If a KeyboardInterrupt
    arrives during the check, the pool is told to shut down without waiting
    and None is returned.

    :param osversion: OS version.
    :type osversion: str

    :param skeletons: List of skeleton formats to try.
    :type skeletons: list

    :param session: Requests session object, default is created on the fly.
    :type session: requests.Session()
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as xec:
        try:
            finals = devalpha_urls_bulk(osversion, skeletons, xec, session)
        except KeyboardInterrupt:
            xec.shutdown(wait=False)
            finals = None
    return finals
1090
1091
1092 5
def dev_dupe_dicter(finals):
    """
    Invert the URL: content-length dict into content-length: set of URLs.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    by_length = {}
    for url, length in finals.items():
        if length not in by_length:
            by_length[length] = set()
        by_length[length].add(url)
    return by_length
1103
1104
1105 5
def dev_dupe_remover(finals, dupelist):
    """
    Filter dictionary of autoloader entries.

    Every URL in the duplicate groups that contains "DevAlpha" is deleted
    from the dict, which is modified in place and returned.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)

    :param dupelist: List of groups of duplicate URLs.
    :type dupelist: list(set(str))
    """
    doomed = [entry for dupe in dupelist for entry in dupe if "DevAlpha" in entry]
    for entry in doomed:
        del finals[entry]
    return finals
1120
1121
1122 5
def dev_dupe_cleaner(finals):
    """
    Clean duplicate autoloader entries.

    URLs sharing a content-length are grouped, and groups with more than one
    URL are passed on for duplicate removal.

    :param finals: Dict of URL:content-length pairs.
    :type finals: dict(str: str)
    """
    by_length = dev_dupe_dicter(finals)
    dupelist = [urls for urls in by_length.values() if len(urls) > 1]
    return dev_dupe_remover(finals, dupelist)
1133